Skip to main content

ff_sdk/
lib.rs

1//! FlowFabric Worker SDK — public API for worker authors.
2//!
3//! This crate depends on `ff-script` for the Lua-function types, Lua error
4//! kinds (`ScriptError`), and retry helpers (`is_retryable_kind`,
5//! `kind_to_stable_str`). Consumers using `ff-sdk` do not need to import
6//! `ff-script` directly for normal worker operations, but can if they need
7//! the `ScriptError` or retry types.
8//!
9//! # Quick start
10//!
11//! The production claim path is
12//! [`FlowFabricWorker::claim_from_grant`]: obtain a
13//! [`ClaimGrant`] from `ff_scheduler::Scheduler::claim_for_worker`
14//! (the scheduler enforces budget, quota, and capability checks),
15//! then hand it to the SDK. `claim_next` is gated behind the
16//! default-off `direct-valkey-claim` feature and bypasses admission
17//! control — fine for benchmarks, not production.
18//!
19//! ```rust,ignore
20//! use ff_sdk::{FlowFabricWorker, WorkerConfig};
21//! use ff_core::backend::BackendConfig;
22//! use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
23//!
24//! #[tokio::main]
25//! async fn main() -> Result<(), ff_sdk::SdkError> {
26//!     let config = WorkerConfig {
27//!         backend: BackendConfig::valkey("localhost", 6379),
28//!         worker_id: WorkerId::new("my-worker"),
29//!         worker_instance_id: WorkerInstanceId::new("my-worker-instance-1"),
30//!         namespace: Namespace::new("default"),
31//!         lanes: vec![LaneId::new("main")],
32//!         capabilities: Vec::new(),
33//!         lease_ttl_ms: 30_000,
34//!         claim_poll_interval_ms: 1_000,
35//!         max_concurrent_tasks: 1,
36//!     };
37//!
38//!     let worker = FlowFabricWorker::connect(config).await?;
39//!     let lane = LaneId::new("main");
40//!
41//!     // In a real deployment `grant` is obtained from the
42//!     // scheduler's `claim_for_worker` RPC/helper; it carries the
43//!     // execution id, capability match, and admission result.
44//!     # let grant: ff_sdk::ClaimGrant = unimplemented!();
45//!     let task = worker.claim_from_grant(lane, grant).await?;
46//!     println!("claimed: {}", task.execution_id());
47//!     // Process task...
48//!     task.complete(Some(b"done".to_vec())).await?;
49//!     Ok(())
50//! }
51//! ```
52//!
53//! # Migration: `direct-valkey-claim` → scheduler-issued grants
54//!
55//! The `direct-valkey-claim` cargo feature — which gates
56//! [`FlowFabricWorker::claim_next`] — is **deprecated** in favour of
57//! the pair of scheduler-issued grant entry points:
58//!
59//! * [`FlowFabricWorker::claim_from_grant`] — fresh claims. Use
60//!   `ff_scheduler::Scheduler::claim_for_worker` to obtain the
61//!   [`ClaimGrant`], then hand it to the SDK.
62//! * [`FlowFabricWorker::claim_from_reclaim_grant`] — resumed claims
63//!   for an `attempt_interrupted` execution. Wraps a
64//!   [`ReclaimGrant`].
65//!
//! `claim_next` bypasses budget and quota admission control; the
//! grant-based path enforces both, because the scheduler runs those
//! checks before it issues a grant. See each method's rustdoc for the
//! exact migration recipe.
69//!
70//! `ClaimGrant` / `ReclaimGrant` (and the `ClaimPolicy` /
71//! `ReclaimToken` wire types on the scheduler-owner side) are
72//! re-exported from `ff-sdk` (#283) so consumers do not need a
73//! direct `ff-scheduler` dep just to type these signatures.
74//!
75//! [`ClaimGrant`]: crate::ClaimGrant
76//! [`ReclaimGrant`]: crate::ReclaimGrant
77
78#[cfg(feature = "valkey-default")]
79pub mod admin;
80pub mod config;
81pub mod engine_error;
82#[cfg(any(
83    feature = "layer-tracing",
84    feature = "layer-ratelimit",
85    feature = "layer-metrics",
86    feature = "layer-circuit-breaker",
87))]
88pub mod layer;
89#[cfg(feature = "valkey-default")]
90pub mod snapshot;
91#[cfg(feature = "valkey-default")]
92pub mod task;
93#[cfg(feature = "valkey-default")]
94pub mod worker;
95
96// Re-exports for convenience
97#[cfg(feature = "valkey-default")]
98pub use admin::{
99    rotate_waitpoint_hmac_secret_all_partitions, FlowFabricAdminClient, PartitionRotationOutcome,
100    RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
101};
102pub use config::WorkerConfig;
103pub use engine_error::{
104    BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
105};
106// #88: backend-agnostic transport error surface. Consumers that
107// previously matched on `ferriskey::ErrorKind` via `valkey_kind()`
108// now match on `BackendErrorKind` via `backend_kind()`.
109pub use ff_core::engine_error::{BackendError, BackendErrorKind};
110// `FailOutcome` is ff-core-native (Stage 1a move); re-export
111// unconditionally so consumers can name `ff_sdk::FailOutcome` even
112// under `--no-default-features`.
113pub use ff_core::backend::FailOutcome;
114// `ResumeSignal` is also ff-core-native (Stage 0 move).
115pub use ff_core::backend::ResumeSignal;
116#[cfg(feature = "valkey-default")]
117pub use task::{
118    read_stream, tail_stream, tail_stream_with_visibility, AppendFrameOutcome, ClaimedTask,
119    Signal, SignalOutcome, StreamCursor, StreamFrames, SuspendedHandle, TrySuspendOutcome,
120    MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
121};
122// RFC-015 stream-durability-mode public surface. Re-exported so
123// consumers can name `ff_sdk::StreamMode` etc. alongside the older
124// `AppendFrameOutcome` / `StreamCursor` re-exports. Gated on
125// `valkey-default` because the worker surface that uses them is too.
126#[cfg(feature = "valkey-default")]
127pub use ff_core::backend::{
128    PatchKind, StreamMode, SummaryDocument, TailVisibility, SUMMARY_NULL_SENTINEL,
129};
130// RFC-013 Stage 1d — typed suspend surface lives in `ff_core::contracts`
131// and is reachable via `ff_sdk::*` too. `SuspendOutcome` path is
132// preserved via this re-export so `ff_sdk::SuspendOutcome` still
133// compiles.
134#[cfg(feature = "valkey-default")]
135pub use ff_core::contracts::{
136    CompositeBody, CountKind, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget,
137    SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
138    SuspensionRequester, TimeoutBehavior, WaitpointBinding,
139};
140// #283 — Claim-flow wire types. Consumers typing `claim_from_grant` /
141// `claim_from_reclaim_grant` signatures (or wrapping
142// `EngineBackend::claim_for_worker`) can name these as
143// `ff_sdk::ClaimGrant` etc. without pinning `ff-scheduler` directly.
144// `Scheduler` itself is intentionally not re-exported: implementing a
145// scheduler is specialized and stays behind the `ff-scheduler` dep.
146pub use ff_core::backend::{ClaimPolicy, ReclaimToken};
147pub use ff_core::contracts::{ClaimGrant, ReclaimGrant};
148#[cfg(feature = "valkey-default")]
149pub use worker::FlowFabricWorker;
150
151/// SDK error type.
152#[derive(Debug, thiserror::Error)]
153pub enum SdkError {
154    /// Backend transport error. Previously wrapped `ferriskey::Error`
155    /// directly (#88); now carries a backend-agnostic
156    /// [`BackendError`] so consumers match on
157    /// [`BackendErrorKind`] instead of ferriskey's native taxonomy.
158    /// The ferriskey → [`BackendError`] mapping lives in
159    /// `ff_backend_valkey::backend_error::backend_error_from_ferriskey`.
160    #[error("backend: {0}")]
161    Backend(#[from] BackendError),
162
163    /// Backend error with additional context (e.g. call-site label).
164    /// Previously `ValkeyContext { source: ferriskey::Error }` (#88).
165    #[error("backend: {context}: {source}")]
166    BackendContext {
167        #[source]
168        source: BackendError,
169        context: String,
170    },
171
172    /// FlowFabric engine error — typed sum over Lua error codes + transport
173    /// faults. See [`EngineError`] for the variant-granularity contract.
174    /// Replaces the previous `Script(ScriptError)` carrier (#58.6).
175    ///
176    /// `Box`ed to keep `SdkError`'s stack footprint small: the richest
177    /// variant (`ConflictKind::DependencyAlreadyExists { existing:
178    /// EdgeSnapshot }`) is ~200 bytes. Boxing keeps `Result<T, SdkError>`
179    /// at the same width every other variant pays.
180    #[error("engine: {0}")]
181    Engine(Box<EngineError>),
182
183    /// Configuration error. `context` identifies the call site / logical
184    /// operation (e.g. `"describe_execution: exec_core"`, `"admin_client"`).
185    /// `field` names the specific offending field when the error is
186    /// field-scoped (e.g. `Some("public_state")`), or `None` for
187    /// whole-object validation (e.g. `"at least one lane is required"`).
188    /// `message` carries dynamic detail: source-error rendering, the
189    /// offending raw value, etc.
190    #[error("{}", fmt_config(.context, .field.as_deref(), .message))]
191    Config {
192        context: String,
193        field: Option<String>,
194        message: String,
195    },
196
197    /// Worker is at its configured `max_concurrent_tasks` capacity —
198    /// the caller should retry later. Returned by
199    /// [`FlowFabricWorker::claim_from_grant`] and
200    /// [`FlowFabricWorker::claim_from_reclaim_grant`] when the
201    /// concurrency semaphore is saturated. Distinct from `Ok(None)`:
202    /// a `ClaimGrant`/`ReclaimGrant` represents real work already
203    /// selected by the scheduler, so silently dropping it would waste
204    /// the grant and let the grant TTL elapse. Surfacing the
205    /// saturation lets the caller release the grant (or wait +
206    /// retry).
207    ///
208    /// # Classification
209    ///
210    /// * [`SdkError::is_retryable`] returns `true` — saturation is
211    ///   transient: any in-flight task's
212    ///   complete/fail/cancel/drop releases a permit. Retry after
213    ///   milliseconds, not a retry loop with backoff for a backend
214    ///   transport failure.
215    /// * [`SdkError::backend_kind`] returns `None` — this is not a
216    ///   backend transport error, so there is no
217    ///   [`BackendErrorKind`] to inspect. Callers that fan out on
218    ///   `backend_kind()` should match `WorkerAtCapacity` explicitly
219    ///   (or use `is_retryable()`).
220    ///
221    /// [`FlowFabricWorker::claim_from_grant`]: crate::FlowFabricWorker::claim_from_grant
222    /// [`FlowFabricWorker::claim_from_reclaim_grant`]: crate::FlowFabricWorker::claim_from_reclaim_grant
223    #[error("worker at capacity: max_concurrent_tasks reached")]
224    WorkerAtCapacity,
225
226    /// HTTP transport error from the admin REST surface. Carries
227    /// the underlying `reqwest::Error` via `#[source]` so callers
228    /// can inspect `is_timeout()` / `is_connect()` / etc. for
229    /// finer-grained retry logic. Distinct from
230    /// [`SdkError::Backend`] — this fires on the HTTP/JSON surface,
231    /// not on the Lua/Valkey hot path.
232    #[error("http: {context}: {source}")]
233    Http {
234        #[source]
235        source: reqwest::Error,
236        context: String,
237    },
238
239    /// The admin REST endpoint returned a non-2xx response.
240    ///
241    /// Fields surface the server-side `ErrorBody` JSON shape
242    /// (`{ error, kind?, retryable? }`) as structured values so
243    /// cairn-fabric and other consumers can match without
244    /// re-parsing the body:
245    ///
246    /// * `status` — HTTP status code.
247    /// * `message` — the `error` string from the JSON body (or
248    ///   the raw body if it didn't parse as JSON).
249    /// * `kind` — server-supplied Valkey `ErrorKind` label for 5xxs
250    ///   backed by a transport error; `None` for 4xxs.
251    /// * `retryable` — server-supplied hint; `None` for 4xxs.
252    /// * `raw_body` — the full response body, preserved for logging
253    ///   when the JSON shape doesn't match.
254    #[error("admin api: {status}: {message}")]
255    AdminApi {
256        status: u16,
257        message: String,
258        kind: Option<String>,
259        retryable: Option<bool>,
260        raw_body: String,
261    },
262}
263
/// Builds the `Display` text for `SdkError::Config`.
///
/// A named field produces `config: <context>.<field>: <message>`. A
/// whole-object error (`field == None`) produces
/// `config: <context>: <message>`.
fn fmt_config(context: &str, field: Option<&str>, message: &str) -> String {
    if let Some(name) = field {
        format!("config: {context}.{name}: {message}")
    } else {
        format!("config: {context}: {message}")
    }
}
272
/// Converts a raw `ferriskey::Error` into [`SdkError::Backend`] (#88).
///
/// Classification is delegated to
/// [`ff_backend_valkey::backend_error_from_ferriskey`]. The public variant
/// therefore stays backend-agnostic, while FCALL/transport call sites can
/// keep propagating with `?`.
#[cfg(feature = "valkey-default")]
impl From<ferriskey::Error> for SdkError {
    fn from(err: ferriskey::Error) -> Self {
        let classified = ff_backend_valkey::backend_error_from_ferriskey(&err);
        SdkError::Backend(classified)
    }
}
283
/// Pairs a raw `ferriskey::Error` with a call-site label and returns
/// [`SdkError::BackendContext`] (#88).
///
/// The ferriskey error is classified through
/// `ff_backend_valkey::backend_error_from_ferriskey`, so only the
/// backend-agnostic [`BackendError`] reaches the public surface.
#[cfg(feature = "valkey-default")]
pub(crate) fn backend_context(err: ferriskey::Error, context: impl Into<String>) -> SdkError {
    let source = ff_backend_valkey::backend_error_from_ferriskey(&err);
    let context: String = context.into();
    SdkError::BackendContext { source, context }
}
297
298/// Preserves the ergonomic `?`-propagation from FCALL sites that
299/// return `Result<_, ScriptError>`. Routes through `EngineError`'s
300/// typed classification so every call site gets the same
301/// variant-level detail without hand-written conversion.
302impl From<ff_script::error::ScriptError> for SdkError {
303    fn from(err: ff_script::error::ScriptError) -> Self {
304        // ff-script's `From<ScriptError> for EngineError` owns the
305        // mapping table (#58.6). See `ff_script::engine_error_ext`.
306        Self::Engine(Box::new(EngineError::from(err)))
307    }
308}
309
310impl From<EngineError> for SdkError {
311    fn from(err: EngineError) -> Self {
312        Self::Engine(Box::new(err))
313    }
314}
315
316impl SdkError {
317    /// Returns the classified [`BackendErrorKind`] if this error
318    /// carries a backend transport fault. Covers the direct
319    /// [`SdkError::Backend`] / [`SdkError::BackendContext`] variants
320    /// and `Engine(EngineError::Transport { .. })` via the
321    /// ScriptError-aware downcast in `ff_script::engine_error_ext`.
322    ///
323    /// Renamed from `valkey_kind` in #88 — the previous return type
324    /// `Option<ferriskey::ErrorKind>` leaked ferriskey into every
325    /// consumer doing retry classification.
326    pub fn backend_kind(&self) -> Option<BackendErrorKind> {
327        match self {
328            Self::Backend(be) => Some(be.kind()),
329            Self::BackendContext { source, .. } => Some(source.kind()),
330            #[cfg(feature = "valkey-default")]
331            Self::Engine(e) => ff_script::engine_error_ext::valkey_kind(e)
332                .map(ff_backend_valkey::classify_ferriskey_kind),
333            #[cfg(not(feature = "valkey-default"))]
334            Self::Engine(_) => None,
335            // HTTP/admin-surface errors carry no backend fault;
336            // the admin path never touches the backend directly from
337            // the SDK side. Use `AdminApi.kind` for the server-supplied
338            // label when present.
339            Self::Config { .. }
340            | Self::WorkerAtCapacity
341            | Self::Http { .. }
342            | Self::AdminApi { .. } => None,
343        }
344    }
345
346    /// Whether this error is safely retryable by a caller. For backend
347    /// transport variants, delegates to
348    /// [`BackendErrorKind::is_retryable`]. For `Engine` errors, returns
349    /// `true` iff the typed classification is
350    /// `ErrorClass::Retryable`. `Config` errors are never retryable.
351    pub fn is_retryable(&self) -> bool {
352        match self {
353            Self::Backend(be) | Self::BackendContext { source: be, .. } => {
354                be.kind().is_retryable()
355            }
356            Self::Engine(e) => {
357                matches!(
358                    ff_script::engine_error_ext::class(e),
359                    ff_core::error::ErrorClass::Retryable
360                )
361            }
362            // WorkerAtCapacity is retryable: the saturation is transient
363            // and clears as soon as a concurrent task completes.
364            Self::WorkerAtCapacity => true,
365            // HTTP transport: timeouts and connect failures are
366            // retryable (transient network state); body-decode or
367            // request-build errors are terminal (caller must fix
368            // the code). `reqwest::Error` exposes both predicates.
369            Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
370            // Admin API errors: trust the server's `retryable` hint
371            // when present; otherwise fall back to the HTTP-standard
372            // retryable-status set (429, 502, 503, 504). 5xxs without
373            // a hint are conservatively non-retryable — the caller can
374            // override with `AdminApi.status`-based logic if needed.
375            // 502 covers reverse-proxy transients (ALB/nginx returning
376            // Bad Gateway when ff-server restarts mid-request).
377            Self::AdminApi {
378                status, retryable, ..
379            } => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
380            Self::Config { .. } => false,
381        }
382    }
383}
384
#[cfg(all(test, feature = "valkey-default"))]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic `ferriskey::Error` of the requested kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn backend_kind_direct_and_context() {
        assert_eq!(
            SdkError::from(mk_fk_err(ErrorKind::IoError)).backend_kind(),
            Some(BackendErrorKind::Transport)
        );
        assert_eq!(
            crate::backend_context(mk_fk_err(ErrorKind::BusyLoadingError), "connect")
                .backend_kind(),
            Some(BackendErrorKind::BusyLoading)
        );
    }

    #[test]
    fn backend_kind_delegates_through_engine_transport() {
        let err = SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
        assert_eq!(err.backend_kind(), Some(BackendErrorKind::Cluster));
    }

    #[test]
    fn backend_kind_none_for_lua_and_config() {
        assert_eq!(
            SdkError::from(ScriptError::LeaseExpired).backend_kind(),
            None
        );
        // Fixture kept internally coherent: the field named matches the
        // complaint in `message`.
        assert_eq!(
            SdkError::Config {
                context: "worker_config".into(),
                field: Some("host".into()),
                message: "bad host".into(),
            }
            .backend_kind(),
            None
        );
    }

    #[test]
    fn is_retryable_transport() {
        // Transport-bucketed kinds (IoError, FatalSend/Receive,
        // ProtocolDesync) are retryable under the #88 classifier.
        assert!(SdkError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
        // Auth-bucketed kinds are terminal.
        assert!(!SdkError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        // Protocol-bucketed kinds (ResponseError, ParseError, TypeError,
        // InvalidClientConfig, etc.) are terminal.
        assert!(!SdkError::from(mk_fk_err(ErrorKind::ResponseError)).is_retryable());
    }

    #[test]
    fn is_retryable_engine_delegates_to_class() {
        // NoEligibleExecution is classified Retryable via EngineError::class().
        assert!(SdkError::from(ScriptError::NoEligibleExecution).is_retryable());
        // StaleLease is Terminal.
        assert!(!SdkError::from(ScriptError::StaleLease).is_retryable());
        // Transport(Valkey(IoError)) is Retryable via class() delegation.
        assert!(
            SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable()
        );
    }

    /// `WorkerAtCapacity` is documented as transient (retryable) but not a
    /// backend transport fault, so `backend_kind()` must stay `None`. Also
    /// pins the Display text callers may log.
    #[test]
    fn worker_at_capacity_is_retryable_but_not_a_backend_fault() {
        let err = SdkError::WorkerAtCapacity;
        assert!(err.is_retryable());
        assert_eq!(err.backend_kind(), None);
        assert_eq!(
            err.to_string(),
            "worker at capacity: max_concurrent_tasks reached"
        );
    }

    /// Regression (#98): `SdkError::Config` carries `context`, optional
    /// `field`, and `message` separately so consumers can pattern-match on
    /// the offending field without parsing the Display string. Test covers
    /// both the field-scoped and whole-object renderings.
    #[test]
    fn config_structured_fields_render_and_match() {
        let with_field = SdkError::Config {
            context: "admin_client".into(),
            field: Some("bearer_token".into()),
            message: "is empty or all-whitespace".into(),
        };
        assert_eq!(
            with_field.to_string(),
            "config: admin_client.bearer_token: is empty or all-whitespace"
        );
        assert!(matches!(
            &with_field,
            SdkError::Config { field: Some(f), .. } if f == "bearer_token"
        ));

        let whole_object = SdkError::Config {
            context: "worker_config".into(),
            field: None,
            message: "at least one lane is required".into(),
        };
        assert_eq!(
            whole_object.to_string(),
            "config: worker_config: at least one lane is required"
        );
        assert!(matches!(
            &whole_object,
            SdkError::Config { field: None, .. }
        ));
    }

    #[test]
    fn is_retryable_config_false() {
        assert!(
            !SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            }
            .is_retryable()
        );
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        let err = SdkError::AdminApi {
            status: 429,
            message: "throttled".into(),
            kind: None,
            retryable: Some(false),
            raw_body: String::new(),
        };
        assert!(!err.is_retryable());

        let err = SdkError::AdminApi {
            status: 500,
            message: "valkey timeout".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        // 502 covers ALB/nginx Bad Gateway transients on ff-server
        // restart — same retry-is-safe as 503/504 because rotation
        // is idempotent server-side.
        for s in [429u16, 502, 503, 504] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(err.is_retryable(), "status {s} should be retryable");
        }
        for s in [400u16, 401, 403, 404, 500] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(!err.is_retryable(), "status {s} should NOT be retryable without hint");
        }
    }

    /// Admin-surface errors never report a backend kind, even when the
    /// server supplied a transport-flavoured `kind` label.
    #[test]
    fn backend_kind_none_for_admin_surface() {
        let err = SdkError::AdminApi {
            status: 500,
            message: "x".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert_eq!(err.backend_kind(), None);
    }
}