ff_sdk/lib.rs
1//! FlowFabric Worker SDK — public API for worker authors.
2//!
3//! This crate depends on `ff-script` for the Lua-function types, Lua error
4//! kinds (`ScriptError`), and retry helpers (`is_retryable_kind`,
5//! `kind_to_stable_str`). Consumers using `ff-sdk` do not need to import
6//! `ff-script` directly for normal worker operations, but can if they need
7//! the `ScriptError` or retry types.
8//!
9//! # Quick start
10//!
11//! The production claim path is
12//! [`FlowFabricWorker::claim_from_grant`]: obtain a
13//! [`ClaimGrant`] from `ff_scheduler::Scheduler::claim_for_worker`
14//! (the scheduler enforces budget, quota, and capability checks),
15//! then hand it to the SDK. `claim_next` is gated behind the
16//! default-off `direct-valkey-claim` feature and bypasses admission
17//! control — fine for benchmarks, not production.
18//!
19//! ```rust,ignore
20//! use ff_sdk::{FlowFabricWorker, WorkerConfig};
21//! use ff_core::backend::BackendConfig;
22//! use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), ff_sdk::SdkError> {
26//! let config = WorkerConfig {
27//! backend: BackendConfig::valkey("localhost", 6379),
28//! worker_id: WorkerId::new("my-worker"),
29//! worker_instance_id: WorkerInstanceId::new("my-worker-instance-1"),
30//! namespace: Namespace::new("default"),
31//! lanes: vec![LaneId::new("main")],
32//! capabilities: Vec::new(),
33//! lease_ttl_ms: 30_000,
34//! claim_poll_interval_ms: 1_000,
35//! max_concurrent_tasks: 1,
36//! };
37//!
38//! let worker = FlowFabricWorker::connect(config).await?;
39//! let lane = LaneId::new("main");
40//!
41//! // In a real deployment `grant` is obtained from the
42//! // scheduler's `claim_for_worker` RPC/helper; it carries the
43//! // execution id, capability match, and admission result.
44//! # let grant: ff_core::contracts::ClaimGrant = unimplemented!();
45//! let task = worker.claim_from_grant(lane, grant).await?;
46//! println!("claimed: {}", task.execution_id());
47//! // Process task...
48//! task.complete(Some(b"done".to_vec())).await?;
49//! Ok(())
50//! }
51//! ```
52//!
53//! # Migration: `direct-valkey-claim` → scheduler-issued grants
54//!
55//! The `direct-valkey-claim` cargo feature — which gates
56//! [`FlowFabricWorker::claim_next`] — is **deprecated** in favour of
57//! the pair of scheduler-issued grant entry points:
58//!
59//! * [`FlowFabricWorker::claim_from_grant`] — fresh claims. Use
60//! `ff_scheduler::Scheduler::claim_for_worker` to obtain the
61//! [`ClaimGrant`], then hand it to the SDK.
62//! * [`FlowFabricWorker::claim_from_reclaim_grant`] — resumed claims
63//! for an `attempt_interrupted` execution. Wraps a
64//! [`ReclaimGrant`].
65//!
//! `claim_next` bypasses budget and quota admission control; the
//! grant-based entry points enforce it. See each method's rustdoc for
//! the exact migration recipe.
69//!
70//! [`ClaimGrant`]: ff_core::contracts::ClaimGrant
71//! [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
72
73#[cfg(feature = "valkey-default")]
74pub mod admin;
75pub mod config;
76pub mod engine_error;
77#[cfg(any(
78 feature = "layer-tracing",
79 feature = "layer-ratelimit",
80 feature = "layer-metrics",
81 feature = "layer-circuit-breaker",
82))]
83pub mod layer;
84#[cfg(feature = "valkey-default")]
85pub mod snapshot;
86#[cfg(feature = "valkey-default")]
87pub mod task;
88#[cfg(feature = "valkey-default")]
89pub mod worker;
90
91// Re-exports for convenience
92#[cfg(feature = "valkey-default")]
93pub use admin::{
94 rotate_waitpoint_hmac_secret_all_partitions, FlowFabricAdminClient, PartitionRotationOutcome,
95 RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
96};
97pub use config::WorkerConfig;
98pub use engine_error::{
99 BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
100};
101// #88: backend-agnostic transport error surface. Consumers that
102// previously matched on `ferriskey::ErrorKind` via `valkey_kind()`
103// now match on `BackendErrorKind` via `backend_kind()`.
104pub use ff_core::engine_error::{BackendError, BackendErrorKind};
105// `FailOutcome` is ff-core-native (Stage 1a move); re-export
106// unconditionally so consumers can name `ff_sdk::FailOutcome` even
107// under `--no-default-features`.
108pub use ff_core::backend::FailOutcome;
109// `ResumeSignal` is also ff-core-native (Stage 0 move).
110pub use ff_core::backend::ResumeSignal;
111#[cfg(feature = "valkey-default")]
112pub use task::{
113 read_stream, tail_stream, tail_stream_with_visibility, AppendFrameOutcome, ClaimedTask,
114 Signal, SignalOutcome, StreamCursor, StreamFrames, SuspendedHandle, TrySuspendOutcome,
115 MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
116};
117// RFC-015 stream-durability-mode public surface. Re-exported so
118// consumers can name `ff_sdk::StreamMode` etc. alongside the older
119// `AppendFrameOutcome` / `StreamCursor` re-exports. Gated on
120// `valkey-default` because the worker surface that uses them is too.
121#[cfg(feature = "valkey-default")]
122pub use ff_core::backend::{
123 PatchKind, StreamMode, SummaryDocument, TailVisibility, SUMMARY_NULL_SENTINEL,
124};
125// RFC-013 Stage 1d — typed suspend surface lives in `ff_core::contracts`
126// and is reachable via `ff_sdk::*` too. `SuspendOutcome` path is
127// preserved via this re-export so `ff_sdk::SuspendOutcome` still
128// compiles.
129#[cfg(feature = "valkey-default")]
130pub use ff_core::contracts::{
131 CompositeBody, CountKind, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget,
132 SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
133 SuspensionRequester, TimeoutBehavior, WaitpointBinding,
134};
135#[cfg(feature = "valkey-default")]
136pub use worker::FlowFabricWorker;
137
138/// SDK error type.
139#[derive(Debug, thiserror::Error)]
140pub enum SdkError {
141 /// Backend transport error. Previously wrapped `ferriskey::Error`
142 /// directly (#88); now carries a backend-agnostic
143 /// [`BackendError`] so consumers match on
144 /// [`BackendErrorKind`] instead of ferriskey's native taxonomy.
145 /// The ferriskey → [`BackendError`] mapping lives in
146 /// `ff_backend_valkey::backend_error::backend_error_from_ferriskey`.
147 #[error("backend: {0}")]
148 Backend(#[from] BackendError),
149
150 /// Backend error with additional context (e.g. call-site label).
151 /// Previously `ValkeyContext { source: ferriskey::Error }` (#88).
152 #[error("backend: {context}: {source}")]
153 BackendContext {
154 #[source]
155 source: BackendError,
156 context: String,
157 },
158
159 /// FlowFabric engine error — typed sum over Lua error codes + transport
160 /// faults. See [`EngineError`] for the variant-granularity contract.
161 /// Replaces the previous `Script(ScriptError)` carrier (#58.6).
162 ///
163 /// `Box`ed to keep `SdkError`'s stack footprint small: the richest
164 /// variant (`ConflictKind::DependencyAlreadyExists { existing:
165 /// EdgeSnapshot }`) is ~200 bytes. Boxing keeps `Result<T, SdkError>`
166 /// at the same width every other variant pays.
167 #[error("engine: {0}")]
168 Engine(Box<EngineError>),
169
170 /// Configuration error. `context` identifies the call site / logical
171 /// operation (e.g. `"describe_execution: exec_core"`, `"admin_client"`).
172 /// `field` names the specific offending field when the error is
173 /// field-scoped (e.g. `Some("public_state")`), or `None` for
174 /// whole-object validation (e.g. `"at least one lane is required"`).
175 /// `message` carries dynamic detail: source-error rendering, the
176 /// offending raw value, etc.
177 #[error("{}", fmt_config(.context, .field.as_deref(), .message))]
178 Config {
179 context: String,
180 field: Option<String>,
181 message: String,
182 },
183
184 /// Worker is at its configured `max_concurrent_tasks` capacity —
185 /// the caller should retry later. Returned by
186 /// [`FlowFabricWorker::claim_from_grant`] and
187 /// [`FlowFabricWorker::claim_from_reclaim_grant`] when the
188 /// concurrency semaphore is saturated. Distinct from `Ok(None)`:
189 /// a `ClaimGrant`/`ReclaimGrant` represents real work already
190 /// selected by the scheduler, so silently dropping it would waste
191 /// the grant and let the grant TTL elapse. Surfacing the
192 /// saturation lets the caller release the grant (or wait +
193 /// retry).
194 ///
195 /// # Classification
196 ///
197 /// * [`SdkError::is_retryable`] returns `true` — saturation is
198 /// transient: any in-flight task's
199 /// complete/fail/cancel/drop releases a permit. Retry after
200 /// milliseconds, not a retry loop with backoff for a backend
201 /// transport failure.
202 /// * [`SdkError::backend_kind`] returns `None` — this is not a
203 /// backend transport error, so there is no
204 /// [`BackendErrorKind`] to inspect. Callers that fan out on
205 /// `backend_kind()` should match `WorkerAtCapacity` explicitly
206 /// (or use `is_retryable()`).
207 ///
208 /// [`FlowFabricWorker::claim_from_grant`]: crate::FlowFabricWorker::claim_from_grant
209 /// [`FlowFabricWorker::claim_from_reclaim_grant`]: crate::FlowFabricWorker::claim_from_reclaim_grant
210 #[error("worker at capacity: max_concurrent_tasks reached")]
211 WorkerAtCapacity,
212
213 /// HTTP transport error from the admin REST surface. Carries
214 /// the underlying `reqwest::Error` via `#[source]` so callers
215 /// can inspect `is_timeout()` / `is_connect()` / etc. for
216 /// finer-grained retry logic. Distinct from
217 /// [`SdkError::Backend`] — this fires on the HTTP/JSON surface,
218 /// not on the Lua/Valkey hot path.
219 #[error("http: {context}: {source}")]
220 Http {
221 #[source]
222 source: reqwest::Error,
223 context: String,
224 },
225
226 /// The admin REST endpoint returned a non-2xx response.
227 ///
228 /// Fields surface the server-side `ErrorBody` JSON shape
229 /// (`{ error, kind?, retryable? }`) as structured values so
230 /// cairn-fabric and other consumers can match without
231 /// re-parsing the body:
232 ///
233 /// * `status` — HTTP status code.
234 /// * `message` — the `error` string from the JSON body (or
235 /// the raw body if it didn't parse as JSON).
236 /// * `kind` — server-supplied Valkey `ErrorKind` label for 5xxs
237 /// backed by a transport error; `None` for 4xxs.
238 /// * `retryable` — server-supplied hint; `None` for 4xxs.
239 /// * `raw_body` — the full response body, preserved for logging
240 /// when the JSON shape doesn't match.
241 #[error("admin api: {status}: {message}")]
242 AdminApi {
243 status: u16,
244 message: String,
245 kind: Option<String>,
246 retryable: Option<bool>,
247 raw_body: String,
248 },
249}
250
/// Formats an `SdkError::Config` as `config: <context>[.<field>]: <message>`.
///
/// When `field` is `None` (whole-object validation) the `.<field>` segment
/// is left out entirely, producing `config: <context>: <message>`.
fn fmt_config(context: &str, field: Option<&str>, message: &str) -> String {
    let mut rendered = String::from("config: ");
    rendered.push_str(context);
    if let Some(name) = field {
        rendered.push('.');
        rendered.push_str(name);
    }
    rendered.push_str(": ");
    rendered.push_str(message);
    rendered
}
259
/// Converts a native `ferriskey::Error` into [`SdkError::Backend`] (#88).
///
/// The conversion goes through `ff_backend_valkey::backend_error_from_ferriskey`.
/// FCALL/transport call sites can keep using `?`, while the public variant
/// carries only the backend-agnostic [`BackendError`].
#[cfg(feature = "valkey-default")]
impl From<ferriskey::Error> for SdkError {
    fn from(native: ferriskey::Error) -> Self {
        let mapped = ff_backend_valkey::backend_error_from_ferriskey(&native);
        SdkError::Backend(mapped)
    }
}
270
/// Wraps a native `ferriskey::Error` together with a call-site label
/// into [`SdkError::BackendContext`] (#88).
///
/// The ferriskey error is mapped to a backend-agnostic [`BackendError`]
/// first, so no ferriskey type reaches the public error surface.
#[cfg(feature = "valkey-default")]
pub(crate) fn backend_context(err: ferriskey::Error, context: impl Into<String>) -> SdkError {
    let source = ff_backend_valkey::backend_error_from_ferriskey(&err);
    let label: String = context.into();
    SdkError::BackendContext {
        source,
        context: label,
    }
}
284
285/// Preserves the ergonomic `?`-propagation from FCALL sites that
286/// return `Result<_, ScriptError>`. Routes through `EngineError`'s
287/// typed classification so every call site gets the same
288/// variant-level detail without hand-written conversion.
289impl From<ff_script::error::ScriptError> for SdkError {
290 fn from(err: ff_script::error::ScriptError) -> Self {
291 // ff-script's `From<ScriptError> for EngineError` owns the
292 // mapping table (#58.6). See `ff_script::engine_error_ext`.
293 Self::Engine(Box::new(EngineError::from(err)))
294 }
295}
296
297impl From<EngineError> for SdkError {
298 fn from(err: EngineError) -> Self {
299 Self::Engine(Box::new(err))
300 }
301}
302
303impl SdkError {
304 /// Returns the classified [`BackendErrorKind`] if this error
305 /// carries a backend transport fault. Covers the direct
306 /// [`SdkError::Backend`] / [`SdkError::BackendContext`] variants
307 /// and `Engine(EngineError::Transport { .. })` via the
308 /// ScriptError-aware downcast in `ff_script::engine_error_ext`.
309 ///
310 /// Renamed from `valkey_kind` in #88 — the previous return type
311 /// `Option<ferriskey::ErrorKind>` leaked ferriskey into every
312 /// consumer doing retry classification.
313 pub fn backend_kind(&self) -> Option<BackendErrorKind> {
314 match self {
315 Self::Backend(be) => Some(be.kind()),
316 Self::BackendContext { source, .. } => Some(source.kind()),
317 #[cfg(feature = "valkey-default")]
318 Self::Engine(e) => ff_script::engine_error_ext::valkey_kind(e)
319 .map(ff_backend_valkey::classify_ferriskey_kind),
320 #[cfg(not(feature = "valkey-default"))]
321 Self::Engine(_) => None,
322 // HTTP/admin-surface errors carry no backend fault;
323 // the admin path never touches the backend directly from
324 // the SDK side. Use `AdminApi.kind` for the server-supplied
325 // label when present.
326 Self::Config { .. }
327 | Self::WorkerAtCapacity
328 | Self::Http { .. }
329 | Self::AdminApi { .. } => None,
330 }
331 }
332
333 /// Whether this error is safely retryable by a caller. For backend
334 /// transport variants, delegates to
335 /// [`BackendErrorKind::is_retryable`]. For `Engine` errors, returns
336 /// `true` iff the typed classification is
337 /// `ErrorClass::Retryable`. `Config` errors are never retryable.
338 pub fn is_retryable(&self) -> bool {
339 match self {
340 Self::Backend(be) | Self::BackendContext { source: be, .. } => {
341 be.kind().is_retryable()
342 }
343 Self::Engine(e) => {
344 matches!(
345 ff_script::engine_error_ext::class(e),
346 ff_core::error::ErrorClass::Retryable
347 )
348 }
349 // WorkerAtCapacity is retryable: the saturation is transient
350 // and clears as soon as a concurrent task completes.
351 Self::WorkerAtCapacity => true,
352 // HTTP transport: timeouts and connect failures are
353 // retryable (transient network state); body-decode or
354 // request-build errors are terminal (caller must fix
355 // the code). `reqwest::Error` exposes both predicates.
356 Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
357 // Admin API errors: trust the server's `retryable` hint
358 // when present; otherwise fall back to the HTTP-standard
359 // retryable-status set (429, 502, 503, 504). 5xxs without
360 // a hint are conservatively non-retryable — the caller can
361 // override with `AdminApi.status`-based logic if needed.
362 // 502 covers reverse-proxy transients (ALB/nginx returning
363 // Bad Gateway when ff-server restarts mid-request).
364 Self::AdminApi {
365 status, retryable, ..
366 } => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
367 Self::Config { .. } => false,
368 }
369 }
370}
371
#[cfg(all(test, feature = "valkey-default"))]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic ferriskey error of the given kind for classifier tests.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn backend_kind_direct_and_context() {
        assert_eq!(
            SdkError::from(mk_fk_err(ErrorKind::IoError)).backend_kind(),
            Some(BackendErrorKind::Transport)
        );
        assert_eq!(
            crate::backend_context(mk_fk_err(ErrorKind::BusyLoadingError), "connect")
                .backend_kind(),
            Some(BackendErrorKind::BusyLoading)
        );
    }

    /// `BackendContext` renders its call-site label between the
    /// `backend:` prefix and the wrapped error.
    #[test]
    fn backend_context_display_includes_label() {
        let rendered =
            crate::backend_context(mk_fk_err(ErrorKind::IoError), "connect").to_string();
        assert!(
            rendered.starts_with("backend: connect: "),
            "unexpected rendering: {rendered}"
        );
    }

    #[test]
    fn backend_kind_delegates_through_engine_transport() {
        let err = SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
        assert_eq!(err.backend_kind(), Some(BackendErrorKind::Cluster));
    }

    #[test]
    fn backend_kind_none_for_lua_and_config() {
        assert_eq!(SdkError::from(ScriptError::LeaseExpired).backend_kind(), None);
        assert_eq!(
            SdkError::Config {
                context: "worker_config".into(),
                field: Some("backend".into()),
                message: "bad host".into(),
            }
            .backend_kind(),
            None
        );
    }

    #[test]
    fn is_retryable_transport() {
        // Transport-bucketed kinds (IoError, FatalSend/Receive,
        // ProtocolDesync) are retryable under the #88 classifier.
        assert!(SdkError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
        // Auth-bucketed kinds are terminal.
        assert!(!SdkError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        // Protocol-bucketed kinds (ResponseError, ParseError, TypeError,
        // InvalidClientConfig, etc.) are terminal.
        assert!(!SdkError::from(mk_fk_err(ErrorKind::ResponseError)).is_retryable());
    }

    #[test]
    fn is_retryable_engine_delegates_to_class() {
        // NoEligibleExecution is classified Retryable via EngineError::class().
        assert!(SdkError::from(ScriptError::NoEligibleExecution).is_retryable());
        // StaleLease is Terminal.
        assert!(!SdkError::from(ScriptError::StaleLease).is_retryable());
        // Transport(Valkey(IoError)) is Retryable via class() delegation.
        assert!(SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable());
    }

    /// `WorkerAtCapacity` is transient (retryable) but is not a backend
    /// transport fault, so `backend_kind()` must report `None`.
    #[test]
    fn worker_at_capacity_is_retryable_without_backend_kind() {
        let err = SdkError::WorkerAtCapacity;
        assert!(err.is_retryable());
        assert_eq!(err.backend_kind(), None);
        assert_eq!(
            err.to_string(),
            "worker at capacity: max_concurrent_tasks reached"
        );
    }

    /// Regression (#98): `SdkError::Config` carries `context`, optional
    /// `field`, and `message` separately so consumers can pattern-match on
    /// the offending field without parsing the Display string. Test covers
    /// both the field-scoped and whole-object renderings.
    #[test]
    fn config_structured_fields_render_and_match() {
        let with_field = SdkError::Config {
            context: "admin_client".into(),
            field: Some("bearer_token".into()),
            message: "is empty or all-whitespace".into(),
        };
        assert_eq!(
            with_field.to_string(),
            "config: admin_client.bearer_token: is empty or all-whitespace"
        );
        assert!(matches!(
            &with_field,
            SdkError::Config { field: Some(f), .. } if f == "bearer_token"
        ));

        let whole_object = SdkError::Config {
            context: "worker_config".into(),
            field: None,
            message: "at least one lane is required".into(),
        };
        assert_eq!(
            whole_object.to_string(),
            "config: worker_config: at least one lane is required"
        );
        assert!(matches!(&whole_object, SdkError::Config { field: None, .. }));
    }

    #[test]
    fn is_retryable_config_false() {
        assert!(!SdkError::Config {
            context: "worker_config".into(),
            field: None,
            message: "at least one lane is required".into(),
        }
        .is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        let err = SdkError::AdminApi {
            status: 429,
            message: "throttled".into(),
            kind: None,
            retryable: Some(false),
            raw_body: String::new(),
        };
        assert!(!err.is_retryable());

        let err = SdkError::AdminApi {
            status: 500,
            message: "valkey timeout".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        // 502 covers ALB/nginx Bad Gateway transients on ff-server
        // restart — same retry-is-safe as 503/504 because rotation
        // is idempotent server-side.
        for s in [429u16, 502, 503, 504] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(err.is_retryable(), "status {s} should be retryable");
        }
        for s in [400u16, 401, 403, 404, 500] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(
                !err.is_retryable(),
                "status {s} should NOT be retryable without hint"
            );
        }
    }

    /// Admin-surface errors never report a backend kind, even when the
    /// server supplied a `kind` label. (Renamed from
    /// `valkey_kind_none_for_admin_surface` to track the #88 method rename.)
    #[test]
    fn backend_kind_none_for_admin_surface() {
        let err = SdkError::AdminApi {
            status: 500,
            message: "x".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert_eq!(err.backend_kind(), None);
    }

    #[test]
    fn admin_api_display_shows_status_and_message() {
        let err = SdkError::AdminApi {
            status: 503,
            message: "unavailable".into(),
            kind: None,
            retryable: None,
            raw_body: String::new(),
        };
        assert_eq!(err.to_string(), "admin api: 503: unavailable");
    }
}