Skip to main content

ff_sdk/
lib.rs

1//! FlowFabric Worker SDK — public API for worker authors.
2//!
3//! This crate depends on `ff-script` for the Lua-function types, Lua error
4//! kinds (`ScriptError`), and retry helpers (`is_retryable_kind`,
5//! `kind_to_stable_str`). Consumers using `ff-sdk` do not need to import
6//! `ff-script` directly for normal worker operations, but can if they need
7//! the `ScriptError` or retry types.
8//!
9//! # Quick start
10//!
11//! The production claim path is
12//! [`FlowFabricWorker::claim_from_grant`]: obtain a
13//! [`ClaimGrant`] from `ff_scheduler::Scheduler::claim_for_worker`
14//! (the scheduler enforces budget, quota, and capability checks),
15//! then hand it to the SDK. `claim_next` is gated behind the
16//! default-off `direct-valkey-claim` feature and bypasses admission
17//! control — fine for benchmarks, not production.
18//!
19//! ```rust,ignore
20//! use ff_sdk::{FlowFabricWorker, WorkerConfig};
21//! use ff_core::types::LaneId;
22//!
23//! #[tokio::main]
24//! async fn main() -> Result<(), ff_sdk::SdkError> {
25//!     let config = WorkerConfig::new(
26//!         "localhost",
27//!         6379,
28//!         "my-worker",
29//!         "my-worker-instance-1",
30//!         "default",
31//!         "main",
32//!     );
33//!
34//!     let worker = FlowFabricWorker::connect(config).await?;
35//!     let lane = LaneId::new("main");
36//!
37//!     // In a real deployment `grant` is obtained from the
38//!     // scheduler's `claim_for_worker` RPC/helper; it carries the
39//!     // execution id, capability match, and admission result.
40//!     # let grant: ff_core::contracts::ClaimGrant = unimplemented!();
41//!     let task = worker.claim_from_grant(lane, grant).await?;
42//!     println!("claimed: {}", task.execution_id());
43//!     // Process task...
44//!     task.complete(Some(b"done".to_vec())).await?;
45//!     Ok(())
46//! }
47//! ```
48//!
49//! # Migration: `direct-valkey-claim` → scheduler-issued grants
50//!
//! The `direct-valkey-claim` cargo feature — which gates
//! `FlowFabricWorker::claim_next` — is **deprecated** in favour of
53//! the pair of scheduler-issued grant entry points:
54//!
55//! * [`FlowFabricWorker::claim_from_grant`] — fresh claims. Use
56//!   `ff_scheduler::Scheduler::claim_for_worker` to obtain the
57//!   [`ClaimGrant`], then hand it to the SDK.
58//! * [`FlowFabricWorker::claim_from_reclaim_grant`] — resumed claims
59//!   for an `attempt_interrupted` execution. Wraps a
60//!   [`ReclaimGrant`].
61//!
62//! `claim_next` bypasses budget and quota admission control; the
63//! grant-based path does not. See each method's rustdoc for the
64//! exact migration recipe.
65//!
66//! [`ClaimGrant`]: ff_core::contracts::ClaimGrant
67//! [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
68
69#[cfg(feature = "valkey-default")]
70pub mod admin;
71pub mod config;
72pub mod engine_error;
73#[cfg(feature = "valkey-default")]
74pub mod snapshot;
75#[cfg(feature = "valkey-default")]
76pub mod task;
77#[cfg(feature = "valkey-default")]
78pub mod worker;
79
80// Re-exports for convenience
81#[cfg(feature = "valkey-default")]
82pub use admin::{
83    rotate_waitpoint_hmac_secret_all_partitions, FlowFabricAdminClient, PartitionRotationOutcome,
84    RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
85};
86pub use config::WorkerConfig;
87pub use engine_error::{
88    BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
89};
90// `FailOutcome` is ff-core-native (Stage 1a move); re-export
91// unconditionally so consumers can name `ff_sdk::FailOutcome` even
92// under `--no-default-features`.
93pub use ff_core::backend::FailOutcome;
94// `ResumeSignal` is also ff-core-native (Stage 0 move).
95pub use ff_core::backend::ResumeSignal;
96#[cfg(feature = "valkey-default")]
97pub use task::{
98    read_stream, tail_stream, AppendFrameOutcome, ClaimedTask, ConditionMatcher, Signal,
99    SignalOutcome, StreamCursor, StreamFrames, SuspendOutcome, TimeoutBehavior,
100    MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
101};
102#[cfg(feature = "valkey-default")]
103pub use worker::FlowFabricWorker;
104
105/// SDK error type.
106#[derive(Debug, thiserror::Error)]
107pub enum SdkError {
108    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
109    #[cfg(feature = "valkey-default")]
110    #[error("valkey: {0}")]
111    Valkey(#[from] ferriskey::Error),
112
113    /// Valkey error with additional context.
114    #[cfg(feature = "valkey-default")]
115    #[error("valkey: {context}: {source}")]
116    ValkeyContext {
117        #[source]
118        source: ferriskey::Error,
119        context: String,
120    },
121
122    /// FlowFabric engine error — typed sum over Lua error codes + transport
123    /// faults. See [`EngineError`] for the variant-granularity contract.
124    /// Replaces the previous `Script(ScriptError)` carrier (#58.6).
125    ///
126    /// `Box`ed to keep `SdkError`'s stack footprint small: the richest
127    /// variant (`ConflictKind::DependencyAlreadyExists { existing:
128    /// EdgeSnapshot }`) is ~200 bytes. Boxing keeps `Result<T, SdkError>`
129    /// at the same width every other variant pays.
130    #[error("engine: {0}")]
131    Engine(Box<EngineError>),
132
133    /// Configuration error. `context` identifies the call site / logical
134    /// operation (e.g. `"describe_execution: exec_core"`, `"admin_client"`).
135    /// `field` names the specific offending field when the error is
136    /// field-scoped (e.g. `Some("public_state")`), or `None` for
137    /// whole-object validation (e.g. `"at least one lane is required"`).
138    /// `message` carries dynamic detail: source-error rendering, the
139    /// offending raw value, etc.
140    #[error("{}", fmt_config(.context, .field.as_deref(), .message))]
141    Config {
142        context: String,
143        field: Option<String>,
144        message: String,
145    },
146
147    /// Worker is at its configured `max_concurrent_tasks` capacity —
148    /// the caller should retry later. Returned by
149    /// [`FlowFabricWorker::claim_from_grant`] and
150    /// [`FlowFabricWorker::claim_from_reclaim_grant`] when the
151    /// concurrency semaphore is saturated. Distinct from `Ok(None)`:
152    /// a `ClaimGrant`/`ReclaimGrant` represents real work already
153    /// selected by the scheduler, so silently dropping it would waste
154    /// the grant and let the grant TTL elapse. Surfacing the
155    /// saturation lets the caller release the grant (or wait +
156    /// retry).
157    ///
158    /// # Classification
159    ///
160    /// * [`SdkError::is_retryable`] returns `true` — saturation is
161    ///   transient: any in-flight task's
162    ///   complete/fail/cancel/drop releases a permit. Retry after
163    ///   milliseconds, not a retry loop with backoff for a Valkey
164    ///   transport failure.
165    /// * [`SdkError::valkey_kind`] returns `None` — this is not a
166    ///   Valkey transport or Lua error, so there is no
167    ///   `ferriskey::ErrorKind` to inspect. Callers that fan out
168    ///   on `valkey_kind()` should match `WorkerAtCapacity`
169    ///   explicitly (or use `is_retryable()`).
170    ///
171    /// [`FlowFabricWorker::claim_from_grant`]: crate::FlowFabricWorker::claim_from_grant
172    /// [`FlowFabricWorker::claim_from_reclaim_grant`]: crate::FlowFabricWorker::claim_from_reclaim_grant
173    #[error("worker at capacity: max_concurrent_tasks reached")]
174    WorkerAtCapacity,
175
176    /// HTTP transport error from the admin REST surface. Carries
177    /// the underlying `reqwest::Error` via `#[source]` so callers
178    /// can inspect `is_timeout()` / `is_connect()` / etc. for
179    /// finer-grained retry logic. Distinct from
180    /// [`SdkError::Valkey`] — this fires on the HTTP/JSON surface,
181    /// not on the Lua/Valkey hot path.
182    #[error("http: {context}: {source}")]
183    Http {
184        #[source]
185        source: reqwest::Error,
186        context: String,
187    },
188
189    /// The admin REST endpoint returned a non-2xx response.
190    ///
191    /// Fields surface the server-side `ErrorBody` JSON shape
192    /// (`{ error, kind?, retryable? }`) as structured values so
193    /// cairn-fabric and other consumers can match without
194    /// re-parsing the body:
195    ///
196    /// * `status` — HTTP status code.
197    /// * `message` — the `error` string from the JSON body (or
198    ///   the raw body if it didn't parse as JSON).
199    /// * `kind` — server-supplied Valkey `ErrorKind` label for 5xxs
200    ///   backed by a transport error; `None` for 4xxs.
201    /// * `retryable` — server-supplied hint; `None` for 4xxs.
202    /// * `raw_body` — the full response body, preserved for logging
203    ///   when the JSON shape doesn't match.
204    #[error("admin api: {status}: {message}")]
205    AdminApi {
206        status: u16,
207        message: String,
208        kind: Option<String>,
209        retryable: Option<bool>,
210        raw_body: String,
211    },
212}
213
214/// Renders `SdkError::Config` as `config: <context>[.<field>]: <message>`.
215/// The `field` slot is omitted when `None` (whole-object validation).
216fn fmt_config(context: &str, field: Option<&str>, message: &str) -> String {
217    match field {
218        Some(f) => format!("config: {context}.{f}: {message}"),
219        None => format!("config: {context}: {message}"),
220    }
221}
222
223/// Preserves the ergonomic `?`-propagation from FCALL sites that
224/// return `Result<_, ScriptError>`. Routes through `EngineError`'s
225/// typed classification so every call site gets the same
226/// variant-level detail without hand-written conversion.
227impl From<ff_script::error::ScriptError> for SdkError {
228    fn from(err: ff_script::error::ScriptError) -> Self {
229        // ff-script's `From<ScriptError> for EngineError` owns the
230        // mapping table (#58.6). See `ff_script::engine_error_ext`.
231        Self::Engine(Box::new(EngineError::from(err)))
232    }
233}
234
235impl From<EngineError> for SdkError {
236    fn from(err: EngineError) -> Self {
237        Self::Engine(Box::new(err))
238    }
239}
240
241impl SdkError {
242    /// Returns the underlying ferriskey `ErrorKind` if this error carries one.
243    /// Covers transport variants (`Valkey`, `ValkeyContext`) directly and
244    /// `Engine(EngineError::Transport { .. })` via delegation.
245    #[cfg(feature = "valkey-default")]
246    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
247        match self {
248            Self::Valkey(e) => Some(e.kind()),
249            Self::ValkeyContext { source, .. } => Some(source.kind()),
250            Self::Engine(e) => ff_script::engine_error_ext::valkey_kind(e),
251            // HTTP/admin-surface errors carry no ferriskey::ErrorKind;
252            // the admin path never touches Valkey directly from the
253            // SDK side. Use `AdminApi.kind` for the server-supplied
254            // label when present.
255            Self::Config { .. } | Self::WorkerAtCapacity | Self::Http { .. } | Self::AdminApi { .. } => {
256                None
257            }
258        }
259    }
260
261    /// Whether this error is safely retryable by a caller. For transport
262    /// variants, delegates to [`ff_script::retry::is_retryable_kind`]. For
263    /// `Engine` errors, returns `true` iff the typed classification is
264    /// `ErrorClass::Retryable`. `Config` errors are never retryable.
265    pub fn is_retryable(&self) -> bool {
266        match self {
267            #[cfg(feature = "valkey-default")]
268            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
269                ff_script::retry::is_retryable_kind(e.kind())
270            }
271            Self::Engine(e) => {
272                matches!(
273                    ff_script::engine_error_ext::class(e),
274                    ff_core::error::ErrorClass::Retryable
275                )
276            }
277            // WorkerAtCapacity is retryable: the saturation is transient
278            // and clears as soon as a concurrent task completes.
279            Self::WorkerAtCapacity => true,
280            // HTTP transport: timeouts and connect failures are
281            // retryable (transient network state); body-decode or
282            // request-build errors are terminal (caller must fix
283            // the code). `reqwest::Error` exposes both predicates.
284            Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
285            // Admin API errors: trust the server's `retryable` hint
286            // when present; otherwise fall back to the HTTP-standard
287            // retryable-status set (429, 502, 503, 504). 5xxs without
288            // a hint are conservatively non-retryable — the caller can
289            // override with `AdminApi.status`-based logic if needed.
290            // 502 covers reverse-proxy transients (ALB/nginx returning
291            // Bad Gateway when ff-server restarts mid-request).
292            Self::AdminApi {
293                status, retryable, ..
294            } => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
295            Self::Config { .. } => false,
296        }
297    }
298}
299
#[cfg(all(test, feature = "valkey-default"))]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic `ferriskey::Error` of the given kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn valkey_kind_direct_and_context() {
        assert_eq!(
            SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).valkey_kind(),
            Some(ErrorKind::IoError)
        );
        assert_eq!(
            SdkError::ValkeyContext {
                source: mk_fk_err(ErrorKind::BusyLoadingError),
                context: "connect".into()
            }
            .valkey_kind(),
            Some(ErrorKind::BusyLoadingError)
        );
    }

    #[test]
    fn valkey_context_display_includes_context() {
        let err = SdkError::ValkeyContext {
            source: mk_fk_err(ErrorKind::IoError),
            context: "connect".into(),
        };
        let rendered = err.to_string();
        assert!(
            rendered.starts_with("valkey: connect: "),
            "unexpected rendering: {rendered}"
        );
    }

    #[test]
    fn valkey_kind_delegates_through_engine_transport() {
        let err = SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
    }

    #[test]
    fn script_error_converts_to_engine_variant() {
        assert!(matches!(
            SdkError::from(ScriptError::LeaseExpired),
            SdkError::Engine(_)
        ));
    }

    #[test]
    fn valkey_kind_none_for_lua_and_config() {
        assert_eq!(
            SdkError::from(ScriptError::LeaseExpired).valkey_kind(),
            None
        );
        assert_eq!(
            SdkError::Config {
                context: "worker_config".into(),
                field: Some("host".into()),
                message: "bad host".into(),
            }
            .valkey_kind(),
            None
        );
    }

    #[test]
    fn is_retryable_transport() {
        assert!(SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
    }

    #[test]
    fn is_retryable_engine_delegates_to_class() {
        // NoEligibleExecution is classified Retryable via EngineError::class().
        assert!(SdkError::from(ScriptError::NoEligibleExecution).is_retryable());
        // StaleLease is Terminal.
        assert!(!SdkError::from(ScriptError::StaleLease).is_retryable());
        // Transport(Valkey(IoError)) is Retryable via class() delegation.
        assert!(
            SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable()
        );
    }

    /// Pins the documented `WorkerAtCapacity` classification contract:
    /// retryable, no ferriskey kind, and a stable Display message.
    #[test]
    fn worker_at_capacity_is_retryable_without_valkey_kind() {
        let err = SdkError::WorkerAtCapacity;
        assert!(err.is_retryable());
        assert_eq!(err.valkey_kind(), None);
        assert_eq!(
            err.to_string(),
            "worker at capacity: max_concurrent_tasks reached"
        );
    }

    /// Regression (#98): `SdkError::Config` carries `context`, optional
    /// `field`, and `message` separately so consumers can pattern-match on
    /// the offending field without parsing the Display string. Test covers
    /// both the field-scoped and whole-object renderings.
    #[test]
    fn config_structured_fields_render_and_match() {
        let with_field = SdkError::Config {
            context: "admin_client".into(),
            field: Some("bearer_token".into()),
            message: "is empty or all-whitespace".into(),
        };
        assert_eq!(
            with_field.to_string(),
            "config: admin_client.bearer_token: is empty or all-whitespace"
        );
        assert!(matches!(
            &with_field,
            SdkError::Config { field: Some(f), .. } if f == "bearer_token"
        ));

        let whole_object = SdkError::Config {
            context: "worker_config".into(),
            field: None,
            message: "at least one lane is required".into(),
        };
        assert_eq!(
            whole_object.to_string(),
            "config: worker_config: at least one lane is required"
        );
        assert!(matches!(
            &whole_object,
            SdkError::Config { field: None, .. }
        ));
    }

    #[test]
    fn is_retryable_config_false() {
        assert!(
            !SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            }
            .is_retryable()
        );
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        // The hint overrides the status fallback in both directions.
        let err = SdkError::AdminApi {
            status: 429,
            message: "throttled".into(),
            kind: None,
            retryable: Some(false),
            raw_body: String::new(),
        };
        assert!(!err.is_retryable());

        let err = SdkError::AdminApi {
            status: 500,
            message: "valkey timeout".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        // 502 covers ALB/nginx Bad Gateway transients on ff-server
        // restart — same retry-is-safe as 503/504 because rotation
        // is idempotent server-side.
        for s in [429u16, 502, 503, 504] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(err.is_retryable(), "status {s} should be retryable");
        }
        for s in [400u16, 401, 403, 404, 500] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(!err.is_retryable(), "status {s} should NOT be retryable without hint");
        }
    }

    #[test]
    fn valkey_kind_none_for_admin_surface() {
        let err = SdkError::AdminApi {
            status: 500,
            message: "x".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert_eq!(err.valkey_kind(), None);
    }
}