ff_sdk/worker.rs
1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5
6use ferriskey::{Client, Value};
7use ff_core::keys::{ExecKeyContext, IndexKeys};
8use ff_core::partition::PartitionConfig;
9use ff_core::types::*;
10use tokio::sync::Semaphore;
11
12use crate::config::WorkerConfig;
13use crate::task::ClaimedTask;
14use crate::SdkError;
15
16/// FlowFabric worker — connects to Valkey, claims executions, and provides
17/// the worker-facing API.
18///
19/// # Admission control
20///
21/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
22/// **bypasses the scheduler's admission controls**: it reads the eligible
23/// ZSET directly and mints its own claim grant without consulting budget
24/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
25/// benchmarks, tests, and single-tenant development where the scheduler
26/// hop is measurement noise, not for production.
27///
28/// For production deployments, consume scheduler-issued grants via
29/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
30/// budget breach, quota sliding-window, concurrency cap, and
31/// capability-match checks before issuing grants.
32///
33/// # Usage
34///
35/// ```rust,ignore
36/// use ff_core::backend::BackendConfig;
37/// use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
38/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
39///
40/// let config = WorkerConfig {
41/// backend: BackendConfig::valkey("localhost", 6379),
42/// worker_id: WorkerId::new("w1"),
43/// worker_instance_id: WorkerInstanceId::new("w1-i1"),
44/// namespace: Namespace::new("default"),
45/// lanes: vec![LaneId::new("main")],
46/// capabilities: Vec::new(),
47/// lease_ttl_ms: 30_000,
48/// claim_poll_interval_ms: 1_000,
49/// max_concurrent_tasks: 1,
50/// };
51/// let worker = FlowFabricWorker::connect(config).await?;
52///
53/// loop {
54/// if let Some(task) = worker.claim_next().await? {
55/// // Process task...
56/// task.complete(Some(b"result".to_vec())).await?;
57/// } else {
58/// tokio::time::sleep(Duration::from_secs(1)).await;
59/// }
60/// }
61/// ```
62pub struct FlowFabricWorker {
63 client: Client,
64 config: WorkerConfig,
65 partition_config: PartitionConfig,
66 /// Sorted, deduplicated, comma-separated capabilities — computed once
67 /// from `config.capabilities` at connect time. Passed as ARGV[9] to
68 /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
69 /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
70 /// reproducible logs and tests.
71 #[cfg(feature = "direct-valkey-claim")]
72 worker_capabilities_csv: String,
73 /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
74 /// per-mismatch logs so the 4KB CSV never echoes on every reject
75 /// during an incident. Full CSV logged once at connect-time WARN for
76 /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
77 #[cfg(feature = "direct-valkey-claim")]
78 worker_capabilities_hash: String,
79 #[cfg(feature = "direct-valkey-claim")]
80 lane_index: AtomicUsize,
81 /// Concurrency cap for in-flight tasks. Permits are acquired or
82 /// transferred by [`claim_next`] (feature-gated),
83 /// [`claim_from_grant`] (always available), and
84 /// [`claim_from_reclaim_grant`], transferred to the returned
85 /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
86 /// Holds `max_concurrent_tasks` permits total.
87 ///
88 /// [`claim_next`]: FlowFabricWorker::claim_next
89 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
90 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
91 concurrency_semaphore: Arc<Semaphore>,
92 /// Rolling offset for chunked partition scans. Each poll advances the
93 /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
94 /// chunk)` polls every partition is covered. The initial value is
95 /// derived from `worker_instance_id` so idle workers spread their
96 /// scans across different partitions from the first poll onward.
97 ///
98 /// Overflow: on 64-bit targets `usize` is `u64` — overflow after
99 /// ~2^64 polls (billions of years at any realistic rate). On 32-bit
100 /// targets (wasm32, i686) `usize` is `u32` and wraps after ~4 years
101 /// at 1 poll/sec — acceptable; on wrap, the modulo preserves
102 /// correctness because the sequence simply restarts a new cycle.
103 #[cfg(feature = "direct-valkey-claim")]
104 scan_cursor: AtomicUsize,
105 /// The [`EngineBackend`] the Stage-1b trait forwarders route
106 /// through.
107 ///
108 /// **RFC-012 Stage 1b.** Always populated:
109 /// [`FlowFabricWorker::connect`] now wraps the worker's own
110 /// `ferriskey::Client` in a `ValkeyBackend` via
111 /// `ValkeyBackend::from_client_and_partitions`, and
112 /// [`FlowFabricWorker::connect_with`] replaces that default with
113 /// the caller-supplied `Arc<dyn EngineBackend>`. The
114 /// [`FlowFabricWorker::backend`] accessor still returns
115 /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
116 /// narrows the return type once consumers have migrated.
117 ///
118 /// Hot paths (claim, deliver_signal, admin queries) still use the
119 /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
120 /// migrates them through this field, and Stage 1d removes the
121 /// embedded client.
122 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
123 /// Optional handle to the same underlying backend viewed as a
124 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
125 /// Populated by [`Self::connect`] from the bundled
126 /// `ValkeyBackend` (which implements the trait); supplied by the
127 /// caller on [`Self::connect_with`] as an explicit
128 /// `Option<Arc<dyn CompletionBackend>>` — `None` means "this
129 /// backend does not support push-based completion" (e.g. a future
130 /// Postgres backend without LISTEN/NOTIFY, or a test mock). Cairn
131 /// and other completion-subscription consumers reach this through
132 /// [`Self::completion_backend`].
133 completion_backend_handle:
134 Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
135}
136
137/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
138/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
139/// O(num_flow_partitions).
140#[cfg(feature = "direct-valkey-claim")]
141const PARTITION_SCAN_CHUNK: usize = 32;
142
143impl FlowFabricWorker {
144 /// Connect to Valkey and prepare the worker.
145 ///
146 /// Establishes the ferriskey connection. Does NOT load the FlowFabric
147 /// library — that is the server's responsibility (ff-server calls
148 /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
149 /// the library is already loaded.
150 ///
151 /// # Smoke / dev scripts: rotate `WorkerInstanceId`
152 ///
153 /// The SDK writes a SET-NX liveness sentinel keyed on the worker's
154 /// `WorkerInstanceId`. When a smoke / dev script reuses the same
155 /// `WorkerInstanceId` across restarts, subsequent runs trap behind
156 /// the prior run's SET-NX until the liveness key's TTL (≈ 2× the
157 /// configured lease TTL) expires — the worker appears stuck and
158 /// claims nothing. Iterative scripts should synthesise a fresh
159 /// `WorkerInstanceId` per process (e.g. `WorkerInstanceId::new()`
160 /// or embed a UUID/timestamp) rather than hard-coding a stable
161 /// value. Production workers that cleanly shut down release the
162 /// key; only crashed / kill -9'd processes hit this trap.
163 pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
164 if config.lanes.is_empty() {
165 return Err(SdkError::Config {
166 context: "worker_config".into(),
167 field: None,
168 message: "at least one lane is required".into(),
169 });
170 }
171
172 // Build the ferriskey client from the nested `BackendConfig`.
173 // Delegates to `ff_backend_valkey::build_client` so host/port +
174 // TLS + cluster + `BackendTimeouts::request` +
175 // `BackendRetry` wiring lives in exactly one place (pre-Stage
176 // 1c this path had its own `ClientBuilder` chain that diverged
177 // from the backend's shape; RFC-012 Stage 1c tranche 1
178 // consolidates).
179 let client = ff_backend_valkey::build_client(&config.backend).await?;
180
181 // Verify connectivity
182 let pong: String = client
183 .cmd("PING")
184 .execute()
185 .await
186 .map_err(|e| crate::backend_context(e, "PING failed"))?;
187 if pong != "PONG" {
188 return Err(SdkError::Config {
189 context: "worker_connect".into(),
190 field: None,
191 message: format!("unexpected PING response: {pong}"),
192 });
193 }
194
195 // Guard against two worker processes sharing the same
196 // `worker_instance_id`. A duplicate instance would clobber each
197 // other's lease_current/active_index entries and double-claim work.
198 // SET NX on a liveness key with 2× lease TTL; if the key already
199 // exists another process is live. The key auto-expires if this
200 // process crashes without renewal, so a restart after a hard crash
201 // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
202 //
203 // Known limitations of this minimal scheme (documented for operators):
204 // 1. **Startup-only, not runtime.** There is no heartbeat renewal
205 // path. After `2 × lease_ttl_ms` elapses the alive key expires
206 // naturally even while this worker is still running, and a
207 // second process with the same `worker_instance_id` launched
208 // later will successfully SET NX alongside the first. The check
209 // catches misconfiguration at boot; it does not fence duplicates
210 // that appear mid-lifetime. Production deployments should rely
211 // on the orchestrator (Kubernetes, systemd unit with
212 // `Restart=on-failure`, etc.) as the authoritative single-
213 // instance enforcer; this SET NX is belt-and-suspenders.
214 //
215 // 2. **Restart delay after a crash.** If a worker crashes
216 // ungracefully (SIGKILL, container OOM) and is restarted within
217 // `2 × lease_ttl_ms`, the alive key is still present and the
218 // new process exits with `SdkError::Config("duplicate
219 // worker_instance_id ...")`. Options for operators:
220 // - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
221 // before restarting.
222 // - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
223 // unblock the restart.
224 // - Use a fresh `worker_instance_id` for the restart (the
225 // orchestrator should already do this per-Pod).
226 //
227 // 3. **No graceful cleanup on shutdown.** There is no explicit
228 // `disconnect()` call that DELs the alive key. On clean
229 // `SIGTERM` the key lingers until its TTL expires. A follow-up
230 // can add `FlowFabricWorker::disconnect(self)` for callers that
231 // want to skip the restart-delay window.
232 let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
233 let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
234 let set_result: Option<String> = client
235 .cmd("SET")
236 .arg(&alive_key)
237 .arg("1")
238 .arg("NX")
239 .arg("PX")
240 .arg(alive_ttl_ms.to_string().as_str())
241 .execute()
242 .await
243 .map_err(|e| crate::backend_context(e, "SET NX worker alive key"))?;
244 if set_result.is_none() {
245 return Err(SdkError::Config {
246 context: "worker_connect".into(),
247 field: Some("worker_instance_id".into()),
248 message: format!(
249 "duplicate worker_instance_id '{}': another process already holds {alive_key}",
250 config.worker_instance_id
251 ),
252 });
253 }
254
255 // Read partition config from Valkey (set by ff-server on startup).
256 // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
257 let partition_config = read_partition_config(&client).await
258 .unwrap_or_else(|e| {
259 tracing::warn!(
260 error = %e,
261 "ff:config:partitions not found, using defaults"
262 );
263 PartitionConfig::default()
264 });
265
266 let max_tasks = config.max_concurrent_tasks.max(1);
267 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
268
269 tracing::info!(
270 worker_id = %config.worker_id,
271 instance_id = %config.worker_instance_id,
272 lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
273 "FlowFabricWorker connected"
274 );
275
276 #[cfg(feature = "direct-valkey-claim")]
277 let scan_cursor_init = scan_cursor_seed(
278 config.worker_instance_id.as_str(),
279 partition_config.num_flow_partitions.max(1) as usize,
280 );
281
282 // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
283 // and deduplicates in one pass; string joining happens once here.
284 //
285 // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
286 // - `,` is the CSV delimiter; a token containing one would split
287 // mid-parse and could let a {"gpu"} worker appear to satisfy
288 // {"gpu,cuda"} (silent auth bypass).
289 // - Empty strings would produce leading/adjacent commas on the
290 // wire and inflate token count for no semantic reason.
291 // - Non-printable / whitespace chars: `"gpu "` vs `"gpu"` or
292 // `"gpu\n"` vs `"gpu"` produce silent mismatches that are
293 // miserable to debug. Reject anything outside printable ASCII
294 // excluding space (`'!'..='~'`) at ingress so a typo fails
295 // loudly at connect instead of silently mis-routing forever.
296 // Reject at boot so operator misconfig is loud, symmetric with the
297 // scheduler path.
298 #[cfg(feature = "direct-valkey-claim")]
299 for cap in &config.capabilities {
300 if cap.is_empty() {
301 return Err(SdkError::Config {
302 context: "worker_config".into(),
303 field: Some("capabilities".into()),
304 message: "capability token must not be empty".into(),
305 });
306 }
307 if cap.contains(',') {
308 return Err(SdkError::Config {
309 context: "worker_config".into(),
310 field: Some("capabilities".into()),
311 message: format!(
312 "capability token may not contain ',' (CSV delimiter): {cap:?}"
313 ),
314 });
315 }
316 // Reject ASCII control bytes (0x00-0x1F, 0x7F) and any ASCII
317 // whitespace (space, tab, LF, CR, FF, VT). UTF-8 printable
318 // characters above 0x7F are ALLOWED so i18n caps like
319 // "东京-gpu" can be used. The CSV wire form is byte-safe for
320 // multibyte UTF-8 because `,` is always a single byte and
321 // never part of a multibyte continuation (only 0x80-0xBF are
322 // continuations, ',' is 0x2C).
323 if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
324 return Err(SdkError::Config {
325 context: "worker_config".into(),
326 field: Some("capabilities".into()),
327 message: format!(
328 "capability token must not contain whitespace or control \
329 characters: {cap:?}"
330 ),
331 });
332 }
333 }
334 #[cfg(feature = "direct-valkey-claim")]
335 let worker_capabilities_csv: String = {
336 let set: std::collections::BTreeSet<&str> = config
337 .capabilities
338 .iter()
339 .map(|s| s.as_str())
340 .filter(|s| !s.is_empty())
341 .collect();
342 if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
343 return Err(SdkError::Config {
344 context: "worker_config".into(),
345 field: Some("capabilities".into()),
346 message: format!(
347 "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
348 ff_core::policy::CAPS_MAX_TOKENS,
349 set.len()
350 ),
351 });
352 }
353 let csv = set.into_iter().collect::<Vec<_>>().join(",");
354 if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
355 return Err(SdkError::Config {
356 context: "worker_config".into(),
357 field: Some("capabilities".into()),
358 message: format!(
359 "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
360 ff_core::policy::CAPS_MAX_BYTES,
361 csv.len()
362 ),
363 });
364 }
365 csv
366 };
367
368 // Short stable digest of the sorted caps CSV, computed once so
369 // per-mismatch logs carry a stable identifier instead of the 4KB
370 // CSV. Shared helper — ff-scheduler uses the same one for its
371 // own per-mismatch logs, so cross-component log lines are
372 // diffable against each other.
373 #[cfg(feature = "direct-valkey-claim")]
374 let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);
375
376 // Full CSV logged once at connect so per-mismatch logs (which
377 // carry only the 8-hex hash) can be cross-referenced by ops.
378 #[cfg(feature = "direct-valkey-claim")]
379 if !worker_capabilities_csv.is_empty() {
380 tracing::info!(
381 worker_instance_id = %config.worker_instance_id,
382 worker_caps_hash = %worker_capabilities_hash,
383 worker_caps = %worker_capabilities_csv,
384 "worker connected with capabilities (full CSV — mismatch logs use hash only)"
385 );
386 }
387
388 // Non-authoritative advertisement of caps for operator visibility
389 // (CLI introspection, dashboards). The AUTHORITATIVE source for
390 // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
391 // that, never this string. Lossy here is correctness-safe.
392 //
393 // Storage: a single STRING key holding the sorted CSV. Rationale:
394 // * **Atomic overwrite.** `SET` is a single command — a concurrent
395 // reader can never observe a transient empty value (the prior
396 // DEL+SADD pair had that window).
397 // * **Crash cleanup without refresh loop.** The alive-key SET NX
398 // is startup-only (see §1 above); there's no periodic renew
399 // to piggy-back on, so a TTL on caps would independently
400 // expire mid-flight and hide a live worker's caps from ops
401 // tools. Instead we drop the TTL: each reconnect overwrites;
402 // a crashed worker leaves a stale CSV until a new process
403 // with the same worker_instance_id boots (which triggers
404 // `duplicate worker_instance_id` via alive-key guard anyway —
405 // the orchestrator allocates a new id, and operators can DEL
406 // the stale caps key if they care).
407 // * **Empty caps = DEL.** A restart from {gpu} to {} clears the
408 // advertisement rather than leaving stale data.
409 // Cluster-safe advertisement: the per-worker caps STRING lives at
410 // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
411 // and the INSTANCE ID is SADD'd to the global workers-index SET
412 // `ff:idx:workers` (single slot). The unblock scanner's cluster
413 // enumeration uses SMEMBERS on the index + per-member GET on each
414 // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
415 // cluster mode only scans the shard the SCAN lands on and misses
416 // workers whose key hashes elsewhere). Pattern mirrors Batch A
417 // `budget_policies_index` / `flow_index` / `deps_all_edges`:
418 // operations stay atomic per command, the index is the
419 // cluster-wide enumeration surface.
420 #[cfg(feature = "direct-valkey-claim")]
421 {
422 let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
423 let index_key = ff_core::keys::workers_index_key();
424 let instance_id = config.worker_instance_id.to_string();
425 if worker_capabilities_csv.is_empty() {
426 // No caps advertised. DEL the per-worker caps string AND
427 // SREM from the index so the scanner doesn't GET an empty
428 // string for a worker that never declares caps.
429 let _ = client
430 .cmd("DEL")
431 .arg(&caps_key)
432 .execute::<Option<i64>>()
433 .await;
434 if let Err(e) = client
435 .cmd("SREM")
436 .arg(&index_key)
437 .arg(&instance_id)
438 .execute::<Option<i64>>()
439 .await
440 {
441 tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
442 "SREM workers-index failed; continuing (non-authoritative)");
443 }
444 } else {
445 // Atomic overwrite of the caps STRING (one-command). Then
446 // SADD to the index (idempotent — re-running connect for
447 // the same id is a no-op at SADD level). The per-worker
448 // caps key is written BEFORE the index SADD so that when
449 // the scanner observes the id in the index, the caps key
450 // is guaranteed to resolve to a non-stale CSV (the reverse
451 // order would leak an index entry pointing at a stale or
452 // empty caps key during a narrow window).
453 if let Err(e) = client
454 .cmd("SET")
455 .arg(&caps_key)
456 .arg(&worker_capabilities_csv)
457 .execute::<Option<String>>()
458 .await
459 {
460 tracing::warn!(error = %e, key = %caps_key,
461 "SET worker caps advertisement failed; continuing");
462 }
463 if let Err(e) = client
464 .cmd("SADD")
465 .arg(&index_key)
466 .arg(&instance_id)
467 .execute::<Option<i64>>()
468 .await
469 {
470 tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
471 "SADD workers-index failed; continuing");
472 }
473 }
474 }
475
476 // RFC-012 Stage 1b: wrap the dialed client in a
477 // ValkeyBackend so `ClaimedTask`'s trait forwarders have
478 // something to call. `from_client_and_partitions` reuses the
479 // already-dialed client — no second connection.
480 // Share the concrete `Arc<ValkeyBackend>` across the two
481 // trait objects — one allocation, both accessors yield
482 // identity-equivalent handles.
483 let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
484 ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
485 client.clone(),
486 partition_config,
487 );
488 let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
489 let completion_backend_handle: Option<
490 Arc<dyn ff_core::completion_backend::CompletionBackend>,
491 > = Some(valkey_backend);
492
493 Ok(Self {
494 client,
495 config,
496 partition_config,
497 #[cfg(feature = "direct-valkey-claim")]
498 worker_capabilities_csv,
499 #[cfg(feature = "direct-valkey-claim")]
500 worker_capabilities_hash,
501 #[cfg(feature = "direct-valkey-claim")]
502 lane_index: AtomicUsize::new(0),
503 concurrency_semaphore,
504 #[cfg(feature = "direct-valkey-claim")]
505 scan_cursor: AtomicUsize::new(scan_cursor_init),
506 backend,
507 completion_backend_handle,
508 })
509 }
510
511 /// Store pre-built [`EngineBackend`] and (optional)
512 /// [`CompletionBackend`] handles on the worker. Builds the worker
513 /// via the legacy [`FlowFabricWorker::connect`] path first (so the
514 /// embedded `ferriskey::Client` that the Stage 1b non-migrated hot
515 /// paths still use is dialed), then replaces the default
516 /// `ValkeyBackend` wrapper with the caller-supplied trait objects.
517 ///
518 /// The `completion` argument is explicit: 0.3.3 previously accepted
519 /// only `backend` and `completion_backend()` silently returned
520 /// `None` on this path because `Arc<dyn EngineBackend>` cannot be
521 /// upcast to `Arc<dyn CompletionBackend>` without loss of
522 /// trait-object identity. 0.3.4 lets the caller decide.
523 ///
524 /// - `Some(arc)` — caller supplies a completion backend.
525 /// [`Self::completion_backend`] returns `Some(clone)`.
526 /// - `None` — this backend does not support push-based completion
527 /// (future Postgres backend without LISTEN/NOTIFY, test mocks).
528 /// [`Self::completion_backend`] returns `None`.
529 ///
530 /// When the underlying backend implements both traits (as
531 /// `ValkeyBackend` does), pass the same `Arc` twice — the two
532 /// trait-object views share one allocation:
533 ///
534 /// ```rust,ignore
535 /// use std::sync::Arc;
536 /// use ff_backend_valkey::ValkeyBackend;
537 /// use ff_sdk::{FlowFabricWorker, WorkerConfig};
538 ///
539 /// # async fn doc(worker_config: WorkerConfig,
540 /// # backend_config: ff_backend_valkey::BackendConfig)
541 /// # -> Result<(), ff_sdk::SdkError> {
542 /// // Valkey (completion supported):
543 /// let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
544 /// let worker = FlowFabricWorker::connect_with(
545 /// worker_config,
546 /// valkey.clone(),
547 /// Some(valkey),
548 /// ).await?;
549 /// # Ok(()) }
550 /// ```
551 ///
552 /// Backend without completion support:
553 ///
554 /// ```rust,ignore
555 /// let worker = FlowFabricWorker::connect_with(
556 /// worker_config,
557 /// backend,
558 /// None,
559 /// ).await?;
560 /// ```
561 ///
562 /// **Stage 1b + Round-7 scope — what the injected backend covers
563 /// today.** The injected backend currently covers these per-task
564 /// `ClaimedTask` ops: `update_progress` / `resume_signals` /
565 /// `delay_execution` / `move_to_waiting_children` / `complete` /
566 /// `cancel` / `fail` / `create_pending_waitpoint` /
567 /// `append_frame` / `report_usage`. A mock backend therefore sees
568 /// that portion of the worker's per-task write surface. Lease
569 /// renewal also routes through `backend.renew(&handle)`. Round-7
570 /// (#135/#145) closed the four trait-shape gaps tracked by #117,
571 /// but `suspend` still reaches the embedded `ferriskey::Client`
572 /// directly via `ff_suspend_execution` — this is the deferred
573 /// suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
574 /// work. `claim_next` / `claim_from_grant` /
575 /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
576 /// are Stage 1c hot-path work. Stage 1d removes the embedded
577 /// client entirely.
578 ///
579 /// Today's constructor is therefore NOT yet a drop-in way to swap
580 /// in a non-Valkey backend — it requires a reachable Valkey node
581 /// for `suspend` plus the remaining hot-path ops. Tests that
582 /// exercise only the migrated per-task ops can run fully against
583 /// a mock backend.
584 ///
585 /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
586 /// [`CompletionBackend`]: ff_core::completion_backend::CompletionBackend
587 pub async fn connect_with(
588 config: WorkerConfig,
589 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
590 completion: Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
591 ) -> Result<Self, SdkError> {
592 let mut worker = Self::connect(config).await?;
593 worker.backend = backend;
594 worker.completion_backend_handle = completion;
595 Ok(worker)
596 }
597
598 /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
599 /// ops through.
600 ///
601 /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
602 /// the `Option` wrapper is retained for API stability with the
603 /// Stage-1a shape. Stage 1c narrows the return type to
604 /// `&Arc<dyn EngineBackend>`.
605 pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
606 Some(&self.backend)
607 }
608
609 /// Crate-internal direct borrow of the backend. The public
610 /// [`Self::backend`] still returns `Option` for API stability
611 /// (Stage 1b holdover). Snapshot trait-forwarders in
612 /// [`crate::snapshot`] need an un-wrapped reference.
613 pub(crate) fn backend_ref(
614 &self,
615 ) -> &Arc<dyn ff_core::engine_backend::EngineBackend> {
616 &self.backend
617 }
618
619 /// Handle to the completion-event subscription backend, for
620 /// consumers that need to observe execution completions (DAG
621 /// reconcilers, tenant-isolated subscribers).
622 ///
623 /// Returns `Some` when the worker was built through
624 /// [`Self::connect`] on the default `valkey-default` feature
625 /// (the bundled `ValkeyBackend` implements
626 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)),
627 /// or via [`Self::connect_with`] with a `Some(..)` completion
628 /// handle. Returns `None` when the caller passed `None` to
629 /// [`Self::connect_with`] — i.e. the backend does not support
630 /// push-based completion streams (future Postgres without
631 /// LISTEN/NOTIFY, test mocks).
632 ///
633 /// The returned handle shares the same underlying allocation as
634 /// [`Self::backend`]; calls through it (e.g.
635 /// `subscribe_completions_filtered`) hit the same connection
636 /// the worker itself uses.
637 pub fn completion_backend(
638 &self,
639 ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
640 self.completion_backend_handle.clone()
641 }
642
643 /// Get the worker config.
644 pub fn config(&self) -> &WorkerConfig {
645 &self.config
646 }
647
648 /// Get the server-published partition config this worker bound to at
649 /// `connect()`. Exposed so consumers that mint custom
650 /// [`ExecutionId`]s (e.g. for `describe_execution` lookups on ids
651 /// produced outside this worker) stay aligned with the server's
652 /// `num_flow_partitions` — using `PartitionConfig::default()`
653 /// assumes 256 partitions and silently misses data on deployments
654 /// with any other value.
655 pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
656 &self.partition_config
657 }
658
659 /// Attempt to claim the next eligible execution.
660 ///
661 /// Phase 1 simplified claim flow:
662 /// 1. Pick a lane (round-robin across configured lanes)
663 /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
664 /// 3. Claim the execution via `ff_claim_execution`
665 /// 4. Read execution payload + tags
666 /// 5. Return a [`ClaimedTask`] with auto lease renewal
667 ///
668 /// Gated behind the `direct-valkey-claim` feature — bypasses the
669 /// scheduler's budget / quota / capability admission checks. Enable
670 /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
671 /// the scheduler hop would be measurement noise (benches) or when
672 /// the test harness needs a deterministic worker-local path. Prefer
673 /// the scheduler-routed HTTP claim path in production.
674 ///
675 /// # `None` semantics
676 ///
677 /// `Ok(None)` means **no work was found in the partition window this
678 /// poll covered**, not "the cluster is idle". Each call scans a chunk
679 /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
680 /// `scan_cursor`; the cursor advances by that chunk size on every
681 /// invocation, so a worker covers every partition exactly once every
682 /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
683 ///
684 /// Callers should treat `None` as "poll again soon" (typically after
685 /// `config.claim_poll_interval_ms`) rather than "sleep for a long
686 /// time". Backing off too aggressively on `None` can starve workers
687 /// when work lives on partitions outside the current window.
688 ///
689 /// Returns `Err` on Valkey errors or script failures.
690 #[cfg(feature = "direct-valkey-claim")]
691 pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
692 // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
693 // try_acquire returns immediately — if no permits available, the worker
694 // is at capacity and should not claim more work.
695 let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
696 Ok(p) => p,
697 Err(_) => return Ok(None), // At capacity — no claim attempted
698 };
699
700 let lane_id = self.next_lane();
701 let now = TimestampMs::now();
702
703 // Phase 1: We scan eligible executions directly by reading the eligible
704 // ZSET across execution partitions. In production the scheduler
705 // (ff-scheduler) would handle this. For Phase 1, the SDK does a
706 // simplified inline claim.
707 //
708 // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
709 // partitions starting at a rolling offset. This keeps idle Valkey
710 // load at O(chunk) per worker-second instead of O(num_partitions),
711 // and the worker-instance-seeded initial cursor spreads concurrent
712 // workers across different partition windows.
713 let num_partitions = self.partition_config.num_flow_partitions as usize;
714 if num_partitions == 0 {
715 return Ok(None);
716 }
717 let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
718 let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
719
720 for step in 0..chunk {
721 let partition_idx = ((start + step) % num_partitions) as u16;
722 let partition = ff_core::partition::Partition {
723 family: ff_core::partition::PartitionFamily::Execution,
724 index: partition_idx,
725 };
726 let idx = IndexKeys::new(&partition);
727 let eligible_key = idx.lane_eligible(&lane_id);
728
729 // ZRANGEBYSCORE to get the highest-priority eligible execution.
730 // Score format: -(priority * 1_000_000_000_000) + created_at_ms
731 // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
732 let result: Value = self
733 .client
734 .cmd("ZRANGEBYSCORE")
735 .arg(&eligible_key)
736 .arg("-inf")
737 .arg("+inf")
738 .arg("LIMIT")
739 .arg("0")
740 .arg("1")
741 .execute()
742 .await
743 .map_err(|e| crate::backend_context(e, "ZRANGEBYSCORE failed"))?;
744
745 let execution_id_str = match extract_first_array_string(&result) {
746 Some(s) => s,
747 None => continue, // No eligible executions on this partition
748 };
749
750 let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
751 SdkError::from(ff_script::error::ScriptError::Parse {
752 fcall: "claim_execution_from_eligible_set".into(),
753 execution_id: None,
754 message: format!("bad execution_id in eligible set: {e}"),
755 })
756 })?;
757
758 // Step 1: Issue claim grant
759 let grant_result = self
760 .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
761 .await;
762
763 match grant_result {
764 Ok(()) => {}
765 Err(SdkError::Engine(ref boxed))
766 if matches!(
767 **boxed,
768 crate::EngineError::Validation {
769 kind: crate::ValidationKind::CapabilityMismatch,
770 ..
771 }
772 ) =>
773 {
774 let missing = match &**boxed {
775 crate::EngineError::Validation { detail, .. } => detail.clone(),
776 _ => unreachable!(),
777 };
778 // Block-on-mismatch (RFC-009 §7.5) — parity with
779 // ff-scheduler's Scheduler::claim_for_worker. Without
780 // this, the inline-direct-claim path would hot-loop
781 // on an unclaimable top-of-zset (every tick picks the
782 // same execution, wastes an FCALL, logs, releases,
783 // repeats). The scheduler-side unblock scanner
784 // promotes blocked_route executions back to eligible
785 // when a worker with matching caps registers.
786 tracing::info!(
787 execution_id = %execution_id,
788 worker_id = %self.config.worker_id,
789 worker_caps_hash = %self.worker_capabilities_hash,
790 missing = %missing,
791 "capability mismatch, blocking execution off eligible (SDK inline claim)"
792 );
793 self.block_route(&execution_id, &lane_id, &partition, &idx).await;
794 continue;
795 }
796 Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
797 tracing::debug!(
798 execution_id = %execution_id,
799 error = %e,
800 "claim grant failed (retryable), trying next partition"
801 );
802 continue;
803 }
804 Err(e) => return Err(e),
805 }
806
807 // Step 2: Claim the execution
808 match self
809 .claim_execution(&execution_id, &lane_id, &partition, now)
810 .await
811 {
812 Ok(mut task) => {
813 // Transfer concurrency permit to the task. When the task is
814 // completed/failed/cancelled/dropped the permit returns to
815 // the semaphore, allowing another claim.
816 task.set_concurrency_permit(permit);
817 return Ok(Some(task));
818 }
819 Err(SdkError::Engine(ref boxed))
820 if matches!(
821 **boxed,
822 crate::EngineError::Contention(
823 crate::ContentionKind::UseClaimResumedExecution
824 )
825 ) =>
826 {
827 // Execution was resumed from suspension — attempt_interrupted.
828 // ff_claim_execution rejects this; use ff_claim_resumed_execution
829 // which reuses the existing attempt instead of creating a new one.
830 tracing::debug!(
831 execution_id = %execution_id,
832 "execution is resumed, using claim_resumed path"
833 );
834 match self
835 .claim_resumed_execution(&execution_id, &lane_id, &partition)
836 .await
837 {
838 Ok(mut task) => {
839 task.set_concurrency_permit(permit);
840 return Ok(Some(task));
841 }
842 Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
843 tracing::debug!(
844 execution_id = %execution_id,
845 error = %e2,
846 "claim_resumed failed (retryable), trying next partition"
847 );
848 continue;
849 }
850 Err(e2) => return Err(e2),
851 }
852 }
853 Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
854 tracing::debug!(
855 execution_id = %execution_id,
856 error = %e,
857 "claim execution failed (retryable), trying next partition"
858 );
859 continue;
860 }
861 Err(e) => return Err(e),
862 }
863 }
864
865 // No eligible work found on any partition
866 Ok(None)
867 }
868
869 #[cfg(feature = "direct-valkey-claim")]
870 async fn issue_claim_grant(
871 &self,
872 execution_id: &ExecutionId,
873 lane_id: &LaneId,
874 partition: &ff_core::partition::Partition,
875 idx: &IndexKeys,
876 ) -> Result<(), SdkError> {
877 let ctx = ExecKeyContext::new(partition, execution_id);
878
879 // KEYS (3): exec_core, claim_grant_key, eligible_zset
880 let keys: Vec<String> = vec![
881 ctx.core(),
882 ctx.claim_grant(),
883 idx.lane_eligible(lane_id),
884 ];
885
886 // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
887 // capability_hash, grant_ttl_ms, route_snapshot_json,
888 // admission_summary, worker_capabilities_csv (sorted)
889 let args: Vec<String> = vec![
890 execution_id.to_string(),
891 self.config.worker_id.to_string(),
892 self.config.worker_instance_id.to_string(),
893 lane_id.to_string(),
894 String::new(), // capability_hash
895 "5000".to_owned(), // grant_ttl_ms (5 seconds)
896 String::new(), // route_snapshot_json
897 String::new(), // admission_summary
898 self.worker_capabilities_csv.clone(), // sorted CSV
899 ];
900
901 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
902 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
903
904 let raw: Value = self
905 .client
906 .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
907 .await
908 .map_err(SdkError::from)?;
909
910 crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
911 }
912
913 /// Move an execution from the lane's eligible ZSET into its
914 /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
915 /// after a `CapabilityMismatch` reject — without this, the inline
916 /// direct-claim path would re-pick the same top-of-zset every tick
917 /// (same pattern the scheduler's block_candidate handles). The
918 /// engine's unblock scanner periodically promotes blocked_route
919 /// back to eligible once a worker with matching caps registers.
920 ///
921 /// Best-effort: transport or logical rejects (e.g. the execution
922 /// already went terminal between pick and block) are logged and the
923 /// outer loop simply `continue`s to the next partition. Parity with
924 /// ff-scheduler::Scheduler::block_candidate.
925 #[cfg(feature = "direct-valkey-claim")]
926 async fn block_route(
927 &self,
928 execution_id: &ExecutionId,
929 lane_id: &LaneId,
930 partition: &ff_core::partition::Partition,
931 idx: &IndexKeys,
932 ) {
933 let ctx = ExecKeyContext::new(partition, execution_id);
934 let core_key = ctx.core();
935 let eligible_key = idx.lane_eligible(lane_id);
936 let blocked_key = idx.lane_blocked_route(lane_id);
937 let eid_s = execution_id.to_string();
938 let now_ms = TimestampMs::now().0.to_string();
939
940 let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
941 let argv: [&str; 4] = [
942 &eid_s,
943 "waiting_for_capable_worker",
944 "no connected worker satisfies required_capabilities",
945 &now_ms,
946 ];
947
948 match self
949 .client
950 .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
951 .await
952 {
953 Ok(v) => {
954 // Parse Lua result so a logical reject (e.g. execution
955 // went terminal mid-flight) is visible — same fix we
956 // applied to ff-scheduler's block_candidate.
957 if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
958 tracing::warn!(
959 execution_id = %execution_id,
960 error = %e,
961 "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
962 will re-evaluate"
963 );
964 }
965 }
966 Err(e) => {
967 tracing::warn!(
968 execution_id = %execution_id,
969 error = %e,
970 "SDK block_route: transport failure; eligible ZSET unchanged"
971 );
972 }
973 }
974 }
975
976 /// Low-level claim of a granted execution. Invokes
977 /// `ff_claim_execution` and returns a `ClaimedTask` with auto
978 /// lease renewal.
979 ///
980 /// Previously gated behind `direct-valkey-claim`; ungated so
981 /// the public [`claim_from_grant`] entry point can reuse the
982 /// same FCALL plumbing. The method stays private — external
983 /// callers use `claim_from_grant`.
984 ///
985 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
986 async fn claim_execution(
987 &self,
988 execution_id: &ExecutionId,
989 lane_id: &LaneId,
990 partition: &ff_core::partition::Partition,
991 _now: TimestampMs,
992 ) -> Result<ClaimedTask, SdkError> {
993 let ctx = ExecKeyContext::new(partition, execution_id);
994 let idx = IndexKeys::new(partition);
995
996 // Pre-read total_attempt_count from exec_core to derive next attempt index.
997 // The Lua uses total_attempt_count as the new index and dynamically builds
998 // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
999 // we pass the correct index for documentation/debugging.
1000 let total_str: Option<String> = self.client
1001 .cmd("HGET")
1002 .arg(ctx.core())
1003 .arg("total_attempt_count")
1004 .execute()
1005 .await
1006 .unwrap_or(None);
1007 let next_idx = total_str
1008 .as_deref()
1009 .and_then(|s| s.parse::<u32>().ok())
1010 .unwrap_or(0);
1011 let att_idx = AttemptIndex::new(next_idx);
1012
1013 let lease_id = LeaseId::new().to_string();
1014 let attempt_id = AttemptId::new().to_string();
1015 let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;
1016
1017 // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
1018 let keys: Vec<String> = vec![
1019 ctx.core(), // 1 exec_core
1020 ctx.claim_grant(), // 2 claim_grant
1021 idx.lane_eligible(lane_id), // 3 eligible_zset
1022 idx.lease_expiry(), // 4 lease_expiry_zset
1023 idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
1024 ctx.attempt_hash(att_idx), // 6 attempt_hash (placeholder)
1025 ctx.attempt_usage(att_idx), // 7 attempt_usage (placeholder)
1026 ctx.attempt_policy(att_idx), // 8 attempt_policy (placeholder)
1027 ctx.attempts(), // 9 attempts_zset
1028 ctx.lease_current(), // 10 lease_current
1029 ctx.lease_history(), // 11 lease_history
1030 idx.lane_active(lane_id), // 12 active_index
1031 idx.attempt_timeout(), // 13 attempt_timeout_zset
1032 idx.execution_deadline(), // 14 execution_deadline_zset
1033 ];
1034
1035 // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
1036 let args: Vec<String> = vec![
1037 execution_id.to_string(), // 1 execution_id
1038 self.config.worker_id.to_string(), // 2 worker_id
1039 self.config.worker_instance_id.to_string(), // 3 worker_instance_id
1040 lane_id.to_string(), // 4 lane
1041 String::new(), // 5 capability_hash
1042 lease_id.clone(), // 6 lease_id
1043 self.config.lease_ttl_ms.to_string(), // 7 lease_ttl_ms
1044 renew_before_ms.to_string(), // 8 renew_before_ms
1045 attempt_id.clone(), // 9 attempt_id
1046 "{}".to_owned(), // 10 attempt_policy_json
1047 String::new(), // 11 attempt_timeout_ms
1048 String::new(), // 12 execution_deadline_at
1049 ];
1050
1051 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
1052 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
1053
1054 let raw: Value = self
1055 .client
1056 .fcall("ff_claim_execution", &key_refs, &arg_refs)
1057 .await
1058 .map_err(SdkError::from)?;
1059
1060 // Parse claim result: {1, "OK", lease_id, lease_epoch, attempt_index,
1061 // attempt_id, attempt_type, lease_expires_at}
1062 let arr = match &raw {
1063 Value::Array(arr) => arr,
1064 _ => {
1065 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1066 fcall: "ff_claim_execution".into(),
1067 execution_id: Some(execution_id.to_string()),
1068 message: "expected Array".into(),
1069 }));
1070 }
1071 };
1072
1073 let status_code = match arr.first() {
1074 Some(Ok(Value::Int(n))) => *n,
1075 _ => {
1076 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1077 fcall: "ff_claim_execution".into(),
1078 execution_id: Some(execution_id.to_string()),
1079 message: "bad status code".into(),
1080 }));
1081 }
1082 };
1083
1084 if status_code != 1 {
1085 let err_field_str = |idx: usize| -> String {
1086 arr.get(idx)
1087 .and_then(|v| match v {
1088 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1089 Ok(Value::SimpleString(s)) => Some(s.clone()),
1090 _ => None,
1091 })
1092 .unwrap_or_default()
1093 };
1094 let error_code = {
1095 let s = err_field_str(1);
1096 if s.is_empty() { "unknown".to_owned() } else { s }
1097 };
1098 let detail = err_field_str(2);
1099
1100 return Err(SdkError::from(
1101 ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1102 .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1103 fcall: "ff_claim_execution".into(),
1104 execution_id: Some(execution_id.to_string()),
1105 message: format!("unknown error: {error_code}"),
1106 }),
1107 ));
1108 }
1109
1110 // Extract fields from success response
1111 let field_str = |idx: usize| -> String {
1112 arr.get(idx + 2) // skip status_code and "OK"
1113 .and_then(|v| match v {
1114 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1115 Ok(Value::SimpleString(s)) => Some(s.clone()),
1116 Ok(Value::Int(n)) => Some(n.to_string()),
1117 _ => None,
1118 })
1119 .unwrap_or_default()
1120 };
1121
1122 // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
1123 // Positions: 0 1 2 3 4 5
1124 let lease_id = LeaseId::parse(&field_str(0))
1125 .unwrap_or_else(|_| LeaseId::new());
1126 let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1127 // field_str(2) is expires_at — skip it (lease timing managed by renewal)
1128 let attempt_id = AttemptId::parse(&field_str(3))
1129 .unwrap_or_else(|_| AttemptId::new());
1130 let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1131
1132 // Read execution payload and metadata
1133 let (input_payload, execution_kind, tags) = self
1134 .read_execution_context(execution_id, partition)
1135 .await?;
1136
1137 Ok(ClaimedTask::new(
1138 self.client.clone(),
1139 self.backend.clone(),
1140 self.partition_config,
1141 execution_id.clone(),
1142 attempt_index,
1143 attempt_id,
1144 lease_id,
1145 lease_epoch,
1146 self.config.lease_ttl_ms,
1147 lane_id.clone(),
1148 self.config.worker_instance_id.clone(),
1149 input_payload,
1150 execution_kind,
1151 tags,
1152 ))
1153 }
1154
1155 /// Consume a [`ClaimGrant`] and claim the granted execution on
1156 /// this worker. The intended production entry point: pair with
1157 /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
1158 /// scheduler-issued grants into the SDK without enabling the
1159 /// `direct-valkey-claim` feature (which bypasses budget/quota
1160 /// admission control).
1161 ///
1162 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1163 /// so a saturated worker does not consume the grant: the grant
1164 /// stays valid for its remaining TTL and the caller can either
1165 /// release it back to the scheduler or retry after some other
1166 /// in-flight task completes.
1167 ///
1168 /// On success the returned [`ClaimedTask`] holds a concurrency
1169 /// permit that releases automatically on
1170 /// `complete`/`fail`/`cancel`/drop — same contract as
1171 /// `claim_next`.
1172 ///
1173 /// # Arguments
1174 ///
1175 /// * `lane` — the lane the grant was issued for. Must match what
1176 /// was passed to `Scheduler::claim_for_worker`; the Lua FCALL
1177 /// uses it to look up `lane_eligible`, `lane_active`, and the
1178 /// `worker_leases` index slot.
1179 /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
1180 ///
1181 /// # Errors
1182 ///
1183 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1184 /// permits all held. Retryable; the grant is untouched.
1185 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1186 /// or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
1187 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
1188 /// (wrapped in [`SdkError::Engine`]).
1189 /// * `ScriptError::CapabilityMismatch` — execution's required
1190 /// capabilities not a subset of this worker's caps (wrapped in
1191 /// [`SdkError::Engine`]). Surfaced post-grant if a race
1192 /// between grant issuance and caps change allows it.
1193 /// * `ScriptError::Parse` — `ff_claim_execution` returned an
1194 /// unexpected shape (wrapped in [`SdkError::Engine`]).
1195 /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1196 /// transport error during the FCALL or the
1197 /// `read_execution_context` follow-up.
1198 ///
1199 /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
1200 /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
1201 pub async fn claim_from_grant(
1202 &self,
1203 lane: LaneId,
1204 grant: ff_core::contracts::ClaimGrant,
1205 ) -> Result<ClaimedTask, SdkError> {
1206 // Semaphore check FIRST. If the worker is saturated we must
1207 // surface the condition to the caller without touching the
1208 // grant — silently returning Ok(None) (as claim_next does)
1209 // would drop a grant the scheduler has already committed work
1210 // to issuing, wasting the slot until its TTL elapses.
1211 let permit = self
1212 .concurrency_semaphore
1213 .clone()
1214 .try_acquire_owned()
1215 .map_err(|_| SdkError::WorkerAtCapacity)?;
1216
1217 let now = TimestampMs::now();
1218 let partition = grant.partition().map_err(|e| SdkError::Config {
1219 context: "claim_from_grant".to_owned(),
1220 field: Some("partition_key".to_owned()),
1221 message: e.to_string(),
1222 })?;
1223 let mut task = match self
1224 .claim_execution(&grant.execution_id, &lane, &partition, now)
1225 .await
1226 {
1227 Ok(task) => task,
1228 Err(SdkError::Engine(ref boxed))
1229 if matches!(
1230 **boxed,
1231 crate::EngineError::Contention(
1232 crate::ContentionKind::UseClaimResumedExecution
1233 )
1234 ) =>
1235 {
1236 // Execution was resumed from suspension — attempt_interrupted.
1237 // ff_claim_execution rejects this; use ff_claim_resumed_execution
1238 // which reuses the existing attempt instead of creating a new one.
1239 // Mirrors the fallback inside `claim_next` so HTTP-routed callers
1240 // (`claim_via_server` → `claim_from_grant`) get the same transparent
1241 // re-claim behavior.
1242 tracing::debug!(
1243 execution_id = %grant.execution_id,
1244 "execution is resumed, using claim_resumed path"
1245 );
1246 self.claim_resumed_execution(&grant.execution_id, &lane, &partition)
1247 .await?
1248 }
1249 Err(e) => return Err(e),
1250 };
1251 task.set_concurrency_permit(permit);
1252 Ok(task)
1253 }
1254
1255 /// Scheduler-routed claim: POST the server's
1256 /// `/v1/workers/{id}/claim`, then chain to
1257 /// [`Self::claim_from_grant`].
1258 ///
1259 /// Batch C item 2 PR-B. This is the production entry point —
1260 /// budget + quota + capability admission run server-side inside
1261 /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1262 /// enable the `direct-valkey-claim` feature.
1263 ///
1264 /// Returns `Ok(None)` when the server says no eligible execution
1265 /// (HTTP 204). Callers typically back off by
1266 /// `config.claim_poll_interval_ms` and try again, same cadence
1267 /// as the direct-claim path's `Ok(None)`.
1268 ///
1269 /// The `admin` client is the established HTTP surface
1270 /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1271 /// second reqwest client around. Build once at worker boot and
1272 /// hand in by reference on every claim.
1273 pub async fn claim_via_server(
1274 &self,
1275 admin: &crate::FlowFabricAdminClient,
1276 lane: &LaneId,
1277 grant_ttl_ms: u64,
1278 ) -> Result<Option<ClaimedTask>, SdkError> {
1279 let req = crate::admin::ClaimForWorkerRequest {
1280 worker_id: self.config.worker_id.to_string(),
1281 lane_id: lane.to_string(),
1282 worker_instance_id: self.config.worker_instance_id.to_string(),
1283 capabilities: self.config.capabilities.clone(),
1284 grant_ttl_ms,
1285 };
1286 let Some(resp) = admin.claim_for_worker(req).await? else {
1287 return Ok(None);
1288 };
1289 let grant = resp.into_grant()?;
1290 self.claim_from_grant(lane.clone(), grant).await.map(Some)
1291 }
1292
1293 /// Consume a [`ReclaimGrant`] and transition the granted
1294 /// `attempt_interrupted` execution into a `started` state on this
1295 /// worker. Symmetric partner to [`claim_from_grant`] for the
1296 /// resume path.
1297 ///
1298 /// The grant must have been issued to THIS worker (matching
1299 /// `worker_id` at grant time). A mismatch returns
1300 /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1301 /// atomically by `ff_claim_resumed_execution`; a second call with
1302 /// the same grant also returns `InvalidClaimGrant`.
1303 ///
1304 /// # Concurrency
1305 ///
1306 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1307 /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1308 /// assume pre-existing capacity on this worker — a reclaim can
1309 /// land on a fresh worker instance that just came up after a
1310 /// crash/restart and is picking up a previously-interrupted
1311 /// execution. If the worker is saturated, the grant stays valid
1312 /// for its remaining TTL and the caller can release it or retry.
1313 ///
1314 /// On success the returned [`ClaimedTask`] holds a concurrency
1315 /// permit that releases automatically on
1316 /// `complete`/`fail`/`cancel`/drop.
1317 ///
1318 /// # Errors
1319 ///
1320 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1321 /// permits all held. Retryable; the grant is untouched (no
1322 /// FCALL was issued, so `ff_claim_resumed_execution` did not
1323 /// atomically consume the grant key).
1324 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1325 /// or `worker_id` mismatch.
1326 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1327 /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1328 /// `attempt_interrupted`.
1329 /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1330 /// not `runnable`.
1331 /// * `ScriptError::ExecutionNotFound` — core key missing.
1332 /// * [`SdkError::Backend`] / [`SdkError::BackendContext`] —
1333 /// transport.
1334 ///
    /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
    /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
    pub async fn claim_from_reclaim_grant(
        &self,
        grant: ff_core::contracts::ReclaimGrant,
    ) -> Result<ClaimedTask, SdkError> {
        // Semaphore check FIRST — same load-bearing ordering as
        // `claim_from_grant`. If the worker is saturated, surface
        // WorkerAtCapacity without firing the FCALL; the FCALL is an
        // atomic consume on the grant key, so calling it past
        // saturation would destroy the grant while leaving no
        // permit to attach to the returned `ClaimedTask`.
        let permit = self
            .concurrency_semaphore
            .clone()
            .try_acquire_owned()
            .map_err(|_| SdkError::WorkerAtCapacity)?;

        // Grant carries partition + lane_id so no round-trip is needed
        // to resolve them before the FCALL.
        let partition = grant.partition().map_err(|e| SdkError::Config {
            context: "claim_from_reclaim_grant".to_owned(),
            field: Some("partition_key".to_owned()),
            message: e.to_string(),
        })?;
        let mut task = self
            .claim_resumed_execution(
                &grant.execution_id,
                &grant.lane_id,
                &partition,
            )
            .await?;
        task.set_concurrency_permit(permit);
        Ok(task)
    }

    /// Low-level resume claim. Forwards through
    /// [`EngineBackend::claim_resumed_execution`](ff_core::engine_backend::EngineBackend::claim_resumed_execution)
    /// — the trait-level trigger surface landed in issue #150 — and
    /// returns a [`ClaimedTask`] bound to the resumed attempt.
    ///
    /// The method stays private; external callers use
    /// [`claim_from_reclaim_grant`].
    ///
    /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
    async fn claim_resumed_execution(
        &self,
        execution_id: &ExecutionId,
        lane_id: &LaneId,
        partition: &ff_core::partition::Partition,
    ) -> Result<ClaimedTask, SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // Pre-read current_attempt_index for the existing attempt hash key.
        // This is load-bearing: the backend's KEYS[6] must point to the
        // real attempt hash, and the backend takes the index verbatim
        // from `ClaimResumedExecutionArgs::current_attempt_index`.
        let att_idx_str: Option<String> = self
            .client
            .cmd("HGET")
            .arg(ctx.core())
            .arg("current_attempt_index")
            .execute()
            .await
            .map_err(|e| crate::backend_context(e, "read attempt_index"))?;
        let att_idx = AttemptIndex::new(
            att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
        );

        let args = ff_core::contracts::ClaimResumedExecutionArgs {
            execution_id: execution_id.clone(),
            worker_id: self.config.worker_id.clone(),
            worker_instance_id: self.config.worker_instance_id.clone(),
            lane_id: lane_id.clone(),
            lease_id: LeaseId::new(),
            lease_ttl_ms: self.config.lease_ttl_ms,
            current_attempt_index: att_idx,
            remaining_attempt_timeout_ms: None,
            now: TimestampMs::now(),
        };

        let ff_core::contracts::ClaimResumedExecutionResult::Claimed(claimed) =
            self.backend.claim_resumed_execution(args).await?;

        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            claimed.attempt_index,
            claimed.attempt_id,
            claimed.lease_id,
            claimed.lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }

    /// Read the input payload, `execution_kind`, and tags for an execution
    /// from its payload, core, and tags keys. Previously gated behind
    /// `direct-valkey-claim`; now shared by the feature-gated inline claim
    /// path and the public `claim_from_reclaim_grant` entry point.
    async fn read_execution_context(
        &self,
        execution_id: &ExecutionId,
        partition: &ff_core::partition::Partition,
    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // Read the raw input payload (empty if the key is missing).
        let payload: Option<String> = self
            .client
            .get(&ctx.payload())
            .await
            .map_err(|e| crate::backend_context(e, "GET payload failed"))?;
        let input_payload = payload.unwrap_or_default().into_bytes();

        // Read execution_kind from the core hash.
        let kind: Option<String> = self
            .client
            .hget(&ctx.core(), "execution_kind")
            .await
            .map_err(|e| crate::backend_context(e, "HGET execution_kind failed"))?;
        let execution_kind = kind.unwrap_or_default();

        // Read tags (empty map if none were set).
        let tags: HashMap<String, String> = self
            .client
            .hgetall(&ctx.tags())
            .await
            .map_err(|e| crate::backend_context(e, "HGETALL tags"))?;

        Ok((input_payload, execution_kind, tags))
    }

    // ── Phase 3: Signal delivery ──

    /// Deliver a signal to a suspended execution's waitpoint.
    ///
    /// The engine atomically records the signal, evaluates the resume
    /// condition, and optionally transitions the execution from `suspended`
    /// to `runnable`.
    ///
    /// Forwards through
    /// [`EngineBackend::deliver_signal`](ff_core::engine_backend::EngineBackend::deliver_signal)
    /// — the trait-level trigger surface landed in issue #150.
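    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `signal` was built by the caller; this
    /// method fixes `payload_encoding` to `"json"` and `target_scope` to
    /// `"waitpoint"`, so only the signal itself is supplied (the import
    /// path is shown for illustration):
    ///
    /// ```rust,ignore
    /// use ff_sdk::task::SignalOutcome;
    ///
    /// match worker.deliver_signal(&execution_id, &waitpoint_id, signal).await? {
    ///     SignalOutcome::TriggeredResume { signal_id } => {
    ///         // Resume condition satisfied; the execution is runnable again.
    ///     }
    ///     SignalOutcome::Accepted { signal_id, effect } => {
    ///         // Recorded, but the waitpoint keeps waiting.
    ///     }
    ///     SignalOutcome::Duplicate { existing_signal_id } => {
    ///         // Idempotency key matched an earlier delivery.
    ///     }
    /// }
    /// ```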
    pub async fn deliver_signal(
        &self,
        execution_id: &ExecutionId,
        waitpoint_id: &WaitpointId,
        signal: crate::task::Signal,
    ) -> Result<crate::task::SignalOutcome, SdkError> {
        let args = ff_core::contracts::DeliverSignalArgs {
            execution_id: execution_id.clone(),
            waitpoint_id: waitpoint_id.clone(),
            signal_id: ff_core::types::SignalId::new(),
            signal_name: signal.signal_name,
            signal_category: signal.signal_category,
            source_type: signal.source_type,
            source_identity: signal.source_identity,
            payload: signal.payload,
            payload_encoding: Some("json".to_owned()),
            correlation_id: None,
            idempotency_key: signal.idempotency_key,
            target_scope: "waitpoint".to_owned(),
            created_at: Some(TimestampMs::now()),
            dedup_ttl_ms: None,
            resume_delay_ms: None,
            max_signals_per_execution: None,
            signal_maxlen: None,
            waitpoint_token: signal.waitpoint_token,
            now: TimestampMs::now(),
        };

        let result = self.backend.deliver_signal(args).await?;
        Ok(match result {
            ff_core::contracts::DeliverSignalResult::Accepted { signal_id, effect } => {
                if effect == "resume_condition_satisfied" {
                    crate::task::SignalOutcome::TriggeredResume { signal_id }
                } else {
                    crate::task::SignalOutcome::Accepted { signal_id, effect }
                }
            }
            ff_core::contracts::DeliverSignalResult::Duplicate { existing_signal_id } => {
                crate::task::SignalOutcome::Duplicate {
                    existing_signal_id: existing_signal_id.to_string(),
                }
            }
        })
    }

    /// Round-robin lane selection for the direct claim path: each call
    /// advances `lane_index` and wraps over `config.lanes`. Assumes
    /// `config.lanes` is non-empty (the modulo would otherwise divide
    /// by zero).
    #[cfg(feature = "direct-valkey-claim")]
    fn next_lane(&self) -> LaneId {
        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
        self.config.lanes[idx].clone()
    }
}

/// Whether a claim error should be treated as retryable on the direct
/// claim path: both `Retryable` and `Informational` error classes qualify.
#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    use ff_core::error::ErrorClass;
    matches!(
        ff_script::engine_error_ext::class(err),
        ErrorClass::Retryable | ErrorClass::Informational
    )
}

/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
/// instance id with FNV-1a to place distinct worker processes on different
/// partition windows from their first poll. A seed of zero would be fine for
/// a single-worker cluster; hashing the instance id is what spreads work in
/// multi-worker deployments.
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    if num_partitions == 0 {
        return 0;
    }
    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
}

/// Best-effort extraction of the first element of an array reply as an
/// owned `String`. Returns `None` for non-array replies, empty arrays, or
/// a first element that is neither a bulk nor a simple string.
#[cfg(feature = "direct-valkey-claim")]
fn extract_first_array_string(value: &Value) -> Option<String> {
    match value {
        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        },
        _ => None,
    }
}

/// Read partition config from Valkey's `ff:config:partitions` hash.
/// Returns `Err` if the key doesn't exist or can't be read.
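///
/// Missing or non-positive fields fall back to the defaults wired in below
/// (256 flow / 32 budget / 32 quota). As a sketch, the hash can be seeded by
/// hand with exactly the field names this function reads:
///
/// ```text
/// HSET ff:config:partitions num_flow_partitions 256
/// HSET ff:config:partitions num_budget_partitions 32
/// HSET ff:config:partitions num_quota_partitions 32
/// ```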
async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
    let key = ff_core::keys::global_config_partitions();
    let fields: HashMap<String, String> = client
        .hgetall(&key)
        .await
        .map_err(|e| crate::backend_context(e, format!("HGETALL {key}")))?;

    if fields.is_empty() {
        return Err(SdkError::Config {
            context: "read_partition_config".into(),
            field: None,
            message: "ff:config:partitions not found in Valkey".into(),
        });
    }

    let parse = |field: &str, default: u16| -> u16 {
        fields
            .get(field)
            .and_then(|v| v.parse().ok())
            .filter(|&n: &u16| n > 0)
            .unwrap_or(default)
    };

    Ok(PartitionConfig {
        num_flow_partitions: parse("num_flow_partitions", 256),
        num_budget_partitions: parse("num_budget_partitions", 32),
        num_quota_partitions: parse("num_quota_partitions", 32),
    })
}

#[cfg(test)]
mod completion_accessor_type_tests {
    //! Type-level compile check that
    //! [`FlowFabricWorker::completion_backend`] returns an
    //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required — the
    //! assertion is at the function-pointer type level, and the
    //! `#[test]` body exists solely so the compiler elaborates it.

    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    #[test]
    fn completion_backend_accessor_signature() {
        // If this line compiles, the public accessor returns the
        // advertised type. The function is never called (no live
        // worker), so no I/O happens.
        let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
            FlowFabricWorker::completion_backend;
    }
}

#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    use super::scan_cursor_seed;

    #[test]
    fn stable_for_same_input() {
        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
    }

    #[test]
    fn distinct_for_different_ids() {
        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
    }

    #[test]
    fn bounded_by_partition_count() {
        for i in 0..100 {
            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        assert_eq!(scan_cursor_seed("w1", 0), 0);
    }
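
    #[test]
    fn single_partition_collapses_to_zero() {
        // Degenerate case: with one partition, any seed modulo 1 is 0,
        // so every worker starts at the same (only) window.
        assert_eq!(scan_cursor_seed("w1", 1), 0);
        assert_eq!(scan_cursor_seed("w1-i1", 1), 0);
    }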
}