ff_sdk/worker.rs
1use std::collections::HashMap;
2#[cfg(feature = "direct-valkey-claim")]
3use std::sync::atomic::{AtomicUsize, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6
7use ferriskey::{Client, ClientBuilder, Value};
8use ff_core::keys::{ExecKeyContext, IndexKeys};
9use ff_core::partition::PartitionConfig;
10use ff_core::types::*;
11use tokio::sync::Semaphore;
12
13use crate::config::WorkerConfig;
14use crate::task::ClaimedTask;
15use crate::SdkError;
16
17/// FlowFabric worker — connects to Valkey, claims executions, and provides
18/// the worker-facing API.
19///
20/// # Admission control
21///
22/// `claim_next()` lives behind the `direct-valkey-claim` feature flag and
23/// **bypasses the scheduler's admission controls**: it reads the eligible
24/// ZSET directly and mints its own claim grant without consulting budget
25/// (`{b:M}`) or quota (`{q:K}`) policies. Default-off. Intended for
26/// benchmarks, tests, and single-tenant development where the scheduler
27/// hop is measurement noise, not for production.
28///
29/// For production deployments, consume scheduler-issued grants via
30/// [`FlowFabricWorker::claim_from_grant`] — the scheduler enforces
31/// budget breach, quota sliding-window, concurrency cap, and
32/// capability-match checks before issuing grants.
33///
34/// # Usage
35///
/// ```rust,ignore
/// use ff_sdk::{FlowFabricWorker, WorkerConfig};
///
/// let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// loop {
///     if let Some(task) = worker.claim_next().await? {
///         // Process task...
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         tokio::time::sleep(Duration::from_secs(1)).await;
///     }
/// }
/// ```
51pub struct FlowFabricWorker {
52 client: Client,
53 config: WorkerConfig,
54 partition_config: PartitionConfig,
55 /// Sorted, deduplicated, comma-separated capabilities — computed once
56 /// from `config.capabilities` at connect time. Passed as ARGV[9] to
57 /// `ff_issue_claim_grant` on every claim. BTreeSet sorting is critical:
58 /// Lua's `ff_issue_claim_grant` relies on a stable CSV form for
59 /// reproducible logs and tests.
60 #[cfg(feature = "direct-valkey-claim")]
61 worker_capabilities_csv: String,
62 /// 8-hex FNV-1a digest of `worker_capabilities_csv`. Used in
63 /// per-mismatch logs so the 4KB CSV never echoes on every reject
64 /// during an incident. Full CSV logged once at connect-time WARN for
65 /// cross-reference. Mirrors `ff-scheduler::claim::worker_caps_digest`.
66 #[cfg(feature = "direct-valkey-claim")]
67 worker_capabilities_hash: String,
68 #[cfg(feature = "direct-valkey-claim")]
69 lane_index: AtomicUsize,
    /// Concurrency cap for in-flight tasks. Permits are acquired by
    /// [`claim_next`] (feature-gated),
72 /// [`claim_from_grant`] (always available), and
73 /// [`claim_from_reclaim_grant`], transferred to the returned
74 /// [`ClaimedTask`], and released on task complete/fail/cancel/drop.
75 /// Holds `max_concurrent_tasks` permits total.
76 ///
77 /// [`claim_next`]: FlowFabricWorker::claim_next
78 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
79 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
80 concurrency_semaphore: Arc<Semaphore>,
81 /// Rolling offset for chunked partition scans. Each poll advances the
82 /// cursor by `PARTITION_SCAN_CHUNK`, so over `ceil(num_partitions /
83 /// chunk)` polls every partition is covered. The initial value is
84 /// derived from `worker_instance_id` so idle workers spread their
85 /// scans across different partitions from the first poll onward.
86 ///
    /// Overflow: on 64-bit targets `usize` is 64 bits wide — the cursor
    /// would take billions of years to wrap at any realistic poll rate.
    /// On 32-bit targets (wasm32, i686) it is 32 bits and wraps after
    /// ~4 years at 1 poll/sec (the cursor advances by the chunk size on
    /// every poll) — acceptable; on wrap, the modulo preserves
    /// correctness because the sequence simply restarts a new cycle.
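    ///
    /// As a concrete instance: with the default 256 flow partitions and
    /// `PARTITION_SCAN_CHUNK = 32`, an idle worker covers every partition
    /// once every `ceil(256 / 32) = 8` polls.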
92 #[cfg(feature = "direct-valkey-claim")]
93 scan_cursor: AtomicUsize,
94 /// The [`EngineBackend`] the Stage-1b trait forwarders route
95 /// through.
96 ///
97 /// **RFC-012 Stage 1b.** Always populated:
98 /// [`FlowFabricWorker::connect`] now wraps the worker's own
99 /// `ferriskey::Client` in a `ValkeyBackend` via
100 /// `ValkeyBackend::from_client_and_partitions`, and
101 /// [`FlowFabricWorker::connect_with`] replaces that default with
102 /// the caller-supplied `Arc<dyn EngineBackend>`. The
103 /// [`FlowFabricWorker::backend`] accessor still returns
104 /// `Option<&Arc<dyn EngineBackend>>` for API stability — Stage 1c
105 /// narrows the return type once consumers have migrated.
106 ///
107 /// Hot paths (claim, deliver_signal, admin queries) still use the
108 /// embedded `ferriskey::Client` directly at Stage 1b; Stage 1c
109 /// migrates them through this field, and Stage 1d removes the
110 /// embedded client.
111 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
112 /// Optional upcast to the same underlying backend viewed as a
113 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend).
114 /// Populated by [`Self::connect`] from the bundled
115 /// `ValkeyBackend` (which implements the trait); cleared by
116 /// [`Self::connect_with`] because a caller-supplied
117 /// `Arc<dyn EngineBackend>` cannot be re-upcast. Cairn and other
118 /// completion-subscription consumers reach this through
119 /// [`Self::completion_backend`].
120 completion_backend_handle:
121 Option<Arc<dyn ff_core::completion_backend::CompletionBackend>>,
122}
123
124/// Number of partitions scanned per `claim_next()` poll. Keeps idle Valkey
125/// load at O(PARTITION_SCAN_CHUNK) per worker-second instead of
126/// O(num_flow_partitions).
127#[cfg(feature = "direct-valkey-claim")]
128const PARTITION_SCAN_CHUNK: usize = 32;
129
130impl FlowFabricWorker {
131 /// Connect to Valkey and prepare the worker.
132 ///
133 /// Establishes the ferriskey connection. Does NOT load the FlowFabric
134 /// library — that is the server's responsibility (ff-server calls
135 /// `ff_script::loader::ensure_library()` on startup). The SDK assumes
136 /// the library is already loaded.
137 pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError> {
138 if config.lanes.is_empty() {
139 return Err(SdkError::Config {
140 context: "worker_config".into(),
141 field: None,
142 message: "at least one lane is required".into(),
143 });
144 }
145
146 let mut builder = ClientBuilder::new()
147 .host(&config.host, config.port)
148 .connect_timeout(Duration::from_secs(10))
149 .request_timeout(Duration::from_millis(5000));
150 if config.tls {
151 builder = builder.tls();
152 }
153 if config.cluster {
154 builder = builder.cluster();
155 }
156 let client = builder.build()
157 .await
158 .map_err(|e| SdkError::ValkeyContext { source: e, context: "failed to connect".into() })?;
159
160 // Verify connectivity
161 let pong: String = client
162 .cmd("PING")
163 .execute()
164 .await
165 .map_err(|e| SdkError::ValkeyContext { source: e, context: "PING failed".into() })?;
166 if pong != "PONG" {
167 return Err(SdkError::Config {
168 context: "worker_connect".into(),
169 field: None,
170 message: format!("unexpected PING response: {pong}"),
171 });
172 }
173
174 // Guard against two worker processes sharing the same
175 // `worker_instance_id`. A duplicate instance would clobber each
176 // other's lease_current/active_index entries and double-claim work.
177 // SET NX on a liveness key with 2× lease TTL; if the key already
178 // exists another process is live. The key auto-expires if this
179 // process crashes without renewal, so a restart after a hard crash
180 // just waits at most 2× lease_ttl_ms for the ghost entry to clear.
181 //
182 // Known limitations of this minimal scheme (documented for operators):
183 // 1. **Startup-only, not runtime.** There is no heartbeat renewal
184 // path. After `2 × lease_ttl_ms` elapses the alive key expires
185 // naturally even while this worker is still running, and a
186 // second process with the same `worker_instance_id` launched
187 // later will successfully SET NX alongside the first. The check
188 // catches misconfiguration at boot; it does not fence duplicates
189 // that appear mid-lifetime. Production deployments should rely
190 // on the orchestrator (Kubernetes, systemd unit with
191 // `Restart=on-failure`, etc.) as the authoritative single-
192 // instance enforcer; this SET NX is belt-and-suspenders.
193 //
194 // 2. **Restart delay after a crash.** If a worker crashes
195 // ungracefully (SIGKILL, container OOM) and is restarted within
196 // `2 × lease_ttl_ms`, the alive key is still present and the
197 // new process exits with `SdkError::Config("duplicate
198 // worker_instance_id ...")`. Options for operators:
199 // - Wait `2 × lease_ttl_ms` (default 60s with the 30s TTL)
200 // before restarting.
201 // - Manually `DEL ff:worker:<instance_id>:alive` in Valkey to
202 // unblock the restart.
203 // - Use a fresh `worker_instance_id` for the restart (the
204 // orchestrator should already do this per-Pod).
205 //
206 // 3. **No graceful cleanup on shutdown.** There is no explicit
207 // `disconnect()` call that DELs the alive key. On clean
208 // `SIGTERM` the key lingers until its TTL expires. A follow-up
209 // can add `FlowFabricWorker::disconnect(self)` for callers that
210 // want to skip the restart-delay window.
211 let alive_key = format!("ff:worker:{}:alive", config.worker_instance_id);
212 let alive_ttl_ms = (config.lease_ttl_ms.saturating_mul(2)).max(1_000);
213 let set_result: Option<String> = client
214 .cmd("SET")
215 .arg(&alive_key)
216 .arg("1")
217 .arg("NX")
218 .arg("PX")
219 .arg(alive_ttl_ms.to_string().as_str())
220 .execute()
221 .await
222 .map_err(|e| SdkError::ValkeyContext {
223 source: e,
224 context: "SET NX worker alive key".into(),
225 })?;
226 if set_result.is_none() {
227 return Err(SdkError::Config {
228 context: "worker_connect".into(),
229 field: Some("worker_instance_id".into()),
230 message: format!(
231 "duplicate worker_instance_id '{}': another process already holds {alive_key}",
232 config.worker_instance_id
233 ),
234 });
235 }
236
237 // Read partition config from Valkey (set by ff-server on startup).
238 // Falls back to defaults if key doesn't exist (e.g. SDK-only testing).
239 let partition_config = read_partition_config(&client).await
240 .unwrap_or_else(|e| {
241 tracing::warn!(
242 error = %e,
243 "ff:config:partitions not found, using defaults"
244 );
245 PartitionConfig::default()
246 });
247
248 let max_tasks = config.max_concurrent_tasks.max(1);
249 let concurrency_semaphore = Arc::new(Semaphore::new(max_tasks));
250
251 tracing::info!(
252 worker_id = %config.worker_id,
253 instance_id = %config.worker_instance_id,
254 lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
255 "FlowFabricWorker connected"
256 );
257
258 #[cfg(feature = "direct-valkey-claim")]
259 let scan_cursor_init = scan_cursor_seed(
260 config.worker_instance_id.as_str(),
261 partition_config.num_flow_partitions.max(1) as usize,
262 );
263
264 // Sort + dedupe capabilities into a stable CSV. BTreeSet both sorts
265 // and deduplicates in one pass; string joining happens once here.
266 //
267 // Ingress validation mirrors Scheduler::claim_for_worker (ff-scheduler):
268 // - `,` is the CSV delimiter; a token containing one would split
269 // mid-parse and could let a {"gpu"} worker appear to satisfy
270 // {"gpu,cuda"} (silent auth bypass).
271 // - Empty strings would produce leading/adjacent commas on the
272 // wire and inflate token count for no semantic reason.
        // - Non-printable / whitespace chars: `"gpu "` vs `"gpu"` or
        //   `"gpu\n"` vs `"gpu"` produce silent mismatches that are
        //   miserable to debug. Reject whitespace and control characters
        //   at ingress so a typo fails loudly at connect instead of
        //   silently mis-routing forever (printable non-ASCII is allowed;
        //   see the per-token check below).
278 // Reject at boot so operator misconfig is loud, symmetric with the
279 // scheduler path.
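        //
        // Illustrative examples of these rules: "gpu" and "东京-gpu" are
        // accepted; "", "gpu,cuda", "gpu " and "gpu\n" are rejected at
        // connect.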
280 #[cfg(feature = "direct-valkey-claim")]
281 for cap in &config.capabilities {
282 if cap.is_empty() {
283 return Err(SdkError::Config {
284 context: "worker_config".into(),
285 field: Some("capabilities".into()),
286 message: "capability token must not be empty".into(),
287 });
288 }
289 if cap.contains(',') {
290 return Err(SdkError::Config {
291 context: "worker_config".into(),
292 field: Some("capabilities".into()),
293 message: format!(
294 "capability token may not contain ',' (CSV delimiter): {cap:?}"
295 ),
296 });
297 }
            // Reject control characters and whitespace — `char::is_control`
            // and `char::is_whitespace` cover both ASCII (0x00-0x1F, 0x7F,
            // space, tab, LF, CR) and their Unicode equivalents. Printable UTF-8
300 // characters above 0x7F are ALLOWED so i18n caps like
301 // "东京-gpu" can be used. The CSV wire form is byte-safe for
302 // multibyte UTF-8 because `,` is always a single byte and
303 // never part of a multibyte continuation (only 0x80-0xBF are
304 // continuations, ',' is 0x2C).
305 if cap.chars().any(|c| c.is_control() || c.is_whitespace()) {
306 return Err(SdkError::Config {
307 context: "worker_config".into(),
308 field: Some("capabilities".into()),
309 message: format!(
310 "capability token must not contain whitespace or control \
311 characters: {cap:?}"
312 ),
313 });
314 }
315 }
316 #[cfg(feature = "direct-valkey-claim")]
317 let worker_capabilities_csv: String = {
318 let set: std::collections::BTreeSet<&str> = config
319 .capabilities
320 .iter()
321 .map(|s| s.as_str())
322 .filter(|s| !s.is_empty())
323 .collect();
324 if set.len() > ff_core::policy::CAPS_MAX_TOKENS {
325 return Err(SdkError::Config {
326 context: "worker_config".into(),
327 field: Some("capabilities".into()),
328 message: format!(
329 "capability set exceeds CAPS_MAX_TOKENS ({}): {}",
330 ff_core::policy::CAPS_MAX_TOKENS,
331 set.len()
332 ),
333 });
334 }
335 let csv = set.into_iter().collect::<Vec<_>>().join(",");
336 if csv.len() > ff_core::policy::CAPS_MAX_BYTES {
337 return Err(SdkError::Config {
338 context: "worker_config".into(),
339 field: Some("capabilities".into()),
340 message: format!(
341 "capability CSV exceeds CAPS_MAX_BYTES ({}): {}",
342 ff_core::policy::CAPS_MAX_BYTES,
343 csv.len()
344 ),
345 });
346 }
347 csv
348 };
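        // For example, capabilities ["gpu", "cuda", "gpu"] normalize to the
        // stable CSV "cuda,gpu": the BTreeSet sorts lexicographically and
        // deduplicates before the single join.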
349
350 // Short stable digest of the sorted caps CSV, computed once so
351 // per-mismatch logs carry a stable identifier instead of the 4KB
352 // CSV. Shared helper — ff-scheduler uses the same one for its
353 // own per-mismatch logs, so cross-component log lines are
354 // diffable against each other.
355 #[cfg(feature = "direct-valkey-claim")]
356 let worker_capabilities_hash = ff_core::hash::fnv1a_xor8hex(&worker_capabilities_csv);
357
358 // Full CSV logged once at connect so per-mismatch logs (which
359 // carry only the 8-hex hash) can be cross-referenced by ops.
360 #[cfg(feature = "direct-valkey-claim")]
361 if !worker_capabilities_csv.is_empty() {
362 tracing::info!(
363 worker_instance_id = %config.worker_instance_id,
364 worker_caps_hash = %worker_capabilities_hash,
365 worker_caps = %worker_capabilities_csv,
366 "worker connected with capabilities (full CSV — mismatch logs use hash only)"
367 );
368 }
369
370 // Non-authoritative advertisement of caps for operator visibility
371 // (CLI introspection, dashboards). The AUTHORITATIVE source for
372 // scheduling decisions is ARGV[9] on each claim — Lua reads ONLY
        // that, never this string. A lost or stale write here is correctness-safe.
374 //
375 // Storage: a single STRING key holding the sorted CSV. Rationale:
376 // * **Atomic overwrite.** `SET` is a single command — a concurrent
377 // reader can never observe a transient empty value (the prior
378 // DEL+SADD pair had that window).
379 // * **Crash cleanup without refresh loop.** The alive-key SET NX
380 // is startup-only (see §1 above); there's no periodic renew
381 // to piggy-back on, so a TTL on caps would independently
382 // expire mid-flight and hide a live worker's caps from ops
383 // tools. Instead we drop the TTL: each reconnect overwrites;
384 // a crashed worker leaves a stale CSV until a new process
385 // with the same worker_instance_id boots (which triggers
386 // `duplicate worker_instance_id` via alive-key guard anyway —
387 // the orchestrator allocates a new id, and operators can DEL
388 // the stale caps key if they care).
389 // * **Empty caps = DEL.** A restart from {gpu} to {} clears the
390 // advertisement rather than leaving stale data.
391 // Cluster-safe advertisement: the per-worker caps STRING lives at
392 // `ff:worker:{id}:caps` (lands on whatever slot CRC16 puts it on),
393 // and the INSTANCE ID is SADD'd to the global workers-index SET
394 // `ff:idx:workers` (single slot). The unblock scanner's cluster
395 // enumeration uses SMEMBERS on the index + per-member GET on each
396 // caps key, instead of `SCAN MATCH ff:worker:*:caps` (which in
397 // cluster mode only scans the shard the SCAN lands on and misses
398 // workers whose key hashes elsewhere). Pattern mirrors Batch A
399 // `budget_policies_index` / `flow_index` / `deps_all_edges`:
400 // operations stay atomic per command, the index is the
401 // cluster-wide enumeration surface.
402 #[cfg(feature = "direct-valkey-claim")]
403 {
404 let caps_key = ff_core::keys::worker_caps_key(&config.worker_instance_id);
405 let index_key = ff_core::keys::workers_index_key();
406 let instance_id = config.worker_instance_id.to_string();
407 if worker_capabilities_csv.is_empty() {
408 // No caps advertised. DEL the per-worker caps string AND
409 // SREM from the index so the scanner doesn't GET an empty
410 // string for a worker that never declares caps.
411 let _ = client
412 .cmd("DEL")
413 .arg(&caps_key)
414 .execute::<Option<i64>>()
415 .await;
416 if let Err(e) = client
417 .cmd("SREM")
418 .arg(&index_key)
419 .arg(&instance_id)
420 .execute::<Option<i64>>()
421 .await
422 {
423 tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
424 "SREM workers-index failed; continuing (non-authoritative)");
425 }
426 } else {
427 // Atomic overwrite of the caps STRING (one-command). Then
428 // SADD to the index (idempotent — re-running connect for
429 // the same id is a no-op at SADD level). The per-worker
430 // caps key is written BEFORE the index SADD so that when
431 // the scanner observes the id in the index, the caps key
432 // is guaranteed to resolve to a non-stale CSV (the reverse
433 // order would leak an index entry pointing at a stale or
434 // empty caps key during a narrow window).
435 if let Err(e) = client
436 .cmd("SET")
437 .arg(&caps_key)
438 .arg(&worker_capabilities_csv)
439 .execute::<Option<String>>()
440 .await
441 {
442 tracing::warn!(error = %e, key = %caps_key,
443 "SET worker caps advertisement failed; continuing");
444 }
445 if let Err(e) = client
446 .cmd("SADD")
447 .arg(&index_key)
448 .arg(&instance_id)
449 .execute::<Option<i64>>()
450 .await
451 {
452 tracing::warn!(error = %e, key = %index_key, instance = %instance_id,
453 "SADD workers-index failed; continuing");
454 }
455 }
456 }
457
458 // RFC-012 Stage 1b: wrap the dialed client in a
459 // ValkeyBackend so `ClaimedTask`'s trait forwarders have
460 // something to call. `from_client_and_partitions` reuses the
461 // already-dialed client — no second connection.
462 // Share the concrete `Arc<ValkeyBackend>` across the two
463 // trait objects — one allocation, both accessors yield
464 // identity-equivalent handles.
465 let valkey_backend: Arc<ff_backend_valkey::ValkeyBackend> =
466 ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
467 client.clone(),
468 partition_config,
469 );
470 let backend: Arc<dyn ff_core::engine_backend::EngineBackend> = valkey_backend.clone();
471 let completion_backend_handle: Option<
472 Arc<dyn ff_core::completion_backend::CompletionBackend>,
473 > = Some(valkey_backend);
474
475 Ok(Self {
476 client,
477 config,
478 partition_config,
479 #[cfg(feature = "direct-valkey-claim")]
480 worker_capabilities_csv,
481 #[cfg(feature = "direct-valkey-claim")]
482 worker_capabilities_hash,
483 #[cfg(feature = "direct-valkey-claim")]
484 lane_index: AtomicUsize::new(0),
485 concurrency_semaphore,
486 #[cfg(feature = "direct-valkey-claim")]
487 scan_cursor: AtomicUsize::new(scan_cursor_init),
488 backend,
489 completion_backend_handle,
490 })
491 }
492
493 /// Store a pre-built [`EngineBackend`] on the worker. Builds
494 /// the worker via the legacy [`FlowFabricWorker::connect`] path
495 /// first (so the embedded `ferriskey::Client` that the Stage 1b
496 /// non-migrated hot paths still use is dialed), then replaces
497 /// the default `ValkeyBackend` wrapper with the caller-supplied
498 /// `Arc<dyn EngineBackend>`.
499 ///
500 /// **Stage 1b scope — what the injected backend covers today.**
501 /// After this PR, `ClaimedTask`'s 8 migrated ops
502 /// (`renew_lease` / `update_progress` / `resume_signals` /
503 /// `delay_execution` / `move_to_waiting_children` /
504 /// `complete` / `cancel` / `fail`) route through the injected
505 /// backend. That's the first material use of `connect_with`: a
506 /// mock backend now genuinely sees the worker's per-task
507 /// write-surface calls. The 4 trait-shape-deferred ops
508 /// (`create_pending_waitpoint`, `append_frame`, `suspend`,
509 /// `report_usage`) still reach the embedded
510 /// `ferriskey::Client` directly until issue #117's trait
511 /// amendment lands; `claim_next` / `claim_from_grant` /
512 /// `claim_from_reclaim_grant` / `deliver_signal` / admin queries
513 /// are Stage 1c hot-path work. Stage 1d removes the embedded
514 /// client entirely.
515 ///
516 /// Today's constructor is therefore NOT yet a drop-in way to swap
517 /// in a non-Valkey backend — it requires a reachable Valkey node
518 /// for the 4 deferred + hot-path ops. Tests that exercise only
519 /// the 8 migrated ops can run fully against a mock backend.
520 ///
521 /// [`EngineBackend`]: ff_core::engine_backend::EngineBackend
522 pub async fn connect_with(
523 config: WorkerConfig,
524 backend: Arc<dyn ff_core::engine_backend::EngineBackend>,
525 ) -> Result<Self, SdkError> {
526 let mut worker = Self::connect(config).await?;
527 worker.backend = backend;
528 // The caller-supplied trait object cannot be upcast to
529 // `CompletionBackend` without loss of identity; clear the
530 // default `ValkeyBackend`-derived handle so
531 // `completion_backend()` reports `None`, reflecting that
532 // the caller-supplied backend owns the surface now.
533 worker.completion_backend_handle = None;
534 Ok(worker)
535 }
536
537 /// Borrow the `EngineBackend` this worker forwards Stage-1b trait
538 /// ops through.
539 ///
540 /// **RFC-012 Stage 1b.** Always returns `Some(&self.backend)` —
541 /// the `Option` wrapper is retained for API stability with the
542 /// Stage-1a shape. Stage 1c narrows the return type to
543 /// `&Arc<dyn EngineBackend>`.
544 pub fn backend(&self) -> Option<&Arc<dyn ff_core::engine_backend::EngineBackend>> {
545 Some(&self.backend)
546 }
547
548 /// Handle to the completion-event subscription backend, for
549 /// consumers that need to observe execution completions (DAG
550 /// reconcilers, tenant-isolated subscribers).
551 ///
552 /// Returns `Some` when the worker was built through
553 /// [`Self::connect`] on the default `valkey-default` feature
554 /// (the bundled `ValkeyBackend` implements
555 /// [`CompletionBackend`](ff_core::completion_backend::CompletionBackend)).
556 /// Returns `None` when the worker was built via
557 /// [`Self::connect_with`] (the caller-supplied
558 /// `Arc<dyn EngineBackend>` cannot be re-upcast to
559 /// `CompletionBackend`), or for hypothetical future backends
560 /// that don't support push-based completion streams.
561 ///
562 /// The returned handle shares the same underlying allocation as
563 /// [`Self::backend`]; calls through it (e.g.
564 /// `subscribe_completions_filtered`) hit the same connection
565 /// the worker itself uses.
566 pub fn completion_backend(
567 &self,
568 ) -> Option<Arc<dyn ff_core::completion_backend::CompletionBackend>> {
569 self.completion_backend_handle.clone()
570 }
571
572 /// Get a reference to the underlying ferriskey client.
573 pub fn client(&self) -> &Client {
574 &self.client
575 }
576
577 /// Get the worker config.
578 pub fn config(&self) -> &WorkerConfig {
579 &self.config
580 }
581
582 /// Get the server-published partition config this worker bound to at
583 /// `connect()`. Callers need this when computing partition hash-tags
584 /// for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned
585 /// with the server's `num_flow_partitions` — using
586 /// `PartitionConfig::default()` assumes 256 partitions and silently
587 /// misses data on deployments with any other value.
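    ///
    /// A minimal sketch of deriving a partition the same way the worker does
    /// internally (`execution_id` is a placeholder value you already hold):
    ///
    /// ```rust,ignore
    /// let partition = ff_core::partition::execution_partition(
    ///     &execution_id,
    ///     worker.partition_config(),
    /// );
    /// ```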
588 pub fn partition_config(&self) -> &ff_core::partition::PartitionConfig {
589 &self.partition_config
590 }
591
592 /// Attempt to claim the next eligible execution.
593 ///
594 /// Phase 1 simplified claim flow:
595 /// 1. Pick a lane (round-robin across configured lanes)
596 /// 2. Issue a claim grant via `ff_issue_claim_grant` on the execution's partition
597 /// 3. Claim the execution via `ff_claim_execution`
598 /// 4. Read execution payload + tags
599 /// 5. Return a [`ClaimedTask`] with auto lease renewal
600 ///
601 /// Gated behind the `direct-valkey-claim` feature — bypasses the
602 /// scheduler's budget / quota / capability admission checks. Enable
603 /// with `ff-sdk = { ..., features = ["direct-valkey-claim"] }` when
604 /// the scheduler hop would be measurement noise (benches) or when
605 /// the test harness needs a deterministic worker-local path. Prefer
606 /// the scheduler-routed HTTP claim path in production.
607 ///
608 /// # `None` semantics
609 ///
610 /// `Ok(None)` means **no work was found in the partition window this
611 /// poll covered**, not "the cluster is idle". Each call scans a chunk
612 /// of [`PARTITION_SCAN_CHUNK`] partitions starting at the rolling
613 /// `scan_cursor`; the cursor advances by that chunk size on every
614 /// invocation, so a worker covers every partition exactly once every
615 /// `ceil(num_flow_partitions / PARTITION_SCAN_CHUNK)` polls.
616 ///
617 /// Callers should treat `None` as "poll again soon" (typically after
618 /// `config.claim_poll_interval_ms`) rather than "sleep for a long
619 /// time". Backing off too aggressively on `None` can starve workers
620 /// when work lives on partitions outside the current window.
621 ///
622 /// Returns `Err` on Valkey errors or script failures.
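    ///
    /// # Example
    ///
    /// A minimal poll loop following the `None` guidance above — a sketch
    /// that assumes an async context and the `claim_poll_interval_ms` field
    /// on the worker's config:
    ///
    /// ```rust,ignore
    /// loop {
    ///     match worker.claim_next().await? {
    ///         Some(task) => {
    ///             // Process, then complete — the concurrency permit is
    ///             // released automatically.
    ///             task.complete(None).await?;
    ///         }
    ///         None => {
    ///             // No work in this partition window: poll again soon
    ///             // rather than backing off for a long time.
    ///             tokio::time::sleep(Duration::from_millis(
    ///                 worker.config().claim_poll_interval_ms,
    ///             ))
    ///             .await;
    ///         }
    ///     }
    /// }
    /// ```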
623 #[cfg(feature = "direct-valkey-claim")]
624 pub async fn claim_next(&self) -> Result<Option<ClaimedTask>, SdkError> {
625 // Enforce max_concurrent_tasks: try to acquire a semaphore permit.
626 // try_acquire returns immediately — if no permits available, the worker
627 // is at capacity and should not claim more work.
628 let permit = match self.concurrency_semaphore.clone().try_acquire_owned() {
629 Ok(p) => p,
630 Err(_) => return Ok(None), // At capacity — no claim attempted
631 };
632
633 let lane_id = self.next_lane();
634 let now = TimestampMs::now();
635
636 // Phase 1: We scan eligible executions directly by reading the eligible
637 // ZSET across execution partitions. In production the scheduler
638 // (ff-scheduler) would handle this. For Phase 1, the SDK does a
639 // simplified inline claim.
640 //
641 // Chunked scan: each poll covers at most PARTITION_SCAN_CHUNK
642 // partitions starting at a rolling offset. This keeps idle Valkey
643 // load at O(chunk) per worker-second instead of O(num_partitions),
644 // and the worker-instance-seeded initial cursor spreads concurrent
645 // workers across different partition windows.
646 let num_partitions = self.partition_config.num_flow_partitions as usize;
647 if num_partitions == 0 {
648 return Ok(None);
649 }
650 let chunk = PARTITION_SCAN_CHUNK.min(num_partitions);
651 let start = self.scan_cursor.fetch_add(chunk, Ordering::Relaxed) % num_partitions;
652
653 for step in 0..chunk {
654 let partition_idx = ((start + step) % num_partitions) as u16;
655 let partition = ff_core::partition::Partition {
656 family: ff_core::partition::PartitionFamily::Execution,
657 index: partition_idx,
658 };
659 let idx = IndexKeys::new(&partition);
660 let eligible_key = idx.lane_eligible(&lane_id);
661
662 // ZRANGEBYSCORE to get the highest-priority eligible execution.
663 // Score format: -(priority * 1_000_000_000_000) + created_at_ms
664 // ZRANGEBYSCORE with "-inf" "+inf" LIMIT 0 1 gives lowest score = highest priority.
665 let result: Value = self
666 .client
667 .cmd("ZRANGEBYSCORE")
668 .arg(&eligible_key)
669 .arg("-inf")
670 .arg("+inf")
671 .arg("LIMIT")
672 .arg("0")
673 .arg("1")
674 .execute()
675 .await
676 .map_err(|e| SdkError::ValkeyContext { source: e, context: "ZRANGEBYSCORE failed".into() })?;
677
678 let execution_id_str = match extract_first_array_string(&result) {
679 Some(s) => s,
680 None => continue, // No eligible executions on this partition
681 };
682
683 let execution_id = ExecutionId::parse(&execution_id_str).map_err(|e| {
684 SdkError::from(ff_script::error::ScriptError::Parse {
685 fcall: "claim_execution_from_eligible_set".into(),
686 execution_id: None,
687 message: format!("bad execution_id in eligible set: {e}"),
688 })
689 })?;
690
691 // Step 1: Issue claim grant
692 let grant_result = self
693 .issue_claim_grant(&execution_id, &lane_id, &partition, &idx)
694 .await;
695
696 match grant_result {
697 Ok(()) => {}
698 Err(SdkError::Engine(ref boxed))
699 if matches!(
700 **boxed,
701 crate::EngineError::Validation {
702 kind: crate::ValidationKind::CapabilityMismatch,
703 ..
704 }
705 ) =>
706 {
707 let missing = match &**boxed {
708 crate::EngineError::Validation { detail, .. } => detail.clone(),
709 _ => unreachable!(),
710 };
711 // Block-on-mismatch (RFC-009 §7.5) — parity with
712 // ff-scheduler's Scheduler::claim_for_worker. Without
713 // this, the inline-direct-claim path would hot-loop
714 // on an unclaimable top-of-zset (every tick picks the
715 // same execution, wastes an FCALL, logs, releases,
716 // repeats). The scheduler-side unblock scanner
717 // promotes blocked_route executions back to eligible
718 // when a worker with matching caps registers.
719 tracing::info!(
720 execution_id = %execution_id,
721 worker_id = %self.config.worker_id,
722 worker_caps_hash = %self.worker_capabilities_hash,
723 missing = %missing,
724 "capability mismatch, blocking execution off eligible (SDK inline claim)"
725 );
726 self.block_route(&execution_id, &lane_id, &partition, &idx).await;
727 continue;
728 }
729 Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
730 tracing::debug!(
731 execution_id = %execution_id,
732 error = %e,
733 "claim grant failed (retryable), trying next partition"
734 );
735 continue;
736 }
737 Err(e) => return Err(e),
738 }
739
740 // Step 2: Claim the execution
741 match self
742 .claim_execution(&execution_id, &lane_id, &partition, now)
743 .await
744 {
745 Ok(mut task) => {
746 // Transfer concurrency permit to the task. When the task is
747 // completed/failed/cancelled/dropped the permit returns to
748 // the semaphore, allowing another claim.
749 task.set_concurrency_permit(permit);
750 return Ok(Some(task));
751 }
752 Err(SdkError::Engine(ref boxed))
753 if matches!(
754 **boxed,
755 crate::EngineError::Contention(
756 crate::ContentionKind::UseClaimResumedExecution
757 )
758 ) =>
759 {
760 // Execution was resumed from suspension — attempt_interrupted.
761 // ff_claim_execution rejects this; use ff_claim_resumed_execution
762 // which reuses the existing attempt instead of creating a new one.
763 tracing::debug!(
764 execution_id = %execution_id,
765 "execution is resumed, using claim_resumed path"
766 );
767 match self
768 .claim_resumed_execution(&execution_id, &lane_id, &partition)
769 .await
770 {
771 Ok(mut task) => {
772 task.set_concurrency_permit(permit);
773 return Ok(Some(task));
774 }
775 Err(SdkError::Engine(ref e2)) if is_retryable_claim_error(e2) => {
776 tracing::debug!(
777 execution_id = %execution_id,
778 error = %e2,
779 "claim_resumed failed (retryable), trying next partition"
780 );
781 continue;
782 }
783 Err(e2) => return Err(e2),
784 }
785 }
786 Err(SdkError::Engine(ref e)) if is_retryable_claim_error(e) => {
787 tracing::debug!(
788 execution_id = %execution_id,
789 error = %e,
790 "claim execution failed (retryable), trying next partition"
791 );
792 continue;
793 }
794 Err(e) => return Err(e),
795 }
796 }
797
798 // No eligible work found on any partition
799 Ok(None)
800 }
801
802 #[cfg(feature = "direct-valkey-claim")]
803 async fn issue_claim_grant(
804 &self,
805 execution_id: &ExecutionId,
806 lane_id: &LaneId,
807 partition: &ff_core::partition::Partition,
808 idx: &IndexKeys,
809 ) -> Result<(), SdkError> {
810 let ctx = ExecKeyContext::new(partition, execution_id);
811
812 // KEYS (3): exec_core, claim_grant_key, eligible_zset
813 let keys: Vec<String> = vec![
814 ctx.core(),
815 ctx.claim_grant(),
816 idx.lane_eligible(lane_id),
817 ];
818
819 // ARGV (9): eid, worker_id, worker_instance_id, lane_id,
820 // capability_hash, grant_ttl_ms, route_snapshot_json,
821 // admission_summary, worker_capabilities_csv (sorted)
822 let args: Vec<String> = vec![
823 execution_id.to_string(),
824 self.config.worker_id.to_string(),
825 self.config.worker_instance_id.to_string(),
826 lane_id.to_string(),
827 String::new(), // capability_hash
828 "5000".to_owned(), // grant_ttl_ms (5 seconds)
829 String::new(), // route_snapshot_json
830 String::new(), // admission_summary
831 self.worker_capabilities_csv.clone(), // sorted CSV
832 ];
833
834 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
835 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
836
837 let raw: Value = self
838 .client
839 .fcall("ff_issue_claim_grant", &key_refs, &arg_refs)
840 .await
841 .map_err(SdkError::Valkey)?;
842
843 crate::task::parse_success_result(&raw, "ff_issue_claim_grant")
844 }
845
846 /// Move an execution from the lane's eligible ZSET into its
847 /// blocked_route ZSET via `ff_block_execution_for_admission`. Called
848 /// after a `CapabilityMismatch` reject — without this, the inline
849 /// direct-claim path would re-pick the same top-of-zset every tick
850 /// (same pattern the scheduler's block_candidate handles). The
851 /// engine's unblock scanner periodically promotes blocked_route
852 /// back to eligible once a worker with matching caps registers.
853 ///
854 /// Best-effort: transport or logical rejects (e.g. the execution
855 /// already went terminal between pick and block) are logged and the
856 /// outer loop simply `continue`s to the next partition. Parity with
857 /// ff-scheduler::Scheduler::block_candidate.
858 #[cfg(feature = "direct-valkey-claim")]
859 async fn block_route(
860 &self,
861 execution_id: &ExecutionId,
862 lane_id: &LaneId,
863 partition: &ff_core::partition::Partition,
864 idx: &IndexKeys,
865 ) {
866 let ctx = ExecKeyContext::new(partition, execution_id);
867 let core_key = ctx.core();
868 let eligible_key = idx.lane_eligible(lane_id);
869 let blocked_key = idx.lane_blocked_route(lane_id);
870 let eid_s = execution_id.to_string();
871 let now_ms = TimestampMs::now().0.to_string();
872
873 let keys: [&str; 3] = [&core_key, &eligible_key, &blocked_key];
874 let argv: [&str; 4] = [
875 &eid_s,
876 "waiting_for_capable_worker",
877 "no connected worker satisfies required_capabilities",
878 &now_ms,
879 ];
880
881 match self
882 .client
883 .fcall::<Value>("ff_block_execution_for_admission", &keys, &argv)
884 .await
885 {
886 Ok(v) => {
887 // Parse Lua result so a logical reject (e.g. execution
888 // went terminal mid-flight) is visible — same fix we
889 // applied to ff-scheduler's block_candidate.
890 if let Err(e) = crate::task::parse_success_result(&v, "ff_block_execution_for_admission") {
891 tracing::warn!(
892 execution_id = %execution_id,
893 error = %e,
894 "SDK block_route: Lua rejected; eligible ZSET unchanged, next poll \
895 will re-evaluate"
896 );
897 }
898 }
899 Err(e) => {
900 tracing::warn!(
901 execution_id = %execution_id,
902 error = %e,
903 "SDK block_route: transport failure; eligible ZSET unchanged"
904 );
905 }
906 }
907 }
908
909 /// Low-level claim of a granted execution. Invokes
910 /// `ff_claim_execution` and returns a `ClaimedTask` with auto
911 /// lease renewal.
912 ///
913 /// Previously gated behind `direct-valkey-claim`; ungated so
914 /// the public [`claim_from_grant`] entry point can reuse the
915 /// same FCALL plumbing. The method stays private — external
916 /// callers use `claim_from_grant`.
917 ///
918 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
919 async fn claim_execution(
920 &self,
921 execution_id: &ExecutionId,
922 lane_id: &LaneId,
923 partition: &ff_core::partition::Partition,
924 _now: TimestampMs,
925 ) -> Result<ClaimedTask, SdkError> {
926 let ctx = ExecKeyContext::new(partition, execution_id);
927 let idx = IndexKeys::new(partition);
928
929 // Pre-read total_attempt_count from exec_core to derive next attempt index.
930 // The Lua uses total_attempt_count as the new index and dynamically builds
931 // the attempt key from the hash tag, so KEYS[6-8] are placeholders, but
932 // we pass the correct index for documentation/debugging.
933 let total_str: Option<String> = self.client
934 .cmd("HGET")
935 .arg(ctx.core())
936 .arg("total_attempt_count")
937 .execute()
938 .await
939 .unwrap_or(None);
940 let next_idx = total_str
941 .as_deref()
942 .and_then(|s| s.parse::<u32>().ok())
943 .unwrap_or(0);
944 let att_idx = AttemptIndex::new(next_idx);
945
946 let lease_id = LeaseId::new().to_string();
947 let attempt_id = AttemptId::new().to_string();
948 let renew_before_ms = self.config.lease_ttl_ms * 2 / 3;
949
950 // KEYS (14): must match lua/execution.lua ff_claim_execution positional order
951 let keys: Vec<String> = vec![
952 ctx.core(), // 1 exec_core
953 ctx.claim_grant(), // 2 claim_grant
954 idx.lane_eligible(lane_id), // 3 eligible_zset
955 idx.lease_expiry(), // 4 lease_expiry_zset
956 idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
957 ctx.attempt_hash(att_idx), // 6 attempt_hash (placeholder)
958 ctx.attempt_usage(att_idx), // 7 attempt_usage (placeholder)
959 ctx.attempt_policy(att_idx), // 8 attempt_policy (placeholder)
960 ctx.attempts(), // 9 attempts_zset
961 ctx.lease_current(), // 10 lease_current
962 ctx.lease_history(), // 11 lease_history
963 idx.lane_active(lane_id), // 12 active_index
964 idx.attempt_timeout(), // 13 attempt_timeout_zset
965 idx.execution_deadline(), // 14 execution_deadline_zset
966 ];
967
968 // ARGV (12): must match lua/execution.lua ff_claim_execution positional order
969 let args: Vec<String> = vec![
970 execution_id.to_string(), // 1 execution_id
971 self.config.worker_id.to_string(), // 2 worker_id
972 self.config.worker_instance_id.to_string(), // 3 worker_instance_id
973 lane_id.to_string(), // 4 lane
974 String::new(), // 5 capability_hash
975 lease_id.clone(), // 6 lease_id
976 self.config.lease_ttl_ms.to_string(), // 7 lease_ttl_ms
977 renew_before_ms.to_string(), // 8 renew_before_ms
978 attempt_id.clone(), // 9 attempt_id
979 "{}".to_owned(), // 10 attempt_policy_json
980 String::new(), // 11 attempt_timeout_ms
981 String::new(), // 12 execution_deadline_at
982 ];
983
984 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
985 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
986
987 let raw: Value = self
988 .client
989 .fcall("ff_claim_execution", &key_refs, &arg_refs)
990 .await
991 .map_err(SdkError::Valkey)?;
992
        // Parse claim result: {1, "OK", lease_id, lease_epoch, expires_at,
        // attempt_id, attempt_index, attempt_type}
995 let arr = match &raw {
996 Value::Array(arr) => arr,
997 _ => {
998 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
999 fcall: "ff_claim_execution".into(),
1000 execution_id: Some(execution_id.to_string()),
1001 message: "expected Array".into(),
1002 }));
1003 }
1004 };
1005
1006 let status_code = match arr.first() {
1007 Some(Ok(Value::Int(n))) => *n,
1008 _ => {
1009 return Err(SdkError::from(ff_script::error::ScriptError::Parse {
1010 fcall: "ff_claim_execution".into(),
1011 execution_id: Some(execution_id.to_string()),
1012 message: "bad status code".into(),
1013 }));
1014 }
1015 };
1016
1017 if status_code != 1 {
1018 let err_field_str = |idx: usize| -> String {
1019 arr.get(idx)
1020 .and_then(|v| match v {
1021 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1022 Ok(Value::SimpleString(s)) => Some(s.clone()),
1023 _ => None,
1024 })
1025 .unwrap_or_default()
1026 };
1027 let error_code = {
1028 let s = err_field_str(1);
1029 if s.is_empty() { "unknown".to_owned() } else { s }
1030 };
1031 let detail = err_field_str(2);
1032
1033 return Err(SdkError::from(
1034 ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
1035 .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
1036 fcall: "ff_claim_execution".into(),
1037 execution_id: Some(execution_id.to_string()),
1038 message: format!("unknown error: {error_code}"),
1039 }),
1040 ));
1041 }
1042
1043 // Extract fields from success response
1044 let field_str = |idx: usize| -> String {
1045 arr.get(idx + 2) // skip status_code and "OK"
1046 .and_then(|v| match v {
1047 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1048 Ok(Value::SimpleString(s)) => Some(s.clone()),
1049 Ok(Value::Int(n)) => Some(n.to_string()),
1050 _ => None,
1051 })
1052 .unwrap_or_default()
1053 };
1054
        // Lua returns: ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
        // i.e. positions 0..=5 after the leading {1, "OK"} pair.
1057 let lease_id = LeaseId::parse(&field_str(0))
1058 .unwrap_or_else(|_| LeaseId::new());
1059 let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
1060 // field_str(2) is expires_at — skip it (lease timing managed by renewal)
1061 let attempt_id = AttemptId::parse(&field_str(3))
1062 .unwrap_or_else(|_| AttemptId::new());
1063 let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
1064
1065 // Read execution payload and metadata
1066 let (input_payload, execution_kind, tags) = self
1067 .read_execution_context(execution_id, partition)
1068 .await?;
1069
1070 Ok(ClaimedTask::new(
1071 self.client.clone(),
1072 self.backend.clone(),
1073 self.partition_config,
1074 execution_id.clone(),
1075 attempt_index,
1076 attempt_id,
1077 lease_id,
1078 lease_epoch,
1079 self.config.lease_ttl_ms,
1080 lane_id.clone(),
1081 self.config.worker_instance_id.clone(),
1082 input_payload,
1083 execution_kind,
1084 tags,
1085 ))
1086 }
1087
1088 /// Consume a [`ClaimGrant`] and claim the granted execution on
1089 /// this worker. The intended production entry point: pair with
1090 /// [`ff_scheduler::Scheduler::claim_for_worker`] to flow
1091 /// scheduler-issued grants into the SDK without enabling the
1092 /// `direct-valkey-claim` feature (which bypasses budget/quota
1093 /// admission control).
1094 ///
1095 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1096 /// so a saturated worker does not consume the grant: the grant
1097 /// stays valid for its remaining TTL and the caller can either
1098 /// release it back to the scheduler or retry after some other
1099 /// in-flight task completes.
1100 ///
1101 /// On success the returned [`ClaimedTask`] holds a concurrency
1102 /// permit that releases automatically on
1103 /// `complete`/`fail`/`cancel`/drop — same contract as
1104 /// `claim_next`.
1105 ///
1106 /// # Arguments
1107 ///
1108 /// * `lane` — the lane the grant was issued for. Must match what
1109 /// was passed to `Scheduler::claim_for_worker`; the Lua FCALL
1110 /// uses it to look up `lane_eligible`, `lane_active`, and the
1111 /// `worker_leases` index slot.
1112 /// * `grant` — the [`ClaimGrant`] returned by the scheduler.
1113 ///
1114 /// # Errors
1115 ///
1116 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1117 /// permits all held. Retryable; the grant is untouched.
1118 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1119 /// or `worker_id` mismatch (wrapped in [`SdkError::Engine`]).
1120 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed
1121 /// (wrapped in [`SdkError::Engine`]).
1122 /// * `ScriptError::CapabilityMismatch` — execution's required
1123 /// capabilities not a subset of this worker's caps (wrapped in
1124 /// [`SdkError::Engine`]). Surfaced post-grant if a race
1125 /// between grant issuance and caps change allows it.
1126 /// * `ScriptError::Parse` — `ff_claim_execution` returned an
1127 /// unexpected shape (wrapped in [`SdkError::Engine`]).
1128 /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1129 /// transport error during the FCALL or the
1130 /// `read_execution_context` follow-up.
1131 ///
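    /// # Example
    ///
    /// A sketch of the scheduler-paired flow; `grant` stands in for whatever
    /// `Scheduler::claim_for_worker` returned out of band:
    ///
    /// ```rust,ignore
    /// let lane = LaneId::new("default".to_owned());
    /// match worker.claim_from_grant(lane, grant).await {
    ///     Ok(task) => {
    ///         // Process, then complete — the permit releases on completion.
    ///         task.complete(Some(b"result".to_vec())).await?;
    ///     }
    ///     Err(SdkError::WorkerAtCapacity) => {
    ///         // Grant untouched — retry after an in-flight task finishes,
    ///         // or release it back to the scheduler.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```
    ///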
1132 /// [`ClaimGrant`]: ff_core::contracts::ClaimGrant
1133 /// [`ff_scheduler::Scheduler::claim_for_worker`]: https://docs.rs/ff-scheduler
1134 pub async fn claim_from_grant(
1135 &self,
1136 lane: LaneId,
1137 grant: ff_core::contracts::ClaimGrant,
1138 ) -> Result<ClaimedTask, SdkError> {
1139 // Semaphore check FIRST. If the worker is saturated we must
1140 // surface the condition to the caller without touching the
1141 // grant — silently returning Ok(None) (as claim_next does)
1142 // would drop a grant the scheduler has already committed work
1143 // to issuing, wasting the slot until its TTL elapses.
1144 let permit = self
1145 .concurrency_semaphore
1146 .clone()
1147 .try_acquire_owned()
1148 .map_err(|_| SdkError::WorkerAtCapacity)?;
1149
1150 let now = TimestampMs::now();
1151 let partition = grant.partition().map_err(|e| SdkError::Config {
1152 context: "claim_from_grant".to_owned(),
1153 field: Some("partition_key".to_owned()),
1154 message: e.to_string(),
1155 })?;
1156 let mut task = self
1157 .claim_execution(&grant.execution_id, &lane, &partition, now)
1158 .await?;
1159 task.set_concurrency_permit(permit);
1160 Ok(task)
1161 }
1162
1163 /// Scheduler-routed claim: POST the server's
1164 /// `/v1/workers/{id}/claim`, then chain to
1165 /// [`Self::claim_from_grant`].
1166 ///
1167 /// Batch C item 2 PR-B. This is the production entry point —
1168 /// budget + quota + capability admission run server-side inside
1169 /// `ff_scheduler::Scheduler::claim_for_worker`. Callers don't
1170 /// enable the `direct-valkey-claim` feature.
1171 ///
1172 /// Returns `Ok(None)` when the server says no eligible execution
1173 /// (HTTP 204). Callers typically back off by
1174 /// `config.claim_poll_interval_ms` and try again, same cadence
1175 /// as the direct-claim path's `Ok(None)`.
1176 ///
1177 /// The `admin` client is the established HTTP surface
1178 /// (`FlowFabricAdminClient`) reused here so workers don't keep a
1179 /// second reqwest client around. Build once at worker boot and
1180 /// hand in by reference on every claim.
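    ///
    /// A sketch of the server-routed loop; how the `admin`
    /// `FlowFabricAdminClient` is constructed depends on your deployment,
    /// so it is assumed to already exist here:
    ///
    /// ```rust,ignore
    /// let lane = LaneId::new("default".to_owned());
    /// loop {
    ///     match worker.claim_via_server(&admin, &lane, 5_000).await? {
    ///         Some(task) => task.complete(None).await?,
    ///         None => {
    ///             // HTTP 204 — nothing eligible right now.
    ///             tokio::time::sleep(Duration::from_millis(
    ///                 worker.config().claim_poll_interval_ms,
    ///             ))
    ///             .await;
    ///         }
    ///     }
    /// }
    /// ```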
1181 pub async fn claim_via_server(
1182 &self,
1183 admin: &crate::FlowFabricAdminClient,
1184 lane: &LaneId,
1185 grant_ttl_ms: u64,
1186 ) -> Result<Option<ClaimedTask>, SdkError> {
1187 let req = crate::admin::ClaimForWorkerRequest {
1188 worker_id: self.config.worker_id.to_string(),
1189 lane_id: lane.to_string(),
1190 worker_instance_id: self.config.worker_instance_id.to_string(),
1191 capabilities: self.config.capabilities.clone(),
1192 grant_ttl_ms,
1193 };
1194 let Some(resp) = admin.claim_for_worker(req).await? else {
1195 return Ok(None);
1196 };
1197 let grant = resp.into_grant()?;
1198 self.claim_from_grant(lane.clone(), grant).await.map(Some)
1199 }
1200
1201 /// Consume a [`ReclaimGrant`] and transition the granted
1202 /// `attempt_interrupted` execution into a `started` state on this
1203 /// worker. Symmetric partner to [`claim_from_grant`] for the
1204 /// resume path.
1205 ///
1206 /// The grant must have been issued to THIS worker (matching
1207 /// `worker_id` at grant time). A mismatch returns
1208 /// `Err(Script(InvalidClaimGrant))`. The grant is consumed
1209 /// atomically by `ff_claim_resumed_execution`; a second call with
1210 /// the same grant also returns `InvalidClaimGrant`.
1211 ///
1212 /// # Concurrency
1213 ///
1214 /// The worker's concurrency semaphore is checked BEFORE the FCALL
1215 /// (same contract as [`claim_from_grant`]). Reclaim does NOT
1216 /// assume pre-existing capacity on this worker — a reclaim can
1217 /// land on a fresh worker instance that just came up after a
1218 /// crash/restart and is picking up a previously-interrupted
1219 /// execution. If the worker is saturated, the grant stays valid
1220 /// for its remaining TTL and the caller can release it or retry.
1221 ///
1222 /// On success the returned [`ClaimedTask`] holds a concurrency
1223 /// permit that releases automatically on
1224 /// `complete`/`fail`/`cancel`/drop.
1225 ///
1226 /// # Errors
1227 ///
1228 /// * [`SdkError::WorkerAtCapacity`] — `max_concurrent_tasks`
1229 /// permits all held. Retryable; the grant is untouched (no
1230 /// FCALL was issued, so `ff_claim_resumed_execution` did not
1231 /// atomically consume the grant key).
1232 /// * `ScriptError::InvalidClaimGrant` — grant missing, consumed,
1233 /// or `worker_id` mismatch.
1234 /// * `ScriptError::ClaimGrantExpired` — grant TTL elapsed.
1235 /// * `ScriptError::NotAResumedExecution` — `attempt_state` is not
1236 /// `attempt_interrupted`.
1237 /// * `ScriptError::ExecutionNotLeaseable` — `lifecycle_phase` is
1238 /// not `runnable`.
1239 /// * `ScriptError::ExecutionNotFound` — core key missing.
1240 /// * [`SdkError::Valkey`] / [`SdkError::ValkeyContext`] —
1241 /// transport.
1242 ///
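    /// # Example
    ///
    /// A sketch that assumes a `grant: ReclaimGrant` was delivered to this
    /// worker out of band (the grant itself carries partition and lane):
    ///
    /// ```rust,ignore
    /// match worker.claim_from_reclaim_grant(grant).await {
    ///     Ok(task) => {
    ///         // Resumed attempt — same completion contract as claim_from_grant.
    ///         task.complete(None).await?;
    ///     }
    ///     Err(SdkError::WorkerAtCapacity) => {
    ///         // No FCALL was issued, so the grant is still valid for its TTL.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```
    ///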
1243 /// [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
1244 /// [`claim_from_grant`]: FlowFabricWorker::claim_from_grant
1245 pub async fn claim_from_reclaim_grant(
1246 &self,
1247 grant: ff_core::contracts::ReclaimGrant,
1248 ) -> Result<ClaimedTask, SdkError> {
1249 // Semaphore check FIRST — same load-bearing ordering as
1250 // `claim_from_grant`. If the worker is saturated, surface
1251 // WorkerAtCapacity without firing the FCALL; the FCALL is an
1252 // atomic consume on the grant key, so calling it past-
1253 // saturation would destroy the grant while leaving no
1254 // permit to attach to the returned `ClaimedTask`.
1255 let permit = self
1256 .concurrency_semaphore
1257 .clone()
1258 .try_acquire_owned()
1259 .map_err(|_| SdkError::WorkerAtCapacity)?;
1260
1261 // Grant carries partition + lane_id so no round-trip is needed
1262 // to resolve them before the FCALL.
1263 let partition = grant.partition().map_err(|e| SdkError::Config {
1264 context: "claim_from_reclaim_grant".to_owned(),
1265 field: Some("partition_key".to_owned()),
1266 message: e.to_string(),
1267 })?;
1268 let mut task = self
1269 .claim_resumed_execution(
1270 &grant.execution_id,
1271 &grant.lane_id,
1272 &partition,
1273 )
1274 .await?;
1275 task.set_concurrency_permit(permit);
1276 Ok(task)
1277 }
1278
1279 /// Low-level resume claim. Invokes `ff_claim_resumed_execution`
1280 /// and returns a `ClaimedTask` bound to the resumed attempt.
1281 ///
1282 /// Previously gated behind `direct-valkey-claim`; ungated so the
1283 /// public [`claim_from_reclaim_grant`] entry point can reuse it.
1284 /// The method stays private — external callers use
1285 /// `claim_from_reclaim_grant`.
1286 ///
1287 /// [`claim_from_reclaim_grant`]: FlowFabricWorker::claim_from_reclaim_grant
1288 async fn claim_resumed_execution(
1289 &self,
1290 execution_id: &ExecutionId,
1291 lane_id: &LaneId,
1292 partition: &ff_core::partition::Partition,
1293 ) -> Result<ClaimedTask, SdkError> {
1294 let ctx = ExecKeyContext::new(partition, execution_id);
1295 let idx = IndexKeys::new(partition);
1296
1297 // Pre-read current_attempt_index for the existing attempt hash key.
1298 // This is load-bearing: KEYS[6] must point to the real attempt hash.
1299 let att_idx_str: Option<String> = self.client
1300 .cmd("HGET")
1301 .arg(ctx.core())
1302 .arg("current_attempt_index")
1303 .execute()
1304 .await
1305 .map_err(|e| SdkError::ValkeyContext { source: e, context: "read attempt_index".into() })?;
1306 let att_idx = AttemptIndex::new(
1307 att_idx_str.as_deref().and_then(|s| s.parse().ok()).unwrap_or(0),
1308 );
1309
1310 let lease_id = LeaseId::new().to_string();
1311
1312 // KEYS (11): must match lua/signal.lua ff_claim_resumed_execution
1313 let keys: Vec<String> = vec![
1314 ctx.core(), // 1 exec_core
1315 ctx.claim_grant(), // 2 claim_grant
1316 idx.lane_eligible(lane_id), // 3 eligible_zset
1317 idx.lease_expiry(), // 4 lease_expiry_zset
1318 idx.worker_leases(&self.config.worker_instance_id), // 5 worker_leases
1319 ctx.attempt_hash(att_idx), // 6 existing_attempt_hash
1320 ctx.lease_current(), // 7 lease_current
1321 ctx.lease_history(), // 8 lease_history
1322 idx.lane_active(lane_id), // 9 active_index
1323 idx.attempt_timeout(), // 10 attempt_timeout_zset
1324 idx.execution_deadline(), // 11 execution_deadline_zset
1325 ];
1326
        // ARGV (8): must match lua/signal.lua ff_claim_resumed_execution
        let args: Vec<String> = vec![
            execution_id.to_string(),                   // 1 execution_id
            self.config.worker_id.to_string(),          // 2 worker_id
            self.config.worker_instance_id.to_string(), // 3 worker_instance_id
            lane_id.to_string(),                        // 4 lane
            String::new(),                              // 5 capability_hash
            lease_id.clone(),                           // 6 lease_id
            self.config.lease_ttl_ms.to_string(),       // 7 lease_ttl_ms
            String::new(),                              // 8 remaining_attempt_timeout_ms
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_claim_resumed_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Parse result — same format as ff_claim_execution:
        // {1, "OK", lease_id, lease_epoch, expires_at, attempt_id, attempt_index, attempt_type}
        let arr = match &raw {
            Value::Array(arr) => arr,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_resumed_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "expected Array".into(),
                }));
            }
        };

        let status_code = match arr.first() {
            Some(Ok(Value::Int(n))) => *n,
            _ => {
                return Err(SdkError::from(ff_script::error::ScriptError::Parse {
                    fcall: "ff_claim_resumed_execution".into(),
                    execution_id: Some(execution_id.to_string()),
                    message: "bad status code".into(),
                }));
            }
        };

        if status_code != 1 {
            let err_field_str = |idx: usize| -> String {
                arr.get(idx)
                    .and_then(|v| match v {
                        Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                        Ok(Value::SimpleString(s)) => Some(s.clone()),
                        _ => None,
                    })
                    .unwrap_or_default()
            };
            let error_code = {
                let s = err_field_str(1);
                if s.is_empty() { "unknown".to_owned() } else { s }
            };
            let detail = err_field_str(2);

            return Err(SdkError::from(
                ff_script::error::ScriptError::from_code_with_detail(&error_code, &detail)
                    .unwrap_or_else(|| ff_script::error::ScriptError::Parse {
                        fcall: "ff_claim_resumed_execution".into(),
                        execution_id: Some(execution_id.to_string()),
                        message: format!("unknown error: {error_code}"),
                    }),
            ));
        }

        let field_str = |idx: usize| -> String {
            arr.get(idx + 2)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
        };

        let lease_id = LeaseId::parse(&field_str(0)).unwrap_or_else(|_| LeaseId::new());
        let lease_epoch = LeaseEpoch::new(field_str(1).parse().unwrap_or(1));
        let attempt_index = AttemptIndex::new(field_str(4).parse().unwrap_or(0));
        let attempt_id = AttemptId::parse(&field_str(3)).unwrap_or_else(|_| AttemptId::new());

        let (input_payload, execution_kind, tags) = self
            .read_execution_context(execution_id, partition)
            .await?;

        Ok(ClaimedTask::new(
            self.client.clone(),
            self.backend.clone(),
            self.partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            self.config.lease_ttl_ms,
            lane_id.clone(),
            self.config.worker_instance_id.clone(),
            input_payload,
            execution_kind,
            tags,
        ))
    }

    /// Read the input payload, `execution_kind`, and tags for an execution
    /// (payload key, `exec_core` hash, and tags hash). Previously gated
    /// behind `direct-valkey-claim`; now shared by the feature-gated inline
    /// claim path and the public `claim_from_reclaim_grant` entry point.
    async fn read_execution_context(
        &self,
        execution_id: &ExecutionId,
        partition: &ff_core::partition::Partition,
    ) -> Result<(Vec<u8>, String, HashMap<String, String>), SdkError> {
        let ctx = ExecKeyContext::new(partition, execution_id);

        // Read payload
        let payload: Option<String> = self
            .client
            .get(&ctx.payload())
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "GET payload failed".into() })?;
        let input_payload = payload.unwrap_or_default().into_bytes();

        // Read execution_kind from core
        let kind: Option<String> = self
            .client
            .hget(&ctx.core(), "execution_kind")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET execution_kind failed".into() })?;
        let execution_kind = kind.unwrap_or_default();

        // Read tags
        let tags: HashMap<String, String> = self
            .client
            .hgetall(&ctx.tags())
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGETALL tags failed".into() })?;

        Ok((input_payload, execution_kind, tags))
    }

    // ── Phase 3: Signal delivery ──

    /// Deliver a signal to a suspended execution's waitpoint.
    ///
    /// The engine atomically records the signal, evaluates the resume condition,
    /// and optionally transitions the execution from `suspended` to `runnable`.
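    ///
    /// # Example
    ///
    /// A minimal sketch of delivering an approval-style signal. It assumes
    /// `Signal` can be built as a plain struct literal with the fields this
    /// method forwards to `ff_deliver_signal`, and that `wp_token` was
    /// captured when the execution suspended; adapt to the actual
    /// constructor in `crate::task`.
    ///
    /// ```rust,ignore
    /// use ff_sdk::task::Signal;
    ///
    /// // Hypothetical construction — field names mirror what deliver_signal
    /// // reads; the real type may expose a builder instead.
    /// let signal = Signal {
    ///     signal_name: "approval.granted".into(),
    ///     signal_category: "approval".into(),
    ///     source_type: "api".into(),
    ///     source_identity: "reviewer-7".into(),
    ///     payload: Some(br#"{"approved":true}"#.to_vec()),
    ///     idempotency_key: Some("order-42-approval".into()),
    ///     waitpoint_token: wp_token,
    /// };
    ///
    /// let outcome = worker
    ///     .deliver_signal(&execution_id, &waitpoint_id, signal)
    ///     .await?;
    /// ```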
    pub async fn deliver_signal(
        &self,
        execution_id: &ExecutionId,
        waitpoint_id: &WaitpointId,
        signal: crate::task::Signal,
    ) -> Result<crate::task::SignalOutcome, SdkError> {
        let partition = ff_core::partition::execution_partition(execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        let signal_id = ff_core::types::SignalId::new();
        let now = TimestampMs::now();

        // Pre-read lane_id from exec_core — the execution may be on any lane,
        // not necessarily one of this worker's configured lanes.
        let lane_str: Option<String> = self
            .client
            .hget(&ctx.core(), "lane_id")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
        let lane_id = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

        // KEYS (14): exec_core, wp_condition, wp_signals_stream,
        //            exec_signals_zset, signal_hash, signal_payload,
        //            idem_key, waitpoint_hash, suspension_current,
        //            eligible_zset, suspended_zset, delayed_zset,
        //            suspension_timeout_zset, hmac_secrets
        let idem_key = if let Some(ref ik) = signal.idempotency_key {
            ctx.signal_dedup(waitpoint_id, ik)
        } else {
            ctx.noop() // must share {p:N} hash tag for cluster mode
        };
        let keys: Vec<String> = vec![
            ctx.core(),                            // 1
            ctx.waitpoint_condition(waitpoint_id), // 2
            ctx.waitpoint_signals(waitpoint_id),   // 3
            ctx.exec_signals(),                    // 4
            ctx.signal(&signal_id),                // 5
            ctx.signal_payload(&signal_id),        // 6
            idem_key,                              // 7
            ctx.waitpoint(waitpoint_id),           // 8
            ctx.suspension_current(),              // 9
            idx.lane_eligible(&lane_id),           // 10
            idx.lane_suspended(&lane_id),          // 11
            idx.lane_delayed(&lane_id),            // 12
            idx.suspension_timeout(),              // 13
            idx.waitpoint_hmac_secrets(),          // 14
        ];

        let payload_str = signal
            .payload
            .as_ref()
            .map(|p| String::from_utf8_lossy(p).into_owned())
            .unwrap_or_default();

        // ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,
        //            signal_category, source_type, source_identity,
        //            payload, payload_encoding, idempotency_key,
        //            correlation_id, target_scope, created_at,
        //            dedup_ttl_ms, resume_delay_ms, signal_maxlen,
        //            max_signals_per_execution, waitpoint_token
        let args: Vec<String> = vec![
            signal_id.to_string(),                      // 1
            execution_id.to_string(),                   // 2
            waitpoint_id.to_string(),                   // 3
            signal.signal_name,                         // 4
            signal.signal_category,                     // 5
            signal.source_type,                         // 6
            signal.source_identity,                     // 7
            payload_str,                                // 8
            "json".to_owned(),                          // 9 payload_encoding
            signal.idempotency_key.unwrap_or_default(), // 10
            String::new(),                              // 11 correlation_id
            "waitpoint".to_owned(),                     // 12 target_scope
            now.to_string(),                            // 13 created_at
            "86400000".to_owned(),                      // 14 dedup_ttl_ms
            "0".to_owned(),                             // 15 resume_delay_ms
            "1000".to_owned(),                          // 16 signal_maxlen
            "10000".to_owned(),                         // 17 max_signals_per_execution
            // WIRE BOUNDARY — raw token must reach Lua unredacted. Do NOT
            // use ToString/Display (those are redacted for log safety);
            // .as_str() is the explicit opt-in that gets the secret bytes.
            signal.waitpoint_token.as_str().to_owned(), // 18 waitpoint_token
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_deliver_signal", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        crate::task::parse_signal_result(&raw)
    }

    /// Round-robin over this worker's configured lanes. Assumes
    /// `config.lanes` is non-empty; the modulo would panic on an empty
    /// lane list.
    #[cfg(feature = "direct-valkey-claim")]
    fn next_lane(&self) -> LaneId {
        let idx = self.lane_index.fetch_add(1, Ordering::Relaxed) % self.config.lanes.len();
        self.config.lanes[idx].clone()
    }
}

/// Returns `true` when the error's `ErrorClass` is `Retryable` or
/// `Informational`, i.e. the classes the direct-claim path treats as safe
/// to retry on a later poll.
#[cfg(feature = "direct-valkey-claim")]
fn is_retryable_claim_error(err: &crate::EngineError) -> bool {
    use ff_core::error::ErrorClass;
    matches!(
        ff_script::engine_error_ext::class(err),
        ErrorClass::Retryable | ErrorClass::Informational
    )
}

/// Initial offset for [`FlowFabricWorker::scan_cursor`]. Hashes the worker
/// instance id with FNV-1a to place distinct worker processes on different
/// partition windows from their first poll. A constant seed of zero would be
/// fine for a single-worker cluster; seeding from the instance id is what
/// spreads work across partitions in multi-worker deployments.
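///
/// A quick illustration (instance ids are hypothetical):
///
/// ```rust,ignore
/// // Deterministic, bounded by the partition count, and typically different
/// // for different instance ids:
/// let a = scan_cursor_seed("w1-i1", 256);
/// let b = scan_cursor_seed("w1-i2", 256);
/// assert!(a < 256 && b < 256);
/// ```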
#[cfg(feature = "direct-valkey-claim")]
fn scan_cursor_seed(worker_instance_id: &str, num_partitions: usize) -> usize {
    if num_partitions == 0 {
        return 0;
    }
    (ff_core::hash::fnv1a_u64(worker_instance_id.as_bytes()) as usize) % num_partitions
}

/// Pulls the first element of an array reply out as a `String`, accepting
/// either bulk or simple strings. Returns `None` for non-array replies,
/// empty arrays, or a non-string first element.
#[cfg(feature = "direct-valkey-claim")]
fn extract_first_array_string(value: &Value) -> Option<String> {
    match value {
        Value::Array(arr) if !arr.is_empty() => match &arr[0] {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        },
        _ => None,
    }
}

/// Read partition config from Valkey's `ff:config:partitions` hash.
/// Returns `Err` if the key doesn't exist or can't be read.
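///
/// Fields read (each parsed as `u16` and used only if > 0, otherwise the
/// default applies): `num_flow_partitions` (default 256),
/// `num_budget_partitions` (default 32), `num_quota_partitions` (default 32).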
async fn read_partition_config(client: &Client) -> Result<PartitionConfig, SdkError> {
    let key = ff_core::keys::global_config_partitions();
    let fields: HashMap<String, String> = client
        .hgetall(&key)
        .await
        .map_err(|e| SdkError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;

    if fields.is_empty() {
        return Err(SdkError::Config {
            context: "read_partition_config".into(),
            field: None,
            message: "ff:config:partitions not found in Valkey".into(),
        });
    }

    let parse = |field: &str, default: u16| -> u16 {
        fields
            .get(field)
            .and_then(|v| v.parse().ok())
            .filter(|&n: &u16| n > 0)
            .unwrap_or(default)
    };

    Ok(PartitionConfig {
        num_flow_partitions: parse("num_flow_partitions", 256),
        num_budget_partitions: parse("num_budget_partitions", 32),
        num_quota_partitions: parse("num_quota_partitions", 32),
    })
}

#[cfg(test)]
mod completion_accessor_type_tests {
    //! Type-level compile check that
    //! [`FlowFabricWorker::completion_backend`] returns an
    //! `Option<Arc<dyn CompletionBackend>>`. No Valkey required —
    //! the assertion is at the function-pointer type level and the
    //! `#[test]` body exists solely so the compiler elaborates it.
    use super::FlowFabricWorker;
    use ff_core::completion_backend::CompletionBackend;
    use std::sync::Arc;

    #[test]
    fn completion_backend_accessor_signature() {
        // If this line compiles, the public accessor returns the
        // advertised type. The function is never called (no live
        // worker), so no I/O happens.
        let _f: fn(&FlowFabricWorker) -> Option<Arc<dyn CompletionBackend>> =
            FlowFabricWorker::completion_backend;
    }
}

#[cfg(all(test, feature = "direct-valkey-claim"))]
mod scan_cursor_tests {
    use super::scan_cursor_seed;

    #[test]
    fn stable_for_same_input() {
        assert_eq!(scan_cursor_seed("w1", 256), scan_cursor_seed("w1", 256));
    }

    #[test]
    fn distinct_for_different_ids() {
        assert_ne!(scan_cursor_seed("w1", 256), scan_cursor_seed("w2", 256));
    }

    #[test]
    fn bounded_by_partition_count() {
        for i in 0..100 {
            assert!(scan_cursor_seed(&format!("w{i}"), 256) < 256);
        }
    }

    #[test]
    fn zero_partitions_returns_zero() {
        assert_eq!(scan_cursor_seed("w1", 0), 0);
    }
}