Skip to main content

ff_sdk/
lib.rs

1//! FlowFabric Worker SDK — public API for worker authors.
2//!
3//! This crate depends on `ff-script` for the Lua-function types, Lua error
4//! kinds (`ScriptError`), and retry helpers (`is_retryable_kind`,
5//! `kind_to_stable_str`). Consumers using `ff-sdk` do not need to import
6//! `ff-script` directly for normal worker operations, but can if they need
7//! the `ScriptError` or retry types.
8//!
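//! # Quick start
//!
//! The example below polls with [`FlowFabricWorker::claim_next`], which is
//! only available with the deprecated `direct-valkey-claim` feature; see the
//! migration section below for the scheduler-issued grant entry points.
//!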
11//! ```rust,ignore
12//! use ff_sdk::{FlowFabricWorker, WorkerConfig};
13//! use std::time::Duration;
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), ff_sdk::SdkError> {
17//!     let config = WorkerConfig::new(
18//!         "localhost",
19//!         6379,
20//!         "my-worker",
21//!         "my-worker-instance-1",
22//!         "default",
23//!         "main",
24//!     );
25//!
26//!     let worker = FlowFabricWorker::connect(config).await?;
27//!
28//!     loop {
29//!         match worker.claim_next().await? {
30//!             Some(task) => {
31//!                 println!("claimed: {}", task.execution_id());
32//!                 // Process task...
33//!                 task.complete(Some(b"done".to_vec())).await?;
34//!             }
35//!             None => {
36//!                 tokio::time::sleep(Duration::from_secs(1)).await;
37//!             }
38//!         }
39//!     }
40//! }
41//! ```
42//!
43//! # Migration: `direct-valkey-claim` → scheduler-issued grants
44//!
45//! The `direct-valkey-claim` cargo feature — which gates
46//! [`FlowFabricWorker::claim_next`] — is **deprecated** in favour of
47//! the pair of scheduler-issued grant entry points:
48//!
49//! * [`FlowFabricWorker::claim_from_grant`] — fresh claims. Use
50//!   `ff_scheduler::Scheduler::claim_for_worker` to obtain the
51//!   [`ClaimGrant`], then hand it to the SDK.
52//! * [`FlowFabricWorker::claim_from_reclaim_grant`] — resumed claims
53//!   for an `attempt_interrupted` execution. Wraps a
54//!   [`ReclaimGrant`].
55//!
56//! `claim_next` bypasses budget and quota admission control; the
57//! grant-based path does not. See each method's rustdoc for the
58//! exact migration recipe.
59//!
60//! [`ClaimGrant`]: ff_core::contracts::ClaimGrant
61//! [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
62
63#[cfg(feature = "valkey-default")]
64pub mod admin;
65pub mod config;
66pub mod engine_error;
67#[cfg(feature = "valkey-default")]
68pub mod snapshot;
69#[cfg(feature = "valkey-default")]
70pub mod task;
71#[cfg(feature = "valkey-default")]
72pub mod worker;
73
74// Re-exports for convenience
75#[cfg(feature = "valkey-default")]
76pub use admin::{
77    rotate_waitpoint_hmac_secret_all_partitions, FlowFabricAdminClient, PartitionRotationOutcome,
78    RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
79};
80pub use config::WorkerConfig;
81pub use engine_error::{
82    BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
83};
84// `FailOutcome` is ff-core-native (Stage 1a move); re-export
85// unconditionally so consumers can name `ff_sdk::FailOutcome` even
86// under `--no-default-features`.
87pub use ff_core::backend::FailOutcome;
88// `ResumeSignal` is also ff-core-native (Stage 0 move).
89pub use ff_core::backend::ResumeSignal;
90#[cfg(feature = "valkey-default")]
91pub use task::{
92    read_stream, tail_stream, AppendFrameOutcome, ClaimedTask, ConditionMatcher, Signal,
93    SignalOutcome, StreamCursor, StreamFrames, SuspendOutcome, TimeoutBehavior,
94    MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
95};
96#[cfg(feature = "valkey-default")]
97pub use worker::FlowFabricWorker;
98
99/// SDK error type.
100#[derive(Debug, thiserror::Error)]
101pub enum SdkError {
102    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
103    #[cfg(feature = "valkey-default")]
104    #[error("valkey: {0}")]
105    Valkey(#[from] ferriskey::Error),
106
107    /// Valkey error with additional context.
108    #[cfg(feature = "valkey-default")]
109    #[error("valkey: {context}: {source}")]
110    ValkeyContext {
111        #[source]
112        source: ferriskey::Error,
113        context: String,
114    },
115
116    /// FlowFabric engine error — typed sum over Lua error codes + transport
117    /// faults. See [`EngineError`] for the variant-granularity contract.
118    /// Replaces the previous `Script(ScriptError)` carrier (#58.6).
119    ///
120    /// `Box`ed to keep `SdkError`'s stack footprint small: the richest
121    /// variant (`ConflictKind::DependencyAlreadyExists { existing:
122    /// EdgeSnapshot }`) is ~200 bytes. Boxing keeps `Result<T, SdkError>`
123    /// at the same width every other variant pays.
124    #[error("engine: {0}")]
125    Engine(Box<EngineError>),
126
127    /// Configuration error. `context` identifies the call site / logical
128    /// operation (e.g. `"describe_execution: exec_core"`, `"admin_client"`).
129    /// `field` names the specific offending field when the error is
130    /// field-scoped (e.g. `Some("public_state")`), or `None` for
131    /// whole-object validation (e.g. `"at least one lane is required"`).
132    /// `message` carries dynamic detail: source-error rendering, the
133    /// offending raw value, etc.
134    #[error("{}", fmt_config(.context, .field.as_deref(), .message))]
135    Config {
136        context: String,
137        field: Option<String>,
138        message: String,
139    },
140
141    /// Worker is at its configured `max_concurrent_tasks` capacity —
142    /// the caller should retry later. Returned by
143    /// [`FlowFabricWorker::claim_from_grant`] and
144    /// [`FlowFabricWorker::claim_from_reclaim_grant`] when the
145    /// concurrency semaphore is saturated. Distinct from `Ok(None)`:
146    /// a `ClaimGrant`/`ReclaimGrant` represents real work already
147    /// selected by the scheduler, so silently dropping it would waste
148    /// the grant and let the grant TTL elapse. Surfacing the
149    /// saturation lets the caller release the grant (or wait +
150    /// retry).
151    ///
152    /// # Classification
153    ///
154    /// * [`SdkError::is_retryable`] returns `true` — saturation is
155    ///   transient: any in-flight task's
156    ///   complete/fail/cancel/drop releases a permit. Retry after
157    ///   milliseconds, not a retry loop with backoff for a Valkey
158    ///   transport failure.
159    /// * [`SdkError::valkey_kind`] returns `None` — this is not a
160    ///   Valkey transport or Lua error, so there is no
161    ///   `ferriskey::ErrorKind` to inspect. Callers that fan out
162    ///   on `valkey_kind()` should match `WorkerAtCapacity`
163    ///   explicitly (or use `is_retryable()`).
164    ///
165    /// [`FlowFabricWorker::claim_from_grant`]: crate::FlowFabricWorker::claim_from_grant
166    /// [`FlowFabricWorker::claim_from_reclaim_grant`]: crate::FlowFabricWorker::claim_from_reclaim_grant
167    #[error("worker at capacity: max_concurrent_tasks reached")]
168    WorkerAtCapacity,
169
170    /// HTTP transport error from the admin REST surface. Carries
171    /// the underlying `reqwest::Error` via `#[source]` so callers
172    /// can inspect `is_timeout()` / `is_connect()` / etc. for
173    /// finer-grained retry logic. Distinct from
174    /// [`SdkError::Valkey`] — this fires on the HTTP/JSON surface,
175    /// not on the Lua/Valkey hot path.
176    #[error("http: {context}: {source}")]
177    Http {
178        #[source]
179        source: reqwest::Error,
180        context: String,
181    },
182
183    /// The admin REST endpoint returned a non-2xx response.
184    ///
185    /// Fields surface the server-side `ErrorBody` JSON shape
186    /// (`{ error, kind?, retryable? }`) as structured values so
187    /// cairn-fabric and other consumers can match without
188    /// re-parsing the body:
189    ///
190    /// * `status` — HTTP status code.
191    /// * `message` — the `error` string from the JSON body (or
192    ///   the raw body if it didn't parse as JSON).
193    /// * `kind` — server-supplied Valkey `ErrorKind` label for 5xxs
194    ///   backed by a transport error; `None` for 4xxs.
195    /// * `retryable` — server-supplied hint; `None` for 4xxs.
196    /// * `raw_body` — the full response body, preserved for logging
197    ///   when the JSON shape doesn't match.
198    #[error("admin api: {status}: {message}")]
199    AdminApi {
200        status: u16,
201        message: String,
202        kind: Option<String>,
203        retryable: Option<bool>,
204        raw_body: String,
205    },
206}
207
/// Builds the `Display` text for [`SdkError::Config`].
///
/// Produces `config: <context>.<field>: <message>` when the error names a
/// specific field, and `config: <context>: <message>` for whole-object
/// validation (`field == None`).
fn fmt_config(context: &str, field: Option<&str>, message: &str) -> String {
    // Only the location prefix differs between the two shapes.
    let location = if let Some(name) = field {
        format!("{context}.{name}")
    } else {
        context.to_owned()
    };
    format!("config: {location}: {message}")
}
216
217/// Preserves the ergonomic `?`-propagation from FCALL sites that
218/// return `Result<_, ScriptError>`. Routes through `EngineError`'s
219/// typed classification so every call site gets the same
220/// variant-level detail without hand-written conversion.
221impl From<ff_script::error::ScriptError> for SdkError {
222    fn from(err: ff_script::error::ScriptError) -> Self {
223        // ff-script's `From<ScriptError> for EngineError` owns the
224        // mapping table (#58.6). See `ff_script::engine_error_ext`.
225        Self::Engine(Box::new(EngineError::from(err)))
226    }
227}
228
229impl From<EngineError> for SdkError {
230    fn from(err: EngineError) -> Self {
231        Self::Engine(Box::new(err))
232    }
233}
234
235impl SdkError {
236    /// Returns the underlying ferriskey `ErrorKind` if this error carries one.
237    /// Covers transport variants (`Valkey`, `ValkeyContext`) directly and
238    /// `Engine(EngineError::Transport { .. })` via delegation.
239    #[cfg(feature = "valkey-default")]
240    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
241        match self {
242            Self::Valkey(e) => Some(e.kind()),
243            Self::ValkeyContext { source, .. } => Some(source.kind()),
244            Self::Engine(e) => ff_script::engine_error_ext::valkey_kind(e),
245            // HTTP/admin-surface errors carry no ferriskey::ErrorKind;
246            // the admin path never touches Valkey directly from the
247            // SDK side. Use `AdminApi.kind` for the server-supplied
248            // label when present.
249            Self::Config { .. } | Self::WorkerAtCapacity | Self::Http { .. } | Self::AdminApi { .. } => {
250                None
251            }
252        }
253    }
254
255    /// Whether this error is safely retryable by a caller. For transport
256    /// variants, delegates to [`ff_script::retry::is_retryable_kind`]. For
257    /// `Engine` errors, returns `true` iff the typed classification is
258    /// `ErrorClass::Retryable`. `Config` errors are never retryable.
259    pub fn is_retryable(&self) -> bool {
260        match self {
261            #[cfg(feature = "valkey-default")]
262            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
263                ff_script::retry::is_retryable_kind(e.kind())
264            }
265            Self::Engine(e) => {
266                matches!(
267                    ff_script::engine_error_ext::class(e),
268                    ff_core::error::ErrorClass::Retryable
269                )
270            }
271            // WorkerAtCapacity is retryable: the saturation is transient
272            // and clears as soon as a concurrent task completes.
273            Self::WorkerAtCapacity => true,
274            // HTTP transport: timeouts and connect failures are
275            // retryable (transient network state); body-decode or
276            // request-build errors are terminal (caller must fix
277            // the code). `reqwest::Error` exposes both predicates.
278            Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
279            // Admin API errors: trust the server's `retryable` hint
280            // when present; otherwise fall back to the HTTP-standard
281            // retryable-status set (429, 502, 503, 504). 5xxs without
282            // a hint are conservatively non-retryable — the caller can
283            // override with `AdminApi.status`-based logic if needed.
284            // 502 covers reverse-proxy transients (ALB/nginx returning
285            // Bad Gateway when ff-server restarts mid-request).
286            Self::AdminApi {
287                status, retryable, ..
288            } => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
289            Self::Config { .. } => false,
290        }
291    }
292}
293
#[cfg(all(test, feature = "valkey-default"))]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic ferriskey error of the given kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn valkey_kind_direct_and_context() {
        assert_eq!(
            SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).valkey_kind(),
            Some(ErrorKind::IoError)
        );
        assert_eq!(
            SdkError::ValkeyContext {
                source: mk_fk_err(ErrorKind::BusyLoadingError),
                context: "connect".into()
            }
            .valkey_kind(),
            Some(ErrorKind::BusyLoadingError)
        );
    }

    #[test]
    fn valkey_kind_delegates_through_engine_transport() {
        let err = SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
    }

    #[test]
    fn valkey_kind_none_for_lua_and_config() {
        assert_eq!(
            SdkError::from(ScriptError::LeaseExpired).valkey_kind(),
            None
        );
        assert_eq!(
            SdkError::Config {
                context: "worker_config".into(),
                field: Some("bearer_token".into()),
                message: "bad host".into(),
            }
            .valkey_kind(),
            None
        );
    }

    #[test]
    fn is_retryable_transport() {
        assert!(SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
    }

    #[test]
    fn is_retryable_engine_delegates_to_class() {
        // NoEligibleExecution is classified Retryable via EngineError::class().
        assert!(SdkError::from(ScriptError::NoEligibleExecution).is_retryable());
        // StaleLease is Terminal.
        assert!(!SdkError::from(ScriptError::StaleLease).is_retryable());
        // Transport(Valkey(IoError)) is Retryable via class() delegation.
        assert!(
            SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable()
        );
    }

    /// `WorkerAtCapacity` is documented as retryable with no ferriskey
    /// `ErrorKind`; pin both halves of that classification contract plus
    /// its Display text.
    #[test]
    fn worker_at_capacity_is_retryable_without_valkey_kind() {
        let err = SdkError::WorkerAtCapacity;
        assert!(err.is_retryable());
        assert_eq!(err.valkey_kind(), None);
        assert_eq!(
            err.to_string(),
            "worker at capacity: max_concurrent_tasks reached"
        );
    }

    /// Regression (#98): `SdkError::Config` carries `context`, optional
    /// `field`, and `message` separately so consumers can pattern-match on
    /// the offending field without parsing the Display string. Test covers
    /// both the field-scoped and whole-object renderings.
    #[test]
    fn config_structured_fields_render_and_match() {
        let with_field = SdkError::Config {
            context: "admin_client".into(),
            field: Some("bearer_token".into()),
            message: "is empty or all-whitespace".into(),
        };
        assert_eq!(
            with_field.to_string(),
            "config: admin_client.bearer_token: is empty or all-whitespace"
        );
        assert!(matches!(
            &with_field,
            SdkError::Config { field: Some(f), .. } if f == "bearer_token"
        ));

        let whole_object = SdkError::Config {
            context: "worker_config".into(),
            field: None,
            message: "at least one lane is required".into(),
        };
        assert_eq!(
            whole_object.to_string(),
            "config: worker_config: at least one lane is required"
        );
        assert!(matches!(
            &whole_object,
            SdkError::Config { field: None, .. }
        ));
    }

    #[test]
    fn is_retryable_config_false() {
        assert!(
            !SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            }
            .is_retryable()
        );
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        let err = SdkError::AdminApi {
            status: 429,
            message: "throttled".into(),
            kind: None,
            retryable: Some(false),
            raw_body: String::new(),
        };
        assert!(!err.is_retryable());

        let err = SdkError::AdminApi {
            status: 500,
            message: "valkey timeout".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        // 502 covers ALB/nginx Bad Gateway transients on ff-server
        // restart — same retry-is-safe as 503/504 because rotation
        // is idempotent server-side.
        for s in [429u16, 502, 503, 504] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(err.is_retryable(), "status {s} should be retryable");
        }
        for s in [400u16, 401, 403, 404, 500] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(!err.is_retryable(), "status {s} should NOT be retryable without hint");
        }
    }

    #[test]
    fn valkey_kind_none_for_admin_surface() {
        let err = SdkError::AdminApi {
            status: 500,
            message: "x".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert_eq!(err.valkey_kind(), None);
    }
}