ff_sdk/lib.rs
1//! FlowFabric Worker SDK — public API for worker authors.
2//!
3//! This crate depends on `ff-script` for the Lua-function types, Lua error
4//! kinds (`ScriptError`), and retry helpers (`is_retryable_kind`,
5//! `kind_to_stable_str`). Consumers using `ff-sdk` do not need to import
6//! `ff-script` directly for normal worker operations, but can if they need
7//! the `ScriptError` or retry types.
8//!
9//! # Quick start
10//!
11//! The production claim path is
12//! [`FlowFabricWorker::claim_from_grant`]: obtain a
13//! [`ClaimGrant`] from `ff_scheduler::Scheduler::claim_for_worker`
14//! (the scheduler enforces budget, quota, and capability checks),
15//! then hand it to the SDK. `claim_next` is gated behind the
16//! default-off `direct-valkey-claim` feature and bypasses admission
17//! control — fine for benchmarks, not production.
18//!
//! ```rust,ignore
//! use ff_sdk::{FlowFabricWorker, WorkerConfig};
//! use ff_core::types::LaneId;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), ff_sdk::SdkError> {
//!     let config = WorkerConfig::new(
//!         "localhost",
//!         6379,
//!         "my-worker",
//!         "my-worker-instance-1",
//!         "default",
//!         "main",
//!     );
//!
//!     let worker = FlowFabricWorker::connect(config).await?;
//!     let lane = LaneId::new("main");
//!
//!     // In a real deployment `grant` is obtained from the
//!     // scheduler's `claim_for_worker` RPC/helper; it carries the
//!     // execution id, capability match, and admission result.
//! # let grant: ff_core::contracts::ClaimGrant = unimplemented!();
//!     let task = worker.claim_from_grant(lane, grant).await?;
//!     println!("claimed: {}", task.execution_id());
//!     // Process task...
//!     task.complete(Some(b"done".to_vec())).await?;
//!     Ok(())
//! }
//! ```
48//!
49//! # Migration: `direct-valkey-claim` → scheduler-issued grants
50//!
51//! The `direct-valkey-claim` cargo feature — which gates
52//! [`FlowFabricWorker::claim_next`] — is **deprecated** in favour of
53//! the pair of scheduler-issued grant entry points:
54//!
55//! * [`FlowFabricWorker::claim_from_grant`] — fresh claims. Use
56//! `ff_scheduler::Scheduler::claim_for_worker` to obtain the
57//! [`ClaimGrant`], then hand it to the SDK.
58//! * [`FlowFabricWorker::claim_from_reclaim_grant`] — resumed claims
59//! for an `attempt_interrupted` execution. Wraps a
60//! [`ReclaimGrant`].
61//!
62//! `claim_next` bypasses budget and quota admission control; the
63//! grant-based path does not. See each method's rustdoc for the
64//! exact migration recipe.
65//!
66//! [`ClaimGrant`]: ff_core::contracts::ClaimGrant
67//! [`ReclaimGrant`]: ff_core::contracts::ReclaimGrant
68
69#[cfg(feature = "valkey-default")]
70pub mod admin;
71pub mod config;
72pub mod engine_error;
73#[cfg(feature = "valkey-default")]
74pub mod snapshot;
75#[cfg(feature = "valkey-default")]
76pub mod task;
77#[cfg(feature = "valkey-default")]
78pub mod worker;
79
80// Re-exports for convenience
81#[cfg(feature = "valkey-default")]
82pub use admin::{
83 rotate_waitpoint_hmac_secret_all_partitions, FlowFabricAdminClient, PartitionRotationOutcome,
84 RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
85};
86pub use config::WorkerConfig;
87pub use engine_error::{
88 BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
89};
90// `FailOutcome` is ff-core-native (Stage 1a move); re-export
91// unconditionally so consumers can name `ff_sdk::FailOutcome` even
92// under `--no-default-features`.
93pub use ff_core::backend::FailOutcome;
94// `ResumeSignal` is also ff-core-native (Stage 0 move).
95pub use ff_core::backend::ResumeSignal;
96#[cfg(feature = "valkey-default")]
97pub use task::{
98 read_stream, tail_stream, AppendFrameOutcome, ClaimedTask, ConditionMatcher, Signal,
99 SignalOutcome, StreamCursor, StreamFrames, SuspendOutcome, TimeoutBehavior,
100 MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
101};
102#[cfg(feature = "valkey-default")]
103pub use worker::FlowFabricWorker;
104
105/// SDK error type.
106#[derive(Debug, thiserror::Error)]
107pub enum SdkError {
108 /// Valkey connection or command error (preserves ErrorKind for caller inspection).
109 #[cfg(feature = "valkey-default")]
110 #[error("valkey: {0}")]
111 Valkey(#[from] ferriskey::Error),
112
113 /// Valkey error with additional context.
114 #[cfg(feature = "valkey-default")]
115 #[error("valkey: {context}: {source}")]
116 ValkeyContext {
117 #[source]
118 source: ferriskey::Error,
119 context: String,
120 },
121
122 /// FlowFabric engine error — typed sum over Lua error codes + transport
123 /// faults. See [`EngineError`] for the variant-granularity contract.
124 /// Replaces the previous `Script(ScriptError)` carrier (#58.6).
125 ///
126 /// `Box`ed to keep `SdkError`'s stack footprint small: the richest
127 /// variant (`ConflictKind::DependencyAlreadyExists { existing:
128 /// EdgeSnapshot }`) is ~200 bytes. Boxing keeps `Result<T, SdkError>`
129 /// at the same width every other variant pays.
130 #[error("engine: {0}")]
131 Engine(Box<EngineError>),
132
133 /// Configuration error. `context` identifies the call site / logical
134 /// operation (e.g. `"describe_execution: exec_core"`, `"admin_client"`).
135 /// `field` names the specific offending field when the error is
136 /// field-scoped (e.g. `Some("public_state")`), or `None` for
137 /// whole-object validation (e.g. `"at least one lane is required"`).
138 /// `message` carries dynamic detail: source-error rendering, the
139 /// offending raw value, etc.
140 #[error("{}", fmt_config(.context, .field.as_deref(), .message))]
141 Config {
142 context: String,
143 field: Option<String>,
144 message: String,
145 },
146
147 /// Worker is at its configured `max_concurrent_tasks` capacity —
148 /// the caller should retry later. Returned by
149 /// [`FlowFabricWorker::claim_from_grant`] and
150 /// [`FlowFabricWorker::claim_from_reclaim_grant`] when the
151 /// concurrency semaphore is saturated. Distinct from `Ok(None)`:
152 /// a `ClaimGrant`/`ReclaimGrant` represents real work already
153 /// selected by the scheduler, so silently dropping it would waste
154 /// the grant and let the grant TTL elapse. Surfacing the
155 /// saturation lets the caller release the grant (or wait +
156 /// retry).
157 ///
158 /// # Classification
159 ///
160 /// * [`SdkError::is_retryable`] returns `true` — saturation is
161 /// transient: any in-flight task's
162 /// complete/fail/cancel/drop releases a permit. Retry after
163 /// milliseconds, not a retry loop with backoff for a Valkey
164 /// transport failure.
165 /// * [`SdkError::valkey_kind`] returns `None` — this is not a
166 /// Valkey transport or Lua error, so there is no
167 /// `ferriskey::ErrorKind` to inspect. Callers that fan out
168 /// on `valkey_kind()` should match `WorkerAtCapacity`
169 /// explicitly (or use `is_retryable()`).
170 ///
171 /// [`FlowFabricWorker::claim_from_grant`]: crate::FlowFabricWorker::claim_from_grant
172 /// [`FlowFabricWorker::claim_from_reclaim_grant`]: crate::FlowFabricWorker::claim_from_reclaim_grant
173 #[error("worker at capacity: max_concurrent_tasks reached")]
174 WorkerAtCapacity,
175
176 /// HTTP transport error from the admin REST surface. Carries
177 /// the underlying `reqwest::Error` via `#[source]` so callers
178 /// can inspect `is_timeout()` / `is_connect()` / etc. for
179 /// finer-grained retry logic. Distinct from
180 /// [`SdkError::Valkey`] — this fires on the HTTP/JSON surface,
181 /// not on the Lua/Valkey hot path.
182 #[error("http: {context}: {source}")]
183 Http {
184 #[source]
185 source: reqwest::Error,
186 context: String,
187 },
188
189 /// The admin REST endpoint returned a non-2xx response.
190 ///
191 /// Fields surface the server-side `ErrorBody` JSON shape
192 /// (`{ error, kind?, retryable? }`) as structured values so
193 /// cairn-fabric and other consumers can match without
194 /// re-parsing the body:
195 ///
196 /// * `status` — HTTP status code.
197 /// * `message` — the `error` string from the JSON body (or
198 /// the raw body if it didn't parse as JSON).
199 /// * `kind` — server-supplied Valkey `ErrorKind` label for 5xxs
200 /// backed by a transport error; `None` for 4xxs.
201 /// * `retryable` — server-supplied hint; `None` for 4xxs.
202 /// * `raw_body` — the full response body, preserved for logging
203 /// when the JSON shape doesn't match.
204 #[error("admin api: {status}: {message}")]
205 AdminApi {
206 status: u16,
207 message: String,
208 kind: Option<String>,
209 retryable: Option<bool>,
210 raw_body: String,
211 },
212}
213
/// Builds the `Display` text for `SdkError::Config`.
///
/// The result has the shape `config: <context>[.<field>]: <message>`; the
/// `.<field>` segment is present only when `field` is `Some` (a `None`
/// field denotes whole-object validation).
fn fmt_config(context: &str, field: Option<&str>, message: &str) -> String {
    // Resolve the "where" part first so the final layout is written once.
    let scope = match field {
        Some(name) => format!("{context}.{name}"),
        None => context.to_owned(),
    };
    format!("config: {scope}: {message}")
}
222
223/// Preserves the ergonomic `?`-propagation from FCALL sites that
224/// return `Result<_, ScriptError>`. Routes through `EngineError`'s
225/// typed classification so every call site gets the same
226/// variant-level detail without hand-written conversion.
227impl From<ff_script::error::ScriptError> for SdkError {
228 fn from(err: ff_script::error::ScriptError) -> Self {
229 // ff-script's `From<ScriptError> for EngineError` owns the
230 // mapping table (#58.6). See `ff_script::engine_error_ext`.
231 Self::Engine(Box::new(EngineError::from(err)))
232 }
233}
234
235impl From<EngineError> for SdkError {
236 fn from(err: EngineError) -> Self {
237 Self::Engine(Box::new(err))
238 }
239}
240
241impl SdkError {
242 /// Returns the underlying ferriskey `ErrorKind` if this error carries one.
243 /// Covers transport variants (`Valkey`, `ValkeyContext`) directly and
244 /// `Engine(EngineError::Transport { .. })` via delegation.
245 #[cfg(feature = "valkey-default")]
246 pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
247 match self {
248 Self::Valkey(e) => Some(e.kind()),
249 Self::ValkeyContext { source, .. } => Some(source.kind()),
250 Self::Engine(e) => ff_script::engine_error_ext::valkey_kind(e),
251 // HTTP/admin-surface errors carry no ferriskey::ErrorKind;
252 // the admin path never touches Valkey directly from the
253 // SDK side. Use `AdminApi.kind` for the server-supplied
254 // label when present.
255 Self::Config { .. } | Self::WorkerAtCapacity | Self::Http { .. } | Self::AdminApi { .. } => {
256 None
257 }
258 }
259 }
260
261 /// Whether this error is safely retryable by a caller. For transport
262 /// variants, delegates to [`ff_script::retry::is_retryable_kind`]. For
263 /// `Engine` errors, returns `true` iff the typed classification is
264 /// `ErrorClass::Retryable`. `Config` errors are never retryable.
265 pub fn is_retryable(&self) -> bool {
266 match self {
267 #[cfg(feature = "valkey-default")]
268 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
269 ff_script::retry::is_retryable_kind(e.kind())
270 }
271 Self::Engine(e) => {
272 matches!(
273 ff_script::engine_error_ext::class(e),
274 ff_core::error::ErrorClass::Retryable
275 )
276 }
277 // WorkerAtCapacity is retryable: the saturation is transient
278 // and clears as soon as a concurrent task completes.
279 Self::WorkerAtCapacity => true,
280 // HTTP transport: timeouts and connect failures are
281 // retryable (transient network state); body-decode or
282 // request-build errors are terminal (caller must fix
283 // the code). `reqwest::Error` exposes both predicates.
284 Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
285 // Admin API errors: trust the server's `retryable` hint
286 // when present; otherwise fall back to the HTTP-standard
287 // retryable-status set (429, 502, 503, 504). 5xxs without
288 // a hint are conservatively non-retryable — the caller can
289 // override with `AdminApi.status`-based logic if needed.
290 // 502 covers reverse-proxy transients (ALB/nginx returning
291 // Bad Gateway when ff-server restarts mid-request).
292 Self::AdminApi {
293 status, retryable, ..
294 } => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
295 Self::Config { .. } => false,
296 }
297 }
298}
299
/// Unit tests for `SdkError` classification (`valkey_kind`, `is_retryable`)
/// and for the `Config` display format. They are compiled only with the
/// `valkey-default` feature because they build ferriskey errors.
#[cfg(all(test, feature = "valkey-default"))]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic ferriskey error of the given kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    /// Builds an `AdminApi` error; only the fields that drive
    /// classification vary between tests.
    fn mk_admin_err(status: u16, kind: Option<&str>, retryable: Option<bool>) -> SdkError {
        SdkError::AdminApi {
            status,
            message: "x".into(),
            kind: kind.map(str::to_owned),
            retryable,
            raw_body: String::new(),
        }
    }

    #[test]
    fn valkey_kind_direct_and_context() {
        assert_eq!(
            SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).valkey_kind(),
            Some(ErrorKind::IoError)
        );
        assert_eq!(
            SdkError::ValkeyContext {
                source: mk_fk_err(ErrorKind::BusyLoadingError),
                context: "connect".into()
            }
            .valkey_kind(),
            Some(ErrorKind::BusyLoadingError)
        );
    }

    #[test]
    fn valkey_kind_delegates_through_engine_transport() {
        let err = SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
    }

    #[test]
    fn valkey_kind_none_for_lua_and_config() {
        assert_eq!(
            SdkError::from(ScriptError::LeaseExpired).valkey_kind(),
            None
        );
        assert_eq!(
            SdkError::Config {
                context: "admin_client".into(),
                field: Some("bearer_token".into()),
                message: "is empty or all-whitespace".into(),
            }
            .valkey_kind(),
            None
        );
    }

    #[test]
    fn valkey_kind_none_for_admin_surface() {
        let err = mk_admin_err(500, Some("IoError"), Some(true));
        assert_eq!(err.valkey_kind(), None);
    }

    /// Pins the classification documented on `SdkError::WorkerAtCapacity`:
    /// retryable, and no ferriskey kind to inspect.
    #[test]
    fn worker_at_capacity_is_retryable_without_valkey_kind() {
        assert!(SdkError::WorkerAtCapacity.is_retryable());
        assert_eq!(SdkError::WorkerAtCapacity.valkey_kind(), None);
    }

    #[test]
    fn is_retryable_transport() {
        assert!(SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        // `ValkeyContext` is classified by the kind of the error it wraps,
        // exactly like `Valkey`.
        assert!(
            SdkError::ValkeyContext {
                source: mk_fk_err(ErrorKind::IoError),
                context: "connect".into(),
            }
            .is_retryable()
        );
        assert!(
            !SdkError::ValkeyContext {
                source: mk_fk_err(ErrorKind::AuthenticationFailed),
                context: "connect".into(),
            }
            .is_retryable()
        );
    }

    #[test]
    fn is_retryable_engine_delegates_to_class() {
        // NoEligibleExecution is classified Retryable via EngineError::class().
        assert!(SdkError::from(ScriptError::NoEligibleExecution).is_retryable());
        // StaleLease is Terminal.
        assert!(!SdkError::from(ScriptError::StaleLease).is_retryable());
        // Transport(Valkey(IoError)) is Retryable via class() delegation.
        assert!(
            SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable()
        );
    }

    /// Regression (#98): `SdkError::Config` carries `context`, optional
    /// `field`, and `message` separately so consumers can pattern-match on
    /// the offending field without parsing the Display string. Covers both
    /// the field-scoped and the whole-object rendering.
    #[test]
    fn config_structured_fields_render_and_match() {
        let with_field = SdkError::Config {
            context: "admin_client".into(),
            field: Some("bearer_token".into()),
            message: "is empty or all-whitespace".into(),
        };
        assert_eq!(
            with_field.to_string(),
            "config: admin_client.bearer_token: is empty or all-whitespace"
        );
        assert!(matches!(
            &with_field,
            SdkError::Config { field: Some(f), .. } if f == "bearer_token"
        ));

        let whole_object = SdkError::Config {
            context: "worker_config".into(),
            field: None,
            message: "at least one lane is required".into(),
        };
        assert_eq!(
            whole_object.to_string(),
            "config: worker_config: at least one lane is required"
        );
        assert!(matches!(
            &whole_object,
            SdkError::Config { field: None, .. }
        ));
    }

    #[test]
    fn is_retryable_config_false() {
        assert!(
            !SdkError::Config {
                context: "worker_config".into(),
                field: None,
                message: "at least one lane is required".into(),
            }
            .is_retryable()
        );
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        // An explicit `false` overrides a status that is retryable by default.
        assert!(!mk_admin_err(429, None, Some(false)).is_retryable());
        // An explicit `true` overrides a status that is not.
        assert!(mk_admin_err(500, Some("IoError"), Some(true)).is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        // 502 covers ALB/nginx Bad Gateway transients on ff-server
        // restart — same retry-is-safe as 503/504 because rotation
        // is idempotent server-side.
        for s in [429u16, 502, 503, 504] {
            let err = mk_admin_err(s, None, None);
            assert!(err.is_retryable(), "status {s} should be retryable");
        }
        for s in [400u16, 401, 403, 404, 500] {
            let err = mk_admin_err(s, None, None);
            assert!(!err.is_retryable(), "status {s} should NOT be retryable without hint");
        }
    }
}