// Module tree and public re-exports.
//
// Items marked `#[cfg(feature = "valkey-default")]` are compiled only when the
// `valkey-default` feature is enabled; everything else is always available.

// Admin client module; only built with `valkey-default`.
#[cfg(feature = "valkey-default")]
pub mod admin;
pub mod config;
pub mod engine_error;
// `layer` is compiled when at least one of the `layer-*` features is enabled.
#[cfg(any(
feature = "layer-tracing",
feature = "layer-ratelimit",
feature = "layer-metrics",
feature = "layer-circuit-breaker",
))]
pub mod layer;
#[cfg(feature = "valkey-default")]
pub mod snapshot;
#[cfg(feature = "valkey-default")]
pub mod task;
#[cfg(feature = "valkey-default")]
pub mod worker;
// Admin surface re-exports (same gate as the `admin` module itself).
#[cfg(feature = "valkey-default")]
pub use admin::{
rotate_waitpoint_hmac_secret_all_partitions, FlowFabricAdminClient, PartitionRotationOutcome,
RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
};
// Always-available re-exports: configuration and the error vocabulary.
pub use config::WorkerConfig;
pub use engine_error::{
BugKind, ConflictKind, ContentionKind, EngineError, StateKind, ValidationKind,
};
pub use ff_core::engine_error::{BackendError, BackendErrorKind};
pub use ff_core::backend::FailOutcome;
pub use ff_core::backend::ResumeSignal;
// Task / stream API re-exports; only built with `valkey-default`.
#[cfg(feature = "valkey-default")]
pub use task::{
read_stream, tail_stream, tail_stream_with_visibility, AppendFrameOutcome, ClaimedTask,
Signal, SignalOutcome, StreamCursor, StreamFrames, SuspendedHandle, TrySuspendOutcome,
MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
};
// `ff_core` backend types re-exported under the `valkey-default` gate.
#[cfg(feature = "valkey-default")]
pub use ff_core::backend::{
PatchKind, StreamMode, SummaryDocument, TailVisibility, SUMMARY_NULL_SENTINEL,
};
// `ff_core` contract types re-exported under the `valkey-default` gate.
#[cfg(feature = "valkey-default")]
pub use ff_core::contracts::{
CompositeBody, CountKind, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget,
SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
SuspensionRequester, TimeoutBehavior, WaitpointBinding,
};
#[cfg(feature = "valkey-default")]
pub use worker::FlowFabricWorker;
/// Error type for this SDK.
///
/// `Display` output for every variant is produced by the `#[error(...)]`
/// attribute on that variant. Use [`SdkError::backend_kind`] and
/// [`SdkError::is_retryable`] to classify a value without matching on it.
#[derive(Debug, thiserror::Error)]
pub enum SdkError {
/// A backend error with no additional context.
///
/// `#[from]` generates `From<BackendError>`, so `?` converts a
/// `BackendError` into this variant.
#[error("backend: {0}")]
Backend(#[from] BackendError),
/// A backend error annotated with a caller-supplied context string.
#[error("backend: {context}: {source}")]
BackendContext {
/// The underlying backend error (exposed through `Error::source`).
#[source]
source: BackendError,
/// Free-form description of the operation that failed.
context: String,
},
/// An engine error, boxed before being stored in the variant.
#[error("engine: {0}")]
Engine(Box<EngineError>),
/// A configuration problem.
///
/// Rendered by `fmt_config` as `config: <context>.<field>: <message>` when
/// `field` is `Some`, and `config: <context>: <message>` when it is `None`.
#[error("{}", fmt_config(.context, .field.as_deref(), .message))]
Config {
/// The configuration object the problem belongs to.
context: String,
/// The offending field, or `None` when the problem concerns the whole object.
field: Option<String>,
/// Human-readable description of the problem.
message: String,
},
/// The worker refused more work because `max_concurrent_tasks` was reached.
#[error("worker at capacity: max_concurrent_tasks reached")]
WorkerAtCapacity,
/// An HTTP-level failure reported by `reqwest`, with caller-supplied context.
#[error("http: {context}: {source}")]
Http {
/// The underlying `reqwest` error (exposed through `Error::source`).
#[source]
source: reqwest::Error,
/// Free-form description of the request that failed.
context: String,
},
/// An error response from the admin API.
///
/// Only `status` and `message` appear in the `Display` output.
#[error("admin api: {status}: {message}")]
AdminApi {
/// HTTP status code of the response.
status: u16,
/// Error message shown in the `Display` output.
message: String,
/// NOTE(review): presumably a server-reported error kind (the tests use
/// `"IoError"`); it is not consulted by `backend_kind` -- confirm against
/// the admin client.
kind: Option<String>,
/// Retryability hint; when `Some`, `is_retryable` returns it verbatim
/// instead of falling back to the status code.
retryable: Option<bool>,
/// NOTE(review): presumably the unparsed response body -- confirm against
/// the admin client.
raw_body: String,
},
}
/// Renders the `Display` text for [`SdkError::Config`].
///
/// Produces `config: <context>.<field>: <message>` when a field name is
/// supplied, and `config: <context>: <message>` otherwise.
fn fmt_config(context: &str, field: Option<&str>, message: &str) -> String {
    // Field-scoped problems get the dotted `context.field` prefix.
    if let Some(name) = field {
        return format!("config: {context}.{name}: {message}");
    }
    format!("config: {context}: {message}")
}
/// Converts a raw `ferriskey` error into [`SdkError::Backend`] by first
/// translating it with `ff_backend_valkey::backend_error_from_ferriskey`.
#[cfg(feature = "valkey-default")]
impl From<ferriskey::Error> for SdkError {
    fn from(err: ferriskey::Error) -> Self {
        let backend = ff_backend_valkey::backend_error_from_ferriskey(&err);
        SdkError::Backend(backend)
    }
}
/// Builds an [`SdkError::BackendContext`] from a raw `ferriskey` error and a
/// description of the operation that failed.
///
/// The `ferriskey` error is translated with
/// `ff_backend_valkey::backend_error_from_ferriskey`; `context` is stored as
/// an owned `String`.
#[cfg(feature = "valkey-default")]
pub(crate) fn backend_context(err: ferriskey::Error, context: impl Into<String>) -> SdkError {
    let source = ff_backend_valkey::backend_error_from_ferriskey(&err);
    let context = context.into();
    SdkError::BackendContext { source, context }
}
impl From<ff_script::error::ScriptError> for SdkError {
fn from(err: ff_script::error::ScriptError) -> Self {
Self::Engine(Box::new(EngineError::from(err)))
}
}
impl From<EngineError> for SdkError {
fn from(err: EngineError) -> Self {
Self::Engine(Box::new(err))
}
}
impl SdkError {
pub fn backend_kind(&self) -> Option<BackendErrorKind> {
match self {
Self::Backend(be) => Some(be.kind()),
Self::BackendContext { source, .. } => Some(source.kind()),
#[cfg(feature = "valkey-default")]
Self::Engine(e) => ff_script::engine_error_ext::valkey_kind(e)
.map(ff_backend_valkey::classify_ferriskey_kind),
#[cfg(not(feature = "valkey-default"))]
Self::Engine(_) => None,
Self::Config { .. }
| Self::WorkerAtCapacity
| Self::Http { .. }
| Self::AdminApi { .. } => None,
}
}
pub fn is_retryable(&self) -> bool {
match self {
Self::Backend(be) | Self::BackendContext { source: be, .. } => {
be.kind().is_retryable()
}
Self::Engine(e) => {
matches!(
ff_script::engine_error_ext::class(e),
ff_core::error::ErrorClass::Retryable
)
}
Self::WorkerAtCapacity => true,
Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
Self::AdminApi {
status, retryable, ..
} => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
Self::Config { .. } => false,
}
}
}
#[cfg(all(test, feature = "valkey-default"))]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;
    use ff_script::error::ScriptError;

    /// Builds a synthetic `ferriskey` error of the requested kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        let parts = (kind, "synthetic");
        ferriskey::Error::from(parts)
    }

    /// Builds an `SdkError::Config` from borrowed pieces.
    fn config_err(context: &str, field: Option<&str>, message: &str) -> SdkError {
        SdkError::Config {
            context: context.into(),
            field: field.map(Into::into),
            message: message.into(),
        }
    }

    /// Builds an `SdkError::AdminApi` with an empty raw body.
    fn admin_api_err(
        status: u16,
        message: &str,
        kind: Option<&str>,
        retryable: Option<bool>,
    ) -> SdkError {
        SdkError::AdminApi {
            status,
            message: message.into(),
            kind: kind.map(Into::into),
            retryable,
            raw_body: String::new(),
        }
    }

    #[test]
    fn backend_kind_direct_and_context() {
        let direct = SdkError::from(mk_fk_err(ErrorKind::IoError));
        assert_eq!(direct.backend_kind(), Some(BackendErrorKind::Transport));

        let with_context =
            crate::backend_context(mk_fk_err(ErrorKind::BusyLoadingError), "connect");
        assert_eq!(
            with_context.backend_kind(),
            Some(BackendErrorKind::BusyLoading)
        );
    }

    #[test]
    fn backend_kind_delegates_through_engine_transport() {
        let script = ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown));
        let err = SdkError::from(script);
        assert_eq!(err.backend_kind(), Some(BackendErrorKind::Cluster));
    }

    #[test]
    fn backend_kind_none_for_lua_and_config() {
        let lua = SdkError::from(ScriptError::LeaseExpired);
        assert_eq!(lua.backend_kind(), None);

        let config = config_err("worker_config", Some("bearer_token"), "bad host");
        assert_eq!(config.backend_kind(), None);
    }

    #[test]
    fn is_retryable_transport() {
        let retryable = |kind: ErrorKind| SdkError::from(mk_fk_err(kind)).is_retryable();
        assert!(retryable(ErrorKind::IoError));
        assert!(!retryable(ErrorKind::AuthenticationFailed));
        assert!(!retryable(ErrorKind::ResponseError));
    }

    #[test]
    fn is_retryable_engine_delegates_to_class() {
        let no_eligible = SdkError::from(ScriptError::NoEligibleExecution);
        assert!(no_eligible.is_retryable());

        let stale = SdkError::from(ScriptError::StaleLease);
        assert!(!stale.is_retryable());

        let transport = SdkError::from(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError)));
        assert!(transport.is_retryable());
    }

    #[test]
    fn config_structured_fields_render_and_match() {
        let with_field = config_err(
            "admin_client",
            Some("bearer_token"),
            "is empty or all-whitespace",
        );
        let rendered = with_field.to_string();
        assert_eq!(
            rendered,
            "config: admin_client.bearer_token: is empty or all-whitespace"
        );
        assert!(matches!(
            &with_field,
            SdkError::Config { field: Some(f), .. } if f == "bearer_token"
        ));

        let whole_object = config_err("worker_config", None, "at least one lane is required");
        let rendered = whole_object.to_string();
        assert_eq!(
            rendered,
            "config: worker_config: at least one lane is required"
        );
        assert!(matches!(
            &whole_object,
            SdkError::Config { field: None, .. }
        ));
    }

    #[test]
    fn is_retryable_config_false() {
        let err = config_err("worker_config", None, "at least one lane is required");
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_uses_server_hint_when_present() {
        // Hint says "no" even though 429 would be retryable by status alone.
        let err = admin_api_err(429, "throttled", None, Some(false));
        assert!(!err.is_retryable());

        // Hint says "yes" even though 500 is not retryable by status alone.
        let err = admin_api_err(500, "valkey timeout", Some("IoError"), Some(true));
        assert!(err.is_retryable());
    }

    #[test]
    fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
        for s in [429u16, 502, 503, 504] {
            let err = admin_api_err(s, "x", None, None);
            assert!(err.is_retryable(), "status {s} should be retryable");
        }
        for s in [400u16, 401, 403, 404, 500] {
            let err = admin_api_err(s, "x", None, None);
            assert!(!err.is_retryable(), "status {s} should NOT be retryable without hint");
        }
    }

    #[test]
    fn valkey_kind_none_for_admin_surface() {
        let err = admin_api_err(500, "x", Some("IoError"), Some(true));
        assert_eq!(err.backend_kind(), None);
    }
}