Skip to main content

ff_sdk/
lib.rs

1//! FlowFabric Worker SDK — public API for worker authors.
2//!
3//! This crate depends on `ff-script` for the Lua-function types, Lua error
4//! kinds (`ScriptError`), and retry helpers (`is_retryable_kind`,
5//! `kind_to_stable_str`). Consumers using `ff-sdk` do not need to import
6//! `ff-script` directly for normal worker operations, but can if they need
7//! the `ScriptError` or retry types.
8//!
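//! # Quick start
//!
//! The example below uses [`FlowFabricWorker::claim_next`], which is gated by
//! the deprecated `direct-valkey-claim` cargo feature. New code should prefer
//! the scheduler-issued grant entry points described in the migration section
//! below.
//!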
11//! ```rust,ignore
12//! use ff_sdk::{FlowFabricWorker, WorkerConfig};
13//! use std::time::Duration;
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), ff_sdk::SdkError> {
17//!     let config = WorkerConfig::new(
18//!         "localhost",
19//!         6379,
20//!         "my-worker",
21//!         "my-worker-instance-1",
22//!         "default",
23//!         "main",
24//!     );
25//!
26//!     let worker = FlowFabricWorker::connect(config).await?;
27//!
28//!     loop {
29//!         match worker.claim_next().await? {
30//!             Some(task) => {
31//!                 println!("claimed: {}", task.execution_id());
32//!                 // Process task...
33//!                 task.complete(Some(b"done".to_vec())).await?;
34//!             }
35//!             None => {
36//!                 tokio::time::sleep(Duration::from_secs(1)).await;
37//!             }
38//!         }
39//!     }
40//! }
41//! ```
42//!
43//! # Migration: `direct-valkey-claim` → scheduler-issued grants
44//!
45//! The `direct-valkey-claim` cargo feature — which gates
46//! [`FlowFabricWorker::claim_next`] — is **deprecated** in favour of
47//! the pair of scheduler-issued grant entry points:
48//!
49//! * [`FlowFabricWorker::claim_from_grant`] — fresh claims. Use
50//!   `ff_scheduler::Scheduler::claim_for_worker` to obtain the
51//!   [`ClaimGrant`], then hand it to the SDK.
52//! * [`FlowFabricWorker::claim_from_reclaim_grant`] — resumed claims
53//!   for an `attempt_interrupted` execution. Wraps a
54//!   [`ReclaimGrant`].
55//!
//! `claim_next` bypasses budget and quota admission control, whereas the
//! grant-based path enforces it. See each method's rustdoc for the exact
//! migration recipe.
59//!
60//! [`ClaimGrant`]: ff_core::contracts::ClaimGrant
61//! [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
62
63pub mod admin;
64pub mod config;
65pub mod task;
66pub mod worker;
67
68// Re-exports for convenience
69pub use admin::{
70    FlowFabricAdminClient, RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
71};
72pub use config::WorkerConfig;
73pub use task::{
74    read_stream, tail_stream, AppendFrameOutcome, ClaimedTask, ConditionMatcher, FailOutcome,
75    ResumeSignal, Signal, SignalOutcome, StreamFrames, SuspendOutcome, TimeoutBehavior,
76    MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
77};
78pub use worker::FlowFabricWorker;
79
80/// SDK error type.
81#[derive(Debug, thiserror::Error)]
82pub enum SdkError {
83    /// Valkey connection or command error (preserves ErrorKind for caller inspection).
84    #[error("valkey: {0}")]
85    Valkey(#[from] ferriskey::Error),
86
87    /// Valkey error with additional context.
88    #[error("valkey: {context}: {source}")]
89    ValkeyContext {
90        #[source]
91        source: ferriskey::Error,
92        context: String,
93    },
94
95    /// FlowFabric Lua script error.
96    #[error("script: {0}")]
97    Script(#[from] ff_script::error::ScriptError),
98
99    /// Configuration error.
100    #[error("config: {0}")]
101    Config(String),
102
103    /// Worker is at its configured `max_concurrent_tasks` capacity —
104    /// the caller should retry later. Returned by
105    /// [`FlowFabricWorker::claim_from_grant`] and
106    /// [`FlowFabricWorker::claim_from_reclaim_grant`] when the
107    /// concurrency semaphore is saturated. Distinct from `Ok(None)`:
108    /// a `ClaimGrant`/`ReclaimGrant` represents real work already
109    /// selected by the scheduler, so silently dropping it would waste
110    /// the grant and let the grant TTL elapse. Surfacing the
111    /// saturation lets the caller release the grant (or wait +
112    /// retry).
113    ///
114    /// # Classification
115    ///
116    /// * [`SdkError::is_retryable`] returns `true` — saturation is
117    ///   transient: any in-flight task's
118    ///   complete/fail/cancel/drop releases a permit. Retry after
119    ///   milliseconds, not a retry loop with backoff for a Valkey
120    ///   transport failure.
121    /// * [`SdkError::valkey_kind`] returns `None` — this is not a
122    ///   Valkey transport or Lua error, so there is no
123    ///   `ferriskey::ErrorKind` to inspect. Callers that fan out
124    ///   on `valkey_kind()` should match `WorkerAtCapacity`
125    ///   explicitly (or use `is_retryable()`).
126    ///
127    /// [`FlowFabricWorker::claim_from_grant`]: crate::FlowFabricWorker::claim_from_grant
128    /// [`FlowFabricWorker::claim_from_reclaim_grant`]: crate::FlowFabricWorker::claim_from_reclaim_grant
129    #[error("worker at capacity: max_concurrent_tasks reached")]
130    WorkerAtCapacity,
131
132    /// HTTP transport error from the admin REST surface. Carries
133    /// the underlying `reqwest::Error` via `#[source]` so callers
134    /// can inspect `is_timeout()` / `is_connect()` / etc. for
135    /// finer-grained retry logic. Distinct from
136    /// [`SdkError::Valkey`] — this fires on the HTTP/JSON surface,
137    /// not on the Lua/Valkey hot path.
138    #[error("http: {context}: {source}")]
139    Http {
140        #[source]
141        source: reqwest::Error,
142        context: String,
143    },
144
145    /// The admin REST endpoint returned a non-2xx response.
146    ///
147    /// Fields surface the server-side `ErrorBody` JSON shape
148    /// (`{ error, kind?, retryable? }`) as structured values so
149    /// cairn-fabric and other consumers can match without
150    /// re-parsing the body:
151    ///
152    /// * `status` — HTTP status code.
153    /// * `message` — the `error` string from the JSON body (or
154    ///   the raw body if it didn't parse as JSON).
155    /// * `kind` — server-supplied Valkey `ErrorKind` label for 5xxs
156    ///   backed by a transport error; `None` for 4xxs.
157    /// * `retryable` — server-supplied hint; `None` for 4xxs.
158    /// * `raw_body` — the full response body, preserved for logging
159    ///   when the JSON shape doesn't match.
160    #[error("admin api: {status}: {message}")]
161    AdminApi {
162        status: u16,
163        message: String,
164        kind: Option<String>,
165        retryable: Option<bool>,
166        raw_body: String,
167    },
168}
169
170impl SdkError {
171    /// Returns the underlying ferriskey `ErrorKind` if this error carries one.
172    /// Covers transport variants (`Valkey`, `ValkeyContext`) directly and
173    /// `Script(ScriptError::Valkey(...))` via delegation.
174    pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
175        match self {
176            Self::Valkey(e) => Some(e.kind()),
177            Self::ValkeyContext { source, .. } => Some(source.kind()),
178            Self::Script(e) => e.valkey_kind(),
179            // HTTP/admin-surface errors carry no ferriskey::ErrorKind;
180            // the admin path never touches Valkey directly from the
181            // SDK side. Use `AdminApi.kind` for the server-supplied
182            // label when present.
183            Self::Config(_) | Self::WorkerAtCapacity | Self::Http { .. } | Self::AdminApi { .. } => {
184                None
185            }
186        }
187    }
188
189    /// Whether this error is safely retryable by a caller. For transport
190    /// variants, delegates to [`ff_script::retry::is_retryable_kind`]. For
191    /// `Script` errors, returns `true` iff the Lua error's classification
192    /// is `ErrorClass::Retryable`. `Config` errors are never retryable.
193    pub fn is_retryable(&self) -> bool {
194        match self {
195            Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
196                ff_script::retry::is_retryable_kind(e.kind())
197            }
198            Self::Script(e) => {
199                matches!(e.class(), ff_core::error::ErrorClass::Retryable)
200            }
201            // WorkerAtCapacity is retryable: the saturation is transient
202            // and clears as soon as a concurrent task completes.
203            Self::WorkerAtCapacity => true,
204            // HTTP transport: timeouts and connect failures are
205            // retryable (transient network state); body-decode or
206            // request-build errors are terminal (caller must fix
207            // the code). `reqwest::Error` exposes both predicates.
208            Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
209            // Admin API errors: trust the server's `retryable` hint
210            // when present; otherwise fall back to the HTTP-standard
211            // retryable-status set (429, 502, 503, 504). 5xxs without
212            // a hint are conservatively non-retryable — the caller can
213            // override with `AdminApi.status`-based logic if needed.
214            // 502 covers reverse-proxy transients (ALB/nginx returning
215            // Bad Gateway when ff-server restarts mid-request).
216            Self::AdminApi {
217                status, retryable, ..
218            } => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
219            Self::Config(_) => false,
220        }
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic ferriskey error of the given kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    /// Builds an `AdminApi` error with the given status, server `kind` label
    /// and retry hint. `message` and `raw_body` are placeholders.
    fn mk_admin_err(status: u16, kind: Option<&str>, retryable: Option<bool>) -> SdkError {
        SdkError::AdminApi {
            status,
            message: "x".into(),
            kind: kind.map(str::to_owned),
            retryable,
            raw_body: String::new(),
        }
    }

    #[test]
    fn valkey_kind_direct_and_context() {
        assert_eq!(
            SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).valkey_kind(),
            Some(ErrorKind::IoError)
        );
        assert_eq!(
            SdkError::ValkeyContext {
                source: mk_fk_err(ErrorKind::BusyLoadingError),
                context: "connect".into()
            }
            .valkey_kind(),
            Some(ErrorKind::BusyLoadingError)
        );
    }

    #[test]
    fn valkey_kind_delegates_through_script_transport() {
        let err = SdkError::Script(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
    }

    #[test]
    fn valkey_kind_none_for_lua_and_config() {
        assert_eq!(
            SdkError::Script(ScriptError::LeaseExpired).valkey_kind(),
            None
        );
        assert_eq!(SdkError::Config("bad host".into()).valkey_kind(), None);
    }

    #[test]
    fn is_retryable_transport() {
        assert!(SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
    }

    #[test]
    fn is_retryable_script_delegates_to_class() {
        // NoEligibleExecution is classified Retryable in ScriptError::class().
        assert!(SdkError::Script(ScriptError::NoEligibleExecution).is_retryable());
        // StaleLease is Terminal.
        assert!(!SdkError::Script(ScriptError::StaleLease).is_retryable());
        // Script::Valkey(IoError) is Retryable via class() delegation.
        assert!(
            SdkError::Script(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable()
        );
    }

    #[test]
    fn is_retryable_config_false() {
        assert!(!SdkError::Config("at least one lane is required".into()).is_retryable());
    }

    #[test]
    fn worker_at_capacity_is_retryable_without_valkey_kind() {
        // The variant's rustdoc promises both properties: saturation is
        // transient (retryable), and there is no Valkey ErrorKind behind it.
        assert!(SdkError::WorkerAtCapacity.is_retryable());
        assert_eq!(SdkError::WorkerAtCapacity.valkey_kind(), None);
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        // A hint of `false` overrides the otherwise-retryable 429.
        assert!(!mk_admin_err(429, None, Some(false)).is_retryable());
        // A hint of `true` overrides the otherwise-terminal 500.
        assert!(mk_admin_err(500, Some("IoError"), Some(true)).is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        // 502 covers ALB/nginx Bad Gateway transients on ff-server
        // restart. Retrying is as safe as for 503/504 because rotation is
        // idempotent server-side.
        for s in [429u16, 502, 503, 504] {
            assert!(
                mk_admin_err(s, None, None).is_retryable(),
                "status {s} should be retryable"
            );
        }
        for s in [400u16, 401, 403, 404, 500] {
            assert!(
                !mk_admin_err(s, None, None).is_retryable(),
                "status {s} should NOT be retryable without hint"
            );
        }
    }

    #[test]
    fn valkey_kind_none_for_admin_surface() {
        assert_eq!(
            mk_admin_err(500, Some("IoError"), Some(true)).valkey_kind(),
            None
        );
    }

    #[test]
    fn display_formats_are_stable() {
        // Pin the `#[error(...)]` formats callers may log or match on.
        assert_eq!(
            SdkError::WorkerAtCapacity.to_string(),
            "worker at capacity: max_concurrent_tasks reached"
        );
        assert_eq!(SdkError::Config("bad host".into()).to_string(), "config: bad host");
        assert_eq!(mk_admin_err(503, None, None).to_string(), "admin api: 503: x");
    }
}