ff_sdk/lib.rs
1//! FlowFabric Worker SDK — public API for worker authors.
2//!
3//! This crate depends on `ff-script` for the Lua-function types, Lua error
4//! kinds (`ScriptError`), and retry helpers (`is_retryable_kind`,
5//! `kind_to_stable_str`). Consumers using `ff-sdk` do not need to import
6//! `ff-script` directly for normal worker operations, but can if they need
7//! the `ScriptError` or retry types.
8//!
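//! # Quick start
//!
//! The example below uses [`FlowFabricWorker::claim_next`], which is only
//! available with the deprecated `direct-valkey-claim` cargo feature; see the
//! migration section below for the scheduler-issued grant path.
//!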
11//! ```rust,ignore
12//! use ff_sdk::{FlowFabricWorker, WorkerConfig};
13//! use std::time::Duration;
14//!
15//! #[tokio::main]
16//! async fn main() -> Result<(), ff_sdk::SdkError> {
17//! let config = WorkerConfig::new(
18//! "localhost",
19//! 6379,
20//! "my-worker",
21//! "my-worker-instance-1",
22//! "default",
23//! "main",
24//! );
25//!
26//! let worker = FlowFabricWorker::connect(config).await?;
27//!
28//! loop {
29//! match worker.claim_next().await? {
30//! Some(task) => {
31//! println!("claimed: {}", task.execution_id());
32//! // Process task...
33//! task.complete(Some(b"done".to_vec())).await?;
34//! }
35//! None => {
36//! tokio::time::sleep(Duration::from_secs(1)).await;
37//! }
38//! }
39//! }
40//! }
41//! ```
42//!
43//! # Migration: `direct-valkey-claim` → scheduler-issued grants
44//!
45//! The `direct-valkey-claim` cargo feature — which gates
46//! [`FlowFabricWorker::claim_next`] — is **deprecated** in favour of
47//! the pair of scheduler-issued grant entry points:
48//!
49//! * [`FlowFabricWorker::claim_from_grant`] — fresh claims. Use
50//! `ff_scheduler::Scheduler::claim_for_worker` to obtain the
51//! [`ClaimGrant`], then hand it to the SDK.
52//! * [`FlowFabricWorker::claim_from_reclaim_grant`] — resumed claims
53//! for an `attempt_interrupted` execution. Wraps a
54//! [`ReclaimGrant`].
55//!
56//! `claim_next` bypasses budget and quota admission control; the
57//! grant-based path does not. See each method's rustdoc for the
58//! exact migration recipe.
59//!
60//! [`ClaimGrant`]: ff_core::contracts::ClaimGrant
61//! [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
62
63pub mod admin;
64pub mod config;
65pub mod task;
66pub mod worker;
67
68// Re-exports for convenience
69pub use admin::{
70 FlowFabricAdminClient, RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
71};
72pub use config::WorkerConfig;
73pub use task::{
74 read_stream, tail_stream, AppendFrameOutcome, ClaimedTask, ConditionMatcher, FailOutcome,
75 ResumeSignal, Signal, SignalOutcome, StreamFrames, SuspendOutcome, TimeoutBehavior,
76 MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
77};
78pub use worker::FlowFabricWorker;
79
80/// SDK error type.
81#[derive(Debug, thiserror::Error)]
82pub enum SdkError {
83 /// Valkey connection or command error (preserves ErrorKind for caller inspection).
84 #[error("valkey: {0}")]
85 Valkey(#[from] ferriskey::Error),
86
87 /// Valkey error with additional context.
88 #[error("valkey: {context}: {source}")]
89 ValkeyContext {
90 #[source]
91 source: ferriskey::Error,
92 context: String,
93 },
94
95 /// FlowFabric Lua script error.
96 #[error("script: {0}")]
97 Script(#[from] ff_script::error::ScriptError),
98
99 /// Configuration error.
100 #[error("config: {0}")]
101 Config(String),
102
103 /// Worker is at its configured `max_concurrent_tasks` capacity —
104 /// the caller should retry later. Returned by
105 /// [`FlowFabricWorker::claim_from_grant`] and
106 /// [`FlowFabricWorker::claim_from_reclaim_grant`] when the
107 /// concurrency semaphore is saturated. Distinct from `Ok(None)`:
108 /// a `ClaimGrant`/`ReclaimGrant` represents real work already
109 /// selected by the scheduler, so silently dropping it would waste
110 /// the grant and let the grant TTL elapse. Surfacing the
111 /// saturation lets the caller release the grant (or wait +
112 /// retry).
113 ///
114 /// # Classification
115 ///
116 /// * [`SdkError::is_retryable`] returns `true` — saturation is
117 /// transient: any in-flight task's
118 /// complete/fail/cancel/drop releases a permit. Retry after
119 /// milliseconds, not a retry loop with backoff for a Valkey
120 /// transport failure.
121 /// * [`SdkError::valkey_kind`] returns `None` — this is not a
122 /// Valkey transport or Lua error, so there is no
123 /// `ferriskey::ErrorKind` to inspect. Callers that fan out
124 /// on `valkey_kind()` should match `WorkerAtCapacity`
125 /// explicitly (or use `is_retryable()`).
126 ///
127 /// [`FlowFabricWorker::claim_from_grant`]: crate::FlowFabricWorker::claim_from_grant
128 /// [`FlowFabricWorker::claim_from_reclaim_grant`]: crate::FlowFabricWorker::claim_from_reclaim_grant
129 #[error("worker at capacity: max_concurrent_tasks reached")]
130 WorkerAtCapacity,
131
132 /// HTTP transport error from the admin REST surface. Carries
133 /// the underlying `reqwest::Error` via `#[source]` so callers
134 /// can inspect `is_timeout()` / `is_connect()` / etc. for
135 /// finer-grained retry logic. Distinct from
136 /// [`SdkError::Valkey`] — this fires on the HTTP/JSON surface,
137 /// not on the Lua/Valkey hot path.
138 #[error("http: {context}: {source}")]
139 Http {
140 #[source]
141 source: reqwest::Error,
142 context: String,
143 },
144
145 /// The admin REST endpoint returned a non-2xx response.
146 ///
147 /// Fields surface the server-side `ErrorBody` JSON shape
148 /// (`{ error, kind?, retryable? }`) as structured values so
149 /// cairn-fabric and other consumers can match without
150 /// re-parsing the body:
151 ///
152 /// * `status` — HTTP status code.
153 /// * `message` — the `error` string from the JSON body (or
154 /// the raw body if it didn't parse as JSON).
155 /// * `kind` — server-supplied Valkey `ErrorKind` label for 5xxs
156 /// backed by a transport error; `None` for 4xxs.
157 /// * `retryable` — server-supplied hint; `None` for 4xxs.
158 /// * `raw_body` — the full response body, preserved for logging
159 /// when the JSON shape doesn't match.
160 #[error("admin api: {status}: {message}")]
161 AdminApi {
162 status: u16,
163 message: String,
164 kind: Option<String>,
165 retryable: Option<bool>,
166 raw_body: String,
167 },
168}
169
170impl SdkError {
171 /// Returns the underlying ferriskey `ErrorKind` if this error carries one.
172 /// Covers transport variants (`Valkey`, `ValkeyContext`) directly and
173 /// `Script(ScriptError::Valkey(...))` via delegation.
174 pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
175 match self {
176 Self::Valkey(e) => Some(e.kind()),
177 Self::ValkeyContext { source, .. } => Some(source.kind()),
178 Self::Script(e) => e.valkey_kind(),
179 // HTTP/admin-surface errors carry no ferriskey::ErrorKind;
180 // the admin path never touches Valkey directly from the
181 // SDK side. Use `AdminApi.kind` for the server-supplied
182 // label when present.
183 Self::Config(_) | Self::WorkerAtCapacity | Self::Http { .. } | Self::AdminApi { .. } => {
184 None
185 }
186 }
187 }
188
189 /// Whether this error is safely retryable by a caller. For transport
190 /// variants, delegates to [`ff_script::retry::is_retryable_kind`]. For
191 /// `Script` errors, returns `true` iff the Lua error's classification
192 /// is `ErrorClass::Retryable`. `Config` errors are never retryable.
193 pub fn is_retryable(&self) -> bool {
194 match self {
195 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
196 ff_script::retry::is_retryable_kind(e.kind())
197 }
198 Self::Script(e) => {
199 matches!(e.class(), ff_core::error::ErrorClass::Retryable)
200 }
201 // WorkerAtCapacity is retryable: the saturation is transient
202 // and clears as soon as a concurrent task completes.
203 Self::WorkerAtCapacity => true,
204 // HTTP transport: timeouts and connect failures are
205 // retryable (transient network state); body-decode or
206 // request-build errors are terminal (caller must fix
207 // the code). `reqwest::Error` exposes both predicates.
208 Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
209 // Admin API errors: trust the server's `retryable` hint
210 // when present; otherwise fall back to the HTTP-standard
211 // retryable-status set (429, 502, 503, 504). 5xxs without
212 // a hint are conservatively non-retryable — the caller can
213 // override with `AdminApi.status`-based logic if needed.
214 // 502 covers reverse-proxy transients (ALB/nginx returning
215 // Bad Gateway when ff-server restarts mid-request).
216 Self::AdminApi {
217 status, retryable, ..
218 } => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
219 Self::Config(_) => false,
220 }
221 }
222}
223
#[cfg(test)]
mod tests {
    //! Classification tests for `SdkError::valkey_kind` and
    //! `SdkError::is_retryable`.

    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic ferriskey error of the given kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn valkey_kind_direct_and_context() {
        assert_eq!(
            SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).valkey_kind(),
            Some(ErrorKind::IoError)
        );
        assert_eq!(
            SdkError::ValkeyContext {
                source: mk_fk_err(ErrorKind::BusyLoadingError),
                context: "connect".into()
            }
            .valkey_kind(),
            Some(ErrorKind::BusyLoadingError)
        );
    }

    #[test]
    fn valkey_kind_delegates_through_script_transport() {
        let err = SdkError::Script(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
    }

    #[test]
    fn valkey_kind_none_for_lua_and_config() {
        assert_eq!(
            SdkError::Script(ScriptError::LeaseExpired).valkey_kind(),
            None
        );
        assert_eq!(SdkError::Config("bad host".into()).valkey_kind(), None);
    }

    #[test]
    fn is_retryable_transport() {
        assert!(SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
    }

    #[test]
    fn is_retryable_script_delegates_to_class() {
        // NoEligibleExecution is classified Retryable in ScriptError::class().
        assert!(SdkError::Script(ScriptError::NoEligibleExecution).is_retryable());
        // StaleLease is Terminal.
        assert!(!SdkError::Script(ScriptError::StaleLease).is_retryable());
        // Script::Valkey(IoError) is Retryable via class() delegation.
        assert!(
            SdkError::Script(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable()
        );
    }

    #[test]
    fn is_retryable_config_false() {
        assert!(!SdkError::Config("at least one lane is required".into()).is_retryable());
    }

    #[test]
    fn worker_at_capacity_is_retryable_without_valkey_kind() {
        // Documented contract on `SdkError::WorkerAtCapacity`: saturation
        // is transient (retryable) and carries no ferriskey::ErrorKind.
        assert!(SdkError::WorkerAtCapacity.is_retryable());
        assert_eq!(SdkError::WorkerAtCapacity.valkey_kind(), None);
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        let err = SdkError::AdminApi {
            status: 429,
            message: "throttled".into(),
            kind: None,
            retryable: Some(false),
            raw_body: String::new(),
        };
        assert!(!err.is_retryable());

        let err = SdkError::AdminApi {
            status: 500,
            message: "valkey timeout".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        // 502 covers ALB/nginx Bad Gateway transients on ff-server
        // restart — same retry-is-safe as 503/504 because rotation
        // is idempotent server-side.
        for s in [429u16, 502, 503, 504] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(err.is_retryable(), "status {s} should be retryable");
        }
        for s in [400u16, 401, 403, 404, 500] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(!err.is_retryable(), "status {s} should NOT be retryable without hint");
        }
    }

    #[test]
    fn valkey_kind_none_for_admin_surface() {
        let err = SdkError::AdminApi {
            status: 500,
            message: "x".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert_eq!(err.valkey_kind(), None);
    }
}