pub mod admin;
pub mod config;
pub mod task;
pub mod worker;
pub use admin::{
FlowFabricAdminClient, RotateWaitpointSecretRequest, RotateWaitpointSecretResponse,
};
pub use config::WorkerConfig;
pub use task::{
read_stream, tail_stream, AppendFrameOutcome, ClaimedTask, ConditionMatcher, FailOutcome,
ResumeSignal, Signal, SignalOutcome, StreamFrames, SuspendOutcome, TimeoutBehavior,
MAX_TAIL_BLOCK_MS, STREAM_READ_HARD_CAP,
};
pub use worker::FlowFabricWorker;
#[derive(Debug, thiserror::Error)]
pub enum SdkError {
#[error("valkey: {0}")]
Valkey(#[from] ferriskey::Error),
#[error("valkey: {context}: {source}")]
ValkeyContext {
#[source]
source: ferriskey::Error,
context: String,
},
#[error("script: {0}")]
Script(#[from] ff_script::error::ScriptError),
#[error("config: {0}")]
Config(String),
#[error("worker at capacity: max_concurrent_tasks reached")]
WorkerAtCapacity,
#[error("http: {context}: {source}")]
Http {
#[source]
source: reqwest::Error,
context: String,
},
#[error("admin api: {status}: {message}")]
AdminApi {
status: u16,
message: String,
kind: Option<String>,
retryable: Option<bool>,
raw_body: String,
},
}
impl SdkError {
pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
match self {
Self::Valkey(e) => Some(e.kind()),
Self::ValkeyContext { source, .. } => Some(source.kind()),
Self::Script(e) => e.valkey_kind(),
Self::Config(_) | Self::WorkerAtCapacity | Self::Http { .. } | Self::AdminApi { .. } => {
None
}
}
}
pub fn is_retryable(&self) -> bool {
match self {
Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
ff_script::retry::is_retryable_kind(e.kind())
}
Self::Script(e) => {
matches!(e.class(), ff_core::error::ErrorClass::Retryable)
}
Self::WorkerAtCapacity => true,
Self::Http { source, .. } => source.is_timeout() || source.is_connect(),
Self::AdminApi {
status, retryable, ..
} => retryable.unwrap_or(matches!(*status, 429 | 502 | 503 | 504)),
Self::Config(_) => false,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ferriskey::ErrorKind;
use ff_script::error::ScriptError;
fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
ferriskey::Error::from((kind, "synthetic"))
}
#[test]
fn valkey_kind_direct_and_context() {
assert_eq!(
SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).valkey_kind(),
Some(ErrorKind::IoError)
);
assert_eq!(
SdkError::ValkeyContext {
source: mk_fk_err(ErrorKind::BusyLoadingError),
context: "connect".into()
}
.valkey_kind(),
Some(ErrorKind::BusyLoadingError)
);
}
#[test]
fn valkey_kind_delegates_through_script_transport() {
let err = SdkError::Script(ScriptError::Valkey(mk_fk_err(ErrorKind::ClusterDown)));
assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));
}
#[test]
fn valkey_kind_none_for_lua_and_config() {
assert_eq!(
SdkError::Script(ScriptError::LeaseExpired).valkey_kind(),
None
);
assert_eq!(SdkError::Config("bad host".into()).valkey_kind(), None);
}
#[test]
fn is_retryable_transport() {
assert!(SdkError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
assert!(!SdkError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
}
#[test]
fn is_retryable_script_delegates_to_class() {
assert!(SdkError::Script(ScriptError::NoEligibleExecution).is_retryable());
assert!(!SdkError::Script(ScriptError::StaleLease).is_retryable());
assert!(
SdkError::Script(ScriptError::Valkey(mk_fk_err(ErrorKind::IoError))).is_retryable()
);
}
#[test]
fn is_retryable_config_false() {
assert!(!SdkError::Config("at least one lane is required".into()).is_retryable());
}
#[test]
fn is_retryable_admin_api_uses_server_hint_when_present() {
let err = SdkError::AdminApi {
status: 429,
message: "throttled".into(),
kind: None,
retryable: Some(false),
raw_body: String::new(),
};
assert!(!err.is_retryable());
let err = SdkError::AdminApi {
status: 500,
message: "valkey timeout".into(),
kind: Some("IoError".into()),
retryable: Some(true),
raw_body: String::new(),
};
assert!(err.is_retryable());
}
#[test]
fn is_retryable_admin_api_falls_back_to_standard_retryable_statuses() {
for s in [429u16, 502, 503, 504] {
let err = SdkError::AdminApi {
status: s,
message: "x".into(),
kind: None,
retryable: None,
raw_body: String::new(),
};
assert!(err.is_retryable(), "status {s} should be retryable");
}
for s in [400u16, 401, 403, 404, 500] {
let err = SdkError::AdminApi {
status: s,
message: "x".into(),
kind: None,
retryable: None,
raw_body: String::new(),
};
assert!(!err.is_retryable(), "status {s} should NOT be retryable without hint");
}
}
#[test]
fn valkey_kind_none_for_admin_surface() {
let err = SdkError::AdminApi {
status: 500,
message: "x".into(),
kind: Some("IoError".into()),
retryable: Some(true),
raw_body: String::new(),
};
assert_eq!(err.valkey_kind(), None);
}
}