ff_sdk/lib.rs
1//! FlowFabric Worker SDK — public API for worker authors.
2//!
3//! This crate depends on `ff-script` for the Lua-function types, Lua error
4//! kinds (`ScriptError`), and retry helpers (`is_retryable_kind`,
5//! `kind_to_stable_str`). Consumers using `ff-sdk` do not need to import
6//! `ff-script` directly for normal worker operations, but can if they need
7//! the `ScriptError` or retry types.
8//!
9//! # Quick start
10//!
//! ```rust,ignore
//! use ff_sdk::{FlowFabricWorker, WorkerConfig};
//! use std::time::Duration;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), ff_sdk::SdkError> {
//!     let config = WorkerConfig::new(
//!         "localhost",
//!         6379,
//!         "my-worker",
//!         "my-worker-instance-1",
//!         "default",
//!         "main",
//!     );
//!
//!     let worker = FlowFabricWorker::connect(config).await?;
//!
//!     loop {
//!         // `claim_next` is gated behind the deprecated `direct-valkey-claim`
//!         // feature; new code should prefer the grant-based entry points
//!         // described in the migration section below.
//!         match worker.claim_next().await? {
//!             Some(task) => {
//!                 println!("claimed: {}", task.execution_id());
//!                 // Process task...
//!                 task.complete(Some(b"done".to_vec())).await?;
//!             }
//!             None => {
//!                 tokio::time::sleep(Duration::from_secs(1)).await;
//!             }
//!         }
//!     }
//! }
//! ```
42//!
43//! # Migration: `direct-valkey-claim` → scheduler-issued grants
44//!
45//! The `direct-valkey-claim` cargo feature — which gates
46//! [`FlowFabricWorker::claim_next`] — is **deprecated** in favour of
47//! the pair of scheduler-issued grant entry points:
48//!
49//! * [`FlowFabricWorker::claim_from_grant`] — fresh claims. Use
50//! `ff_scheduler::Scheduler::claim_for_worker` to obtain the
51//! [`ClaimGrant`], then hand it to the SDK.
52//! * [`FlowFabricWorker::claim_from_reclaim_grant`] — resumed claims
53//! for an `attempt_interrupted` execution. Wraps a
54//! [`ReclaimGrant`].
55//!
56//! `claim_next` bypasses budget and quota admission control; the
57//! grant-based path does not. See each method's rustdoc for the
58//! exact migration recipe.
59//!
60//! [`ClaimGrant`]: ff_core::contracts::ClaimGrant
61//! [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
62
63#[cfg(feature = "valkey-default")]
64pub mod admin;
65pub mod config;
66pub mod engine_error;
67#[cfg(feature = "valkey-default")]
68pub mod snapshot;
69#[cfg(feature = "valkey-default")]
70pub mod task;
71#[cfg(feature = "valkey-default")]
72pub mod worker;
73
74// Re-exports for convenience
75#[cfg(feature = "valkey-default")]
76pub use admin::{
77 rotate_waitpoint_hmac_secret_all_partitions, FlowFabricAdminClient, PartitionRotationOutcome,
78 RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
79};
80pub use config::WorkerConfig;
81pub use engine_error::{
82 BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
83};
84// `FailOutcome` is ff-core-native (Stage 1a move); re-export
85// unconditionally so consumers can name `ff_sdk::FailOutcome` even
86// under `--no-default-features`.
87pub use ff_core::backend::FailOutcome;
88// `ResumeSignal` is also ff-core-native (Stage 0 move).
89pub use ff_core::backend::ResumeSignal;
90#[cfg(feature = "valkey-default")]
91pub use task::{
92 read_stream, tail_stream, AppendFrameOutcome, ClaimedTask, ConditionMatcher, Signal,
93 SignalOutcome, StreamCursor, StreamFrames, SuspendOutcome, TimeoutBehavior,
94 MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
95};
96#[cfg(feature = "valkey-default")]
97pub use worker::FlowFabricWorker;
98
/// SDK error type.
///
/// Each failure surface has its own variant:
/// * Valkey transport: `Valkey` and `ValkeyContext`, only with the
///   `valkey-default` feature.
/// * Typed engine classification: `Engine`.
/// * Configuration validation: `Config`.
/// * Local concurrency saturation: `WorkerAtCapacity`.
/// * Admin REST surface: `Http` and `AdminApi`.
///
/// [`SdkError::is_retryable`] gives one retry decision across all of them.
#[derive(Debug, thiserror::Error)]
pub enum SdkError {
    /// Valkey connection or command error.
    ///
    /// Preserves the original `ferriskey::Error`, and therefore its
    /// `ErrorKind`, for caller inspection (see [`SdkError::valkey_kind`]).
    /// `#[from]` lets `?` convert a bare `ferriskey::Error` into this variant.
    #[cfg(feature = "valkey-default")]
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),

    /// Valkey error with additional context.
    ///
    /// `context` identifies the call site (e.g. `"connect"`). It is rendered
    /// between the `valkey:` prefix and the source error. The source is also
    /// exposed through `std::error::Error::source`.
    #[cfg(feature = "valkey-default")]
    #[error("valkey: {context}: {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },

    /// FlowFabric engine error — typed sum over Lua error codes + transport
    /// faults. See [`EngineError`] for the variant-granularity contract.
    /// Replaces the previous `Script(ScriptError)` carrier (#58.6).
    ///
    /// Built via `From<EngineError>`, or via `From<ScriptError>` so that
    /// FCALL sites can propagate with `?`.
    ///
    /// `Box`ed to keep `SdkError`'s stack footprint small: the richest
    /// variant (`ConflictKind::DependencyAlreadyExists { existing:
    /// EdgeSnapshot }`) is ~200 bytes. Boxing means `Result<T, SdkError>`
    /// does not pay that width for every other variant.
    #[error("engine: {0}")]
    Engine(Box<EngineError>),

    /// Configuration error. `context` identifies the call site / logical
    /// operation (e.g. `"describe_execution: exec_core"`, `"admin_client"`).
    /// `field` names the specific offending field when the error is
    /// field-scoped (e.g. `Some("public_state")`), or `None` for
    /// whole-object validation (e.g. `"at least one lane is required"`).
    /// `message` carries dynamic detail: source-error rendering, the
    /// offending raw value, etc.
    ///
    /// Rendered as `config: <context>.<field>: <message>`, or as
    /// `config: <context>: <message>` when `field` is `None`. Never
    /// retryable.
    #[error("{}", fmt_config(.context, .field.as_deref(), .message))]
    Config {
        context: String,
        field: Option<String>,
        message: String,
    },

    /// Worker is at its configured `max_concurrent_tasks` capacity —
    /// the caller should retry later. Returned by
    /// [`FlowFabricWorker::claim_from_grant`] and
    /// [`FlowFabricWorker::claim_from_reclaim_grant`] when the
    /// concurrency semaphore is saturated.
    ///
    /// Distinct from `Ok(None)`: a `ClaimGrant`/`ReclaimGrant` represents
    /// real work already selected by the scheduler. Silently dropping it
    /// would waste the grant and let the grant TTL elapse. Surfacing the
    /// saturation lets the caller release the grant, or wait and retry.
    ///
    /// # Classification
    ///
    /// * [`SdkError::is_retryable`] returns `true`. Saturation is
    ///   transient: any in-flight task's complete/fail/cancel/drop
    ///   releases a permit. A short delay (on the order of milliseconds)
    ///   before retrying is enough; the longer backoff used for a Valkey
    ///   transport failure is not needed.
    /// * [`SdkError::valkey_kind`] (available with the `valkey-default`
    ///   feature) returns `None`. This is not a Valkey transport or Lua
    ///   error, so there is no `ferriskey::ErrorKind` to inspect. Callers
    ///   that fan out on `valkey_kind()` should match `WorkerAtCapacity`
    ///   explicitly (or use `is_retryable()`).
    ///
    /// [`FlowFabricWorker::claim_from_grant`]: crate::FlowFabricWorker::claim_from_grant
    /// [`FlowFabricWorker::claim_from_reclaim_grant`]: crate::FlowFabricWorker::claim_from_reclaim_grant
    #[error("worker at capacity: max_concurrent_tasks reached")]
    WorkerAtCapacity,

    /// HTTP transport error from the admin REST surface. Carries
    /// the underlying `reqwest::Error` via `#[source]` so callers
    /// can inspect `is_timeout()` / `is_connect()` / etc. for
    /// finer-grained retry logic.
    ///
    /// Distinct from `SdkError::Valkey`: this fires on the HTTP/JSON
    /// surface, not on the Lua/Valkey hot path. [`SdkError::is_retryable`]
    /// treats only timeouts and connect failures as retryable.
    #[error("http: {context}: {source}")]
    Http {
        #[source]
        source: reqwest::Error,
        context: String,
    },

    /// The admin REST endpoint returned a non-2xx response.
    ///
    /// Fields surface the server-side `ErrorBody` JSON shape
    /// (`{ error, kind?, retryable? }`) as structured values so
    /// cairn-fabric and other consumers can match without
    /// re-parsing the body:
    ///
    /// * `status` — HTTP status code.
    /// * `message` — the `error` string from the JSON body (or
    ///   the raw body if it didn't parse as JSON).
    /// * `kind` — server-supplied Valkey `ErrorKind` label for 5xxs
    ///   backed by a transport error; `None` for 4xxs.
    /// * `retryable` — server-supplied hint; `None` for 4xxs.
    /// * `raw_body` — the full response body, preserved for logging
    ///   when the JSON shape doesn't match.
    ///
    /// Rendered as `admin api: <status>: <message>`. Without a server
    /// `retryable` hint, [`SdkError::is_retryable`] falls back to treating
    /// only status 429, 502, 503 and 504 as retryable.
    #[error("admin api: {status}: {message}")]
    AdminApi {
        status: u16,
        message: String,
        kind: Option<String>,
        retryable: Option<bool>,
        raw_body: String,
    },
}
207
/// Builds the `Display` text for `SdkError::Config`:
/// `config: <context>[.<field>]: <message>`.
///
/// The `.<field>` segment is only emitted when `field` is `Some`;
/// whole-object validation errors (`field == None`) render without it.
fn fmt_config(context: &str, field: Option<&str>, message: &str) -> String {
    let mut out = format!("config: {context}");
    if let Some(name) = field {
        out.push('.');
        out.push_str(name);
    }
    out.push_str(": ");
    out.push_str(message);
    out
}
216
217/// Preserves the ergonomic `?`-propagation from FCALL sites that
218/// return `Result<_, ScriptError>`. Routes through `EngineError`'s
219/// typed classification so every call site gets the same
220/// variant-level detail without hand-written conversion.
221impl From<ff_script::error::ScriptError> for SdkError {
222 fn from(err: ff_script::error::ScriptError) -> Self {
223 // ff-script's `From<ScriptError> for EngineError` owns the
224 // mapping table (#58.6). See `ff_script::engine_error_ext`.
225 Self::Engine(Box::new(EngineError::from(err)))
226 }
227}
228
229impl From<EngineError> for SdkError {
230 fn from(err: EngineError) -> Self {
231 Self::Engine(Box::new(err))
232 }
233}
234
235impl SdkError {
236 /// Returns the underlying ferriskey `ErrorKind` if this error carries one.
237 /// Covers transport variants (`Valkey`, `ValkeyContext`) directly and
238 /// `Engine(EngineError::Transport { .. })` via delegation.
239 #[cfg(feature = "valkey-default")]
240 pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
241 match self {
242 Self::Valkey(e) => Some(e.kind()),
243 Self::ValkeyContext { source, .. } => Some(source.kind()),
244 Self::Engine(e) => ff_script::engine_error_ext::valkey_kind(e),
245 // HTTP/admin-surface errors carry no ferriskey::ErrorKind;
246 // the admin path never touches Valkey directly from the
247 // SDK side. Use `AdminApi.kind` for the server-supplied
248 // label when present.
249 Self::Config { .. } | Self::WorkerAtCapacity | Self::Http { .. } | Self::AdminApi { .. } => {
250 None
251 }
252 }
253 }
254
255 /// Whether this error is safely retryable by a caller. For transport
256 /// variants, delegates to [`ff_script::retry::is_retryable_kind`]. For
257 /// `Engine` errors, returns `true` iff the typed classification is
258 /// `ErrorClass::Retryable`. `Config` errors are never retryable.
259 pub fn is_retryable(&self) -> bool {
260 match self {
261 #[cfg(feature = "valkey-default")]
262 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
263 ff_script::retry::is_retryable_kind(e.kind())
264 }
265 Self::Engine(e) => {
266 matches!(
267 ff_script::engine_error_ext::class(e),
268 ff_core::error::ErrorClass::Retryable
269 )
270 }
271 // WorkerAtCapacity is retryable: the saturation is transient
272 // and clears as soon as a concurrent task completes.
273 Self::WorkerAtCapacity => true,
274 // HTTP transport: timeouts and connect failures are
275 // retryable (transient network state); body-decode or
276 // request-build errors are terminal (caller must fix
277 // the code). `reqwest::Error` exposes both predicates.
278 Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
279 // Admin API errors: trust the server's `retryable` hint
280 // when present; otherwise fall back to the HTTP-standard
281 // retryable-status set (429, 502, 503, 504). 5xxs without
282 // a hint are conservatively non-retryable — the caller can
283 // override with `AdminApi.status`-based logic if needed.
284 // 502 covers reverse-proxy transients (ALB/nginx returning
285 // Bad Gateway when ff-server restarts mid-request).
286 Self::AdminApi {
287 status, retryable, ..
288 } => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
289 Self::Config { .. } => false,
290 }
291 }
292}
293
#[cfg(all(test, feature = "valkey-default"))]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic `ferriskey::Error` of the given kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn valkey_kind_direct_and_context() {
        assert_eq!(
            SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).valkey_kind(),
            Some(ErrorKind::IoError)
        );
        assert_eq!(
            SdkError::ValkeyContext {
                source: mk_fk_err(ErrorKind::BusyLoadingError),
                context: "connect".into()
            }
            .valkey_kind(),
            Some(ErrorKind::BusyLoadingError)
        );
    }

    #[test]
    fn valkey_kind_delegates_through_engine_transport() {
        let err = SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
    }

    #[test]
    fn valkey_kind_none_for_lua_and_config() {
        assert_eq!(
            SdkError::from(ScriptError::LeaseExpired).valkey_kind(),
            None
        );
        assert_eq!(
            SdkError::Config {
                context: "admin_client".into(),
                field: Some("bearer_token".into()),
                message: "is empty or all-whitespace".into(),
            }
            .valkey_kind(),
            None
        );
    }

    #[test]
    fn is_retryable_transport() {
        assert!(SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
    }

    #[test]
    fn is_retryable_engine_delegates_to_class() {
        // NoEligibleExecution is classified Retryable via EngineError::class().
        assert!(SdkError::from(ScriptError::NoEligibleExecution).is_retryable());
        // StaleLease is Terminal.
        assert!(!SdkError::from(ScriptError::StaleLease).is_retryable());
        // Transport(Valkey(IoError)) is Retryable via class() delegation.
        assert!(
            SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable()
        );
    }

    /// Regression (#98): `SdkError::Config` carries `context`, optional
    /// `field`, and `message` separately so consumers can pattern-match on
    /// the offending field without parsing the Display string. Test covers
    /// both the field-scoped and whole-object renderings.
    #[test]
    fn config_structured_fields_render_and_match() {
        let with_field = SdkError::Config {
            context: "admin_client".into(),
            field: Some("bearer_token".into()),
            message: "is empty or all-whitespace".into(),
        };
        assert_eq!(
            with_field.to_string(),
            "config: admin_client.bearer_token: is empty or all-whitespace"
        );
        assert!(matches!(
            &with_field,
            SdkError::Config { field: Some(f), .. } if f == "bearer_token"
        ));

        let whole_object = SdkError::Config {
            context: "worker_config".into(),
            field: None,
            message: "at least one lane is required".into(),
        };
        assert_eq!(
            whole_object.to_string(),
            "config: worker_config: at least one lane is required"
        );
        assert!(matches!(
            &whole_object,
            SdkError::Config { field: None, .. }
        ));
    }

    #[test]
    fn is_retryable_config_false() {
        assert!(
            !SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            }
            .is_retryable()
        );
    }

    /// `WorkerAtCapacity` is documented as retryable (transient saturation)
    /// and as carrying no `ferriskey::ErrorKind`; pin both, plus its
    /// Display text.
    #[test]
    fn worker_at_capacity_is_retryable_without_valkey_kind() {
        let err = SdkError::WorkerAtCapacity;
        assert!(err.is_retryable());
        assert_eq!(err.valkey_kind(), None);
        assert_eq!(
            err.to_string(),
            "worker at capacity: max_concurrent_tasks reached"
        );
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        let err = SdkError::AdminApi {
            status: 429,
            message: "throttled".into(),
            kind: None,
            retryable: Some(false),
            raw_body: String::new(),
        };
        assert!(!err.is_retryable());

        let err = SdkError::AdminApi {
            status: 500,
            message: "valkey timeout".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert!(err.is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        // 502 covers ALB/nginx Bad Gateway transients on ff-server
        // restart — same retry-is-safe as 503/504 because rotation
        // is idempotent server-side.
        for s in [429u16, 502, 503, 504] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(err.is_retryable(), "status {s} should be retryable");
        }
        for s in [400u16, 401, 403, 404, 500] {
            let err = SdkError::AdminApi {
                status: s,
                message: "x".into(),
                kind: None,
                retryable: None,
                raw_body: String::new(),
            };
            assert!(!err.is_retryable(), "status {s} should NOT be retryable without hint");
        }
    }

    #[test]
    fn admin_api_display_shows_status_and_message() {
        let err = SdkError::AdminApi {
            status: 503,
            message: "unavailable".into(),
            kind: None,
            retryable: None,
            raw_body: String::new(),
        };
        assert_eq!(err.to_string(), "admin api: 503: unavailable");
    }

    #[test]
    fn valkey_kind_none_for_admin_surface() {
        let err = SdkError::AdminApi {
            status: 500,
            message: "x".into(),
            kind: Some("IoError".into()),
            retryable: Some(true),
            raw_body: String::new(),
        };
        assert_eq!(err.valkey_kind(), None);
    }
}