use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use ferriskey::{Client, Value};
use ff_core::backend::{Handle, HandleKind, PendingWaitpoint};
use ff_core::contracts::ReportUsageResult;
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::StateKind;
use ff_script::error::ScriptError;
use ff_core::partition::PartitionConfig;
use ff_core::types::*;
use tokio::sync::{Notify, OwnedSemaphorePermit};
use tokio::task::JoinHandle;
use crate::SdkError;
pub use ff_core::contracts::{
CompositeBody, CountKind, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget,
SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
SuspensionRequester, TimeoutBehavior, WaitpointBinding,
};
/// Result of successfully suspending a [`ClaimedTask`] on a waitpoint.
///
/// Returned by [`ClaimedTask::suspend`] and the `Suspended` arm of
/// [`TrySuspendOutcome`]; the lease-renewal loop has already been stopped
/// by the time the caller sees this value.
#[derive(Debug)]
pub struct SuspendedHandle {
    /// Backend handle for the now-suspended execution (used to resume later).
    pub handle: Handle,
    /// Engine-reported details about the suspension.
    pub details: SuspendOutcomeDetails,
}
/// Outcome of a `try_suspend*` call on a [`ClaimedTask`].
#[allow(clippy::large_enum_variant)]
pub enum TrySuspendOutcome {
    /// The execution was suspended; lease renewal has been stopped.
    Suspended(SuspendedHandle),
    /// The resume condition was already satisfied at suspend time, so the
    /// task is handed back to the caller to keep executing.
    AlreadySatisfied {
        task: ClaimedTask,
        details: SuspendOutcomeDetails,
    },
}
impl std::fmt::Debug for TrySuspendOutcome {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Suspended(h) => f.debug_tuple("Suspended").field(h).finish(),
Self::AlreadySatisfied { details, .. } => f
.debug_struct("AlreadySatisfied")
.field("details", details)
.finish_non_exhaustive(),
}
}
}
/// A signal to be delivered to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    // Logical name of the signal (what the waiter matches on).
    pub signal_name: String,
    // Grouping/namespace for the signal name.
    pub signal_category: String,
    // Optional opaque payload delivered alongside the signal.
    pub payload: Option<Vec<u8>>,
    // Kind of sender (free-form; interpreted by the engine).
    pub source_type: String,
    // Identity of the sender.
    pub source_identity: String,
    // Optional dedup key; duplicates surface as `SignalOutcome::Duplicate`.
    pub idempotency_key: Option<String>,
    // Token authorizing delivery to a specific waitpoint.
    pub waitpoint_token: WaitpointToken,
}
/// Engine response to delivering a [`Signal`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal recorded; `effect` is the engine's free-form effect string.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal recorded and it satisfied the resume condition.
    TriggeredResume { signal_id: SignalId },
    /// Idempotency key matched an earlier delivery; nothing new recorded.
    Duplicate { existing_signal_id: String },
}
impl SignalOutcome {
    /// Parses the raw `ff_deliver_signal` FCALL reply into a [`SignalOutcome`].
    ///
    /// Thin public wrapper over the crate-private [`parse_signal_result`].
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}
pub use ff_core::backend::ResumeSignal;
pub use ff_core::backend::AppendFrameOutcome;
pub use ff_core::backend::FailOutcome;
/// A claimed execution attempt with an active, background-renewed lease.
///
/// Terminal operations (`complete`, `fail`, `cancel`, `delay_execution`,
/// `move_to_waiting_children`, successful `suspend`) consume the task and
/// stop lease renewal. Dropping without a terminal operation logs a warning
/// and lets the lease expire.
pub struct ClaimedTask {
    #[allow(dead_code)]
    client: Client,
    // Backend used for all engine calls; shared with the renewal task.
    backend: Arc<dyn EngineBackend>,
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    execution_id: ExecutionId,
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    // Lease identity/epoch used to build handles for every backend call.
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    lease_ttl_ms: u64,
    lane_id: LaneId,
    worker_instance_id: WorkerInstanceId,
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    // Background lease-renewal task; aborted on Drop.
    renewal_handle: JoinHandle<()>,
    // Notified by stop_renewal() to shut the renewal loop down cleanly.
    renewal_stop: Arc<Notify>,
    // Consecutive renewal failures; reset to 0 on each success.
    renewal_failures: Arc<AtomicU32>,
    // Set once a terminal op landed; checked in Drop to decide whether to warn.
    terminal_op_called: AtomicBool,
    // Held to bound worker concurrency; released when the task is dropped.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
impl ClaimedTask {
    /// Builds a claimed task and immediately spawns its background
    /// lease-renewal loop.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        client: Client,
        backend: Arc<dyn EngineBackend>,
        partition_config: PartitionConfig,
        execution_id: ExecutionId,
        attempt_index: AttemptIndex,
        attempt_id: AttemptId,
        lease_id: LeaseId,
        lease_epoch: LeaseEpoch,
        lease_ttl_ms: u64,
        lane_id: LaneId,
        worker_instance_id: WorkerInstanceId,
        input_payload: Vec<u8>,
        execution_kind: String,
        tags: HashMap<String, String>,
    ) -> Self {
        let renewal_stop = Arc::new(Notify::new());
        let renewal_failures = Arc::new(AtomicU32::new(0));
        // Same encoding as synth_handle(), but built before `self` exists so
        // the renewal task can own its own copy.
        let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
            execution_id.clone(),
            attempt_index,
            attempt_id.clone(),
            lease_id.clone(),
            lease_epoch,
            lease_ttl_ms,
            lane_id.clone(),
            worker_instance_id.clone(),
            HandleKind::Fresh,
        );
        let renewal_handle = spawn_renewal_task(
            backend.clone(),
            renewal_handle_cookie,
            execution_id.clone(),
            lease_ttl_ms,
            renewal_stop.clone(),
            renewal_failures.clone(),
        );
        Self {
            client,
            backend,
            partition_config,
            execution_id,
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            lease_ttl_ms,
            lane_id,
            worker_instance_id,
            input_payload,
            execution_kind,
            tags,
            renewal_handle,
            renewal_stop,
            renewal_failures,
            terminal_op_called: AtomicBool::new(false),
            _concurrency_permit: None,
        }
    }
    /// Attaches a concurrency permit that is released when this task drops.
    #[allow(dead_code)]
    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
        self._concurrency_permit = Some(permit);
    }
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }
    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }
    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }
    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }
    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }
    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }
    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }
    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }
    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }
    /// Reads stream frames in `[from, to]` for this attempt, capped at
    /// `count_limit` (validated against `STREAM_READ_HARD_CAP`).
    pub async fn read_stream(
        &self,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, SdkError> {
        validate_stream_read_count(count_limit)?;
        Ok(self
            .backend
            .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
            .await?)
    }
    /// Blocking tail with all frames visible; see
    /// [`Self::tail_stream_with_visibility`].
    pub async fn tail_stream(
        &self,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
    ) -> Result<StreamFrames, SdkError> {
        self.tail_stream_with_visibility(
            after,
            block_ms,
            count_limit,
            ff_core::backend::TailVisibility::All,
        )
        .await
    }
    /// Blocking tail after `after` with a visibility filter.
    ///
    /// `block_ms` is capped at [`MAX_TAIL_BLOCK_MS`]; `after` must be a
    /// concrete cursor (XREAD-style), and `count_limit` is validated.
    pub async fn tail_stream_with_visibility(
        &self,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: ff_core::backend::TailVisibility,
    ) -> Result<StreamFrames, SdkError> {
        if block_ms > MAX_TAIL_BLOCK_MS {
            return Err(SdkError::Config {
                context: "tail_stream".into(),
                field: Some("block_ms".into()),
                message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
            });
        }
        validate_stream_read_count(count_limit)?;
        validate_tail_cursor(&after)?;
        Ok(self
            .backend
            .tail_stream(
                &self.execution_id,
                self.attempt_index,
                after,
                block_ms,
                count_limit,
                visibility,
            )
            .await?)
    }
    // Encodes a fresh backend handle from this task's lease identity.
    // Every backend call below goes through a handle produced here.
    fn synth_handle(&self) -> Handle {
        ff_backend_valkey::ValkeyBackend::encode_handle(
            self.execution_id.clone(),
            self.attempt_index,
            self.attempt_id.clone(),
            self.lease_id.clone(),
            self.lease_epoch,
            self.lease_ttl_ms,
            self.lane_id.clone(),
            self.worker_instance_id.clone(),
            HandleKind::Fresh,
        )
    }
    /// True while fewer than 3 consecutive background renewals have failed.
    // NOTE(review): the threshold 3 is hard-coded here; a terminal renewal
    // error stops the loop after a single increment (see spawn_renewal_task),
    // so this can still report healthy after renewal has stopped — confirm.
    pub fn is_lease_healthy(&self) -> bool {
        self.renewal_failures.load(Ordering::Relaxed) < 3
    }
    pub fn consecutive_renewal_failures(&self) -> u32 {
        self.renewal_failures.load(Ordering::Relaxed)
    }
    /// Terminal: reschedules the execution to run at `delay_until`.
    /// Renewal stops only when the call is known to have reached the engine.
    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        let out = self.backend.delay(&handle, delay_until).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }
    /// Terminal: parks the execution until its children finish.
    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        let out = self.backend.wait_children(&handle).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }
    /// Terminal: marks the execution completed with an optional result.
    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        let out = self.backend.complete(&handle, result_payload).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }
    /// Terminal: fails the attempt. `error_category` both rides along as the
    /// failure detail bytes and selects the [`ff_core::backend::FailureClass`].
    pub async fn fail(
        self,
        reason: &str,
        error_category: &str,
    ) -> Result<FailOutcome, SdkError> {
        let handle = self.synth_handle();
        let failure_reason = ff_core::backend::FailureReason::with_detail(
            reason.to_owned(),
            error_category.as_bytes().to_vec(),
        );
        let classification = error_category_to_class(error_category);
        let out = self.backend.fail(&handle, failure_reason, classification).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }
    /// Terminal: cancels the execution.
    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        let out = self.backend.cancel(&handle, reason).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }
    /// Manual, one-shot lease renewal (independent of the background loop).
    pub async fn renew_lease(&self) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        self.backend
            .renew(&handle)
            .await
            .map(|_renewal| ())
            .map_err(SdkError::from)
    }
    /// Reports progress (percentage + message) for observability.
    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        self.backend
            .progress(&handle, Some(pct), Some(message.to_owned()))
            .await
            .map_err(SdkError::from)
    }
    /// Reports usage deltas against a budget; empty `dedup_key` is treated
    /// as absent. Returns the engine's breach/ok verdict.
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        dimensions: &[(&str, u64)],
        dedup_key: Option<&str>,
    ) -> Result<ReportUsageResult, SdkError> {
        let handle = self.synth_handle();
        let mut dims = ff_core::backend::UsageDimensions::default();
        for (name, delta) in dimensions {
            dims.custom.insert((*name).to_owned(), *delta);
        }
        dims.dedup_key = dedup_key
            .filter(|k| !k.is_empty())
            .map(|k| k.to_owned());
        self.backend
            .report_usage(&handle, budget_id, dims)
            .await
            .map_err(SdkError::from)
    }
    /// Creates a pending waitpoint that expires after `expires_in_ms`;
    /// returns its id and the HMAC token needed to signal it.
    pub async fn create_pending_waitpoint(
        &self,
        waitpoint_key: &str,
        expires_in_ms: u64,
    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
        let handle = self.synth_handle();
        let expires_in = std::time::Duration::from_millis(expires_in_ms);
        let pending = self
            .backend
            .create_waitpoint(&handle, waitpoint_key, expires_in)
            .await
            .map_err(SdkError::from)?;
        Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
    }
    /// Appends a durable event frame; see [`Self::append_frame_with_mode`].
    pub async fn append_frame(
        &self,
        frame_type: &str,
        payload: &[u8],
        metadata: Option<&str>,
    ) -> Result<AppendFrameOutcome, SdkError> {
        self.append_frame_with_mode(
            frame_type,
            payload,
            metadata,
            ff_core::backend::StreamMode::Durable,
        )
        .await
    }
    /// Appends an event frame with an explicit stream mode.
    // NOTE(review): `metadata` is stored as the frame's correlation id —
    // the parameter name undersells that; confirm against callers.
    pub async fn append_frame_with_mode(
        &self,
        frame_type: &str,
        payload: &[u8],
        metadata: Option<&str>,
        mode: ff_core::backend::StreamMode,
    ) -> Result<AppendFrameOutcome, SdkError> {
        let handle = self.synth_handle();
        let mut frame = ff_core::backend::Frame::new(
            payload.to_vec(),
            ff_core::backend::FrameKind::Event,
        )
        .with_frame_type(frame_type)
        .with_mode(mode);
        if let Some(cid) = metadata {
            frame = frame.with_correlation_id(cid);
        }
        self.backend
            .append_frame(&handle, frame)
            .await
            .map_err(SdkError::from)
    }
    /// Terminal: suspends on a fresh waitpoint. An already-satisfied resume
    /// condition is surfaced as a `StateKind::AlreadySatisfied` error here;
    /// use [`Self::try_suspend`] to get the task back instead.
    pub async fn suspend(
        self,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<SuspendedHandle, SdkError> {
        let outcome = self
            .try_suspend_inner(WaitpointBinding::fresh(), reason_code, resume_condition, timeout, resume_policy)
            .await?;
        match outcome {
            TrySuspendOutcome::Suspended(h) => Ok(h),
            TrySuspendOutcome::AlreadySatisfied { .. } => Err(SdkError::from(
                crate::EngineError::State(StateKind::AlreadySatisfied),
            )),
        }
    }
    /// Like [`Self::suspend`], but returns the task when the condition is
    /// already satisfied.
    pub async fn try_suspend(
        self,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<TrySuspendOutcome, SdkError> {
        self.try_suspend_inner(WaitpointBinding::fresh(), reason_code, resume_condition, timeout, resume_policy)
            .await
    }
    /// Suspends against a waitpoint previously created with
    /// [`Self::create_pending_waitpoint`].
    pub async fn try_suspend_on_pending(
        self,
        pending: &PendingWaitpoint,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<TrySuspendOutcome, SdkError> {
        self.try_suspend_inner(
            WaitpointBinding::use_pending(pending),
            reason_code,
            resume_condition,
            timeout,
            resume_policy,
        )
        .await
    }
    // Shared suspend path. Consumes `self`; renewal stops only on the
    // `Suspended` outcome, while `AlreadySatisfied` hands `self` back with
    // renewal still running.
    async fn try_suspend_inner(
        self,
        waitpoint: WaitpointBinding,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<TrySuspendOutcome, SdkError> {
        let handle = self.synth_handle();
        // No timeout means the behavior field is a placeholder (Fail).
        let (timeout_at, timeout_behavior) = match timeout {
            Some((at, b)) => (Some(at), b),
            None => (None, TimeoutBehavior::Fail),
        };
        // For fresh bindings, borrow the waitpoint key from the resume
        // condition (first key of a composite); pending bindings pass through.
        let waitpoint = match (&waitpoint, &resume_condition) {
            (
                WaitpointBinding::Fresh { waitpoint_id, .. },
                ResumeCondition::Single { waitpoint_key, .. },
            ) => WaitpointBinding::Fresh {
                waitpoint_id: waitpoint_id.clone(),
                waitpoint_key: waitpoint_key.clone(),
            },
            (
                WaitpointBinding::Fresh { waitpoint_id, .. },
                ResumeCondition::Composite(body),
            ) => {
                if let Some(key) = composite_first_waitpoint_key(body) {
                    WaitpointBinding::Fresh {
                        waitpoint_id: waitpoint_id.clone(),
                        waitpoint_key: key,
                    }
                } else {
                    waitpoint
                }
            }
            _ => waitpoint,
        };
        let mut args = SuspendArgs::new(
            SuspensionId::new(),
            waitpoint,
            resume_condition,
            resume_policy,
            reason_code,
            TimestampMs::now(),
        )
        .with_requester(SuspensionRequester::Worker);
        if let Some(at) = timeout_at {
            args = args.with_timeout(at, timeout_behavior);
        }
        let outcome = self
            .backend
            .suspend(&handle, args)
            .await
            .map_err(SdkError::from)?;
        match outcome {
            SuspendOutcome::Suspended { details, handle: new_handle } => {
                self.stop_renewal();
                Ok(TrySuspendOutcome::Suspended(SuspendedHandle {
                    handle: new_handle,
                    details,
                }))
            }
            SuspendOutcome::AlreadySatisfied { details } => {
                Ok(TrySuspendOutcome::AlreadySatisfied {
                    task: self,
                    details,
                })
            }
            _ => Err(SdkError::from(ScriptError::Parse {
                fcall: "try_suspend_inner".into(),
                execution_id: None,
                message: "unexpected SuspendOutcome variant".into(),
            })),
        }
    }
    /// Fetches signals observed for this execution's waitpoints.
    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
        let handle = self.synth_handle();
        self.backend
            .observe_signals(&handle)
            .await
            .map_err(SdkError::from)
    }
    // Marks the task terminally handled and wakes the renewal loop so it
    // exits. Release/Acquire pairing with the load in Drop.
    fn stop_renewal(&self) {
        self.terminal_op_called.store(true, Ordering::Release);
        self.renewal_stop.notify_one();
    }
}
/// True when the FCALL definitely reached the engine. Only a transport-level
/// failure leaves it unknown whether the command landed; every other result
/// (success or an engine-side error) proves the engine processed it, so the
/// caller may safely stop lease renewal.
fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
    !matches!(r, Err(crate::EngineError::Transport { .. }))
}
/// Maps a caller-supplied error-category string to the engine's
/// [`ff_core::backend::FailureClass`]. Unrecognized categories are treated
/// as transient (retryable), matching the explicit "transient" mapping.
fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
    use ff_core::backend::FailureClass;
    if s == "permanent" {
        FailureClass::Permanent
    } else if s == "infra_crash" {
        FailureClass::InfraCrash
    } else if s == "timeout" {
        FailureClass::Timeout
    } else if s == "cancelled" {
        FailureClass::Cancelled
    } else {
        // "transient" and any unknown category.
        FailureClass::Transient
    }
}
/// Dropping a task always aborts the background renewal loop; if no terminal
/// operation landed first, a warning is logged because the lease will simply
/// expire server-side.
impl Drop for ClaimedTask {
    fn drop(&mut self) {
        let finished_cleanly = self.terminal_op_called.load(Ordering::Acquire);
        if !finished_cleanly {
            tracing::warn!(
                execution_id = %self.execution_id,
                "ClaimedTask dropped without terminal operation — lease will expire"
            );
        }
        self.renewal_handle.abort();
    }
}
/// Performs a single lease renewal, discarding the backend's renewal detail.
/// Instrumented so each attempt shows up as a `renew_lease` span tagged with
/// the execution id.
#[tracing::instrument(
    name = "renew_lease",
    skip_all,
    fields(execution_id = %execution_id)
)]
async fn renew_once(
    backend: &dyn EngineBackend,
    handle: &Handle,
    execution_id: &ExecutionId,
) -> Result<(), crate::EngineError> {
    let _renewal = backend.renew(handle).await?;
    Ok(())
}
/// Spawns the background task that keeps the claimed lease alive.
///
/// Renews roughly every `lease_ttl_ms / 3` (minimum 1ms) so several renewal
/// attempts fit inside one TTL. The loop exits when `stop_signal` is notified
/// (a terminal operation landed) or when renewal fails with an error that can
/// never succeed on retry. `failure_counter` counts consecutive failures and
/// is reset on every success; `ClaimedTask::is_lease_healthy` reads it.
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));
    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // A tokio interval's first tick completes immediately; consume it so
        // the first real renewal happens one full interval after the claim.
        tick.tick().await;
        loop {
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        Ok(_renewal) => {
                            // Success clears the consecutive-failure count.
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        Err(e) if is_terminal_renewal_error(&e) => {
                            // NOTE(review): a terminal error bumps the counter
                            // once and stops the loop, so is_lease_healthy()
                            // (threshold 3) can still report healthy even
                            // though renewal has stopped — confirm intended.
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}
/// Classifies lease-renewal errors that can never succeed on retry: the lease
/// is stale/expired/revoked, the execution is no longer active, or the
/// execution record is gone. The renewal loop stops immediately on these
/// instead of retrying every interval.
//
// The former `#[allow(dead_code)]` was stale: this function is called from
// the renewal loop in `spawn_renewal_task`.
fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
    use crate::{ContentionKind, EngineError, StateKind};
    matches!(
        err,
        EngineError::State(
            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
            | EngineError::NotFound { entity: "execution" }
    )
}
/// Parses the raw `ff_report_usage_and_check` FCALL reply.
///
/// Wire shape: `[status_code, sub_status, ...]` where `status_code == 1`
/// means success. On failure, index 1 is an error code and index 2 a detail
/// string. On success, `sub_status` selects the variant; breach variants
/// carry `[dimension, current_usage, limit]` at indices 2–4.
pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_report_usage_result".into(),
                execution_id: None,
                message: "ff_report_usage_and_check: expected Array".into(),
            }));
        }
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_report_usage_result".into(),
                execution_id: None,
                message: "ff_report_usage_and_check: expected Int status code".into(),
            }));
        }
    };
    if status_code != 1 {
        // Error path: try the known script-error codes first, fall back to a
        // generic parse error carrying the raw code.
        let error_code = usage_field_str(arr, 1);
        let detail = usage_field_str(arr, 2);
        return Err(SdkError::from(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_report_usage_result".into(),
                    execution_id: None,
                    message: format!("ff_report_usage_and_check: {error_code}"),
                }
            }),
        ));
    }
    let sub_status = usage_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        "SOFT_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
        }
        "HARD_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
            Ok(ReportUsageResult::HardBreach {
                dimension: dim,
                current_usage: current,
                hard_limit: limit,
            })
        }
        _ => Err(SdkError::from(ScriptError::Parse {
            fcall: "parse_report_usage_result".into(),
            execution_id: None,
            message: format!(
                "ff_report_usage_and_check: unknown sub-status: {sub_status}"
            ),
        })),
    }
}
/// Best-effort stringification of `arr[index]`: bulk strings decode lossily,
/// simple strings clone, integers format via `Display`; anything else
/// (missing index, protocol error, other wire types) yields an empty string.
fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
    match arr.get(index) {
        Some(Ok(value)) => match value {
            Value::BulkString(bytes) => String::from_utf8_lossy(bytes).into_owned(),
            Value::SimpleString(text) => text.clone(),
            Value::Int(number) => number.to_string(),
            _ => String::new(),
        },
        _ => String::new(),
    }
}
/// Parses `arr[index]` as a `u64` for a breach field of the
/// `ff_report_usage_and_check` reply.
///
/// Accepts a non-negative `Int`, or a `BulkString`/`SimpleString` holding a
/// decimal u64. Every failure mode (negative int, non-numeric string, wrong
/// wire type, missing element) produces a `ScriptError::Parse` identifying
/// the sub-status, field name, and index.
fn parse_usage_u64(
    arr: &[Result<Value, ferriskey::Error>],
    index: usize,
    sub_status: &str,
    field_name: &str,
) -> Result<u64, SdkError> {
    // All failure modes share the same error shape; build it in one place
    // instead of repeating the four-field struct literal per branch.
    let parse_err = |message: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_usage_u64".into(),
            execution_id: None,
            message,
        })
    };
    match arr.get(index) {
        Some(Ok(Value::Int(n))) => u64::try_from(*n).map_err(|_| {
            parse_err(format!(
                "ff_report_usage_and_check {sub_status}: {field_name} \
                 (index {index}) negative int {n} cannot be u64"
            ))
        }),
        Some(Ok(Value::BulkString(b))) => {
            let s = String::from_utf8_lossy(b);
            s.parse::<u64>().map_err(|_| {
                parse_err(format!(
                    "ff_report_usage_and_check {sub_status}: {field_name} \
                     (index {index}) not a u64 string: {s:?}"
                ))
            })
        }
        Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
            parse_err(format!(
                "ff_report_usage_and_check {sub_status}: {field_name} \
                 (index {index}) not a u64 string: {s:?}"
            ))
        }),
        Some(_) => Err(parse_err(format!(
            "ff_report_usage_and_check {sub_status}: {field_name} \
             (index {index}) wrong wire type (expected Int or String)"
        ))),
        None => Err(parse_err(format!(
            "ff_report_usage_and_check {sub_status}: {field_name} \
             (index {index}) missing from response"
        ))),
    }
}
/// Extracts the waitpoint id from a `suspension_current` hash, but only when
/// the record belongs to `claimed_attempt` and was closed by a resume.
/// Returns `Ok(None)` for any non-matching or incomplete record; errors only
/// on a present-but-malformed waitpoint id.
#[allow(dead_code)]
fn resume_waitpoint_id_from_suspension(
    susp: &HashMap<String, String>,
    claimed_attempt: AttemptIndex,
) -> Result<Option<WaitpointId>, SdkError> {
    if susp.is_empty() {
        return Ok(None);
    }
    // Unparsable / missing attempt_index becomes u32::MAX, which will not
    // match any real attempt.
    let recorded_attempt: u32 = susp
        .get("attempt_index")
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(u32::MAX);
    if recorded_attempt != claimed_attempt.0 {
        return Ok(None);
    }
    if susp.get("close_reason").map(String::as_str) != Some("resumed") {
        return Ok(None);
    }
    match susp.get("waitpoint_id").map(String::as_str) {
        None | Some("") => Ok(None),
        Some(raw_id) => WaitpointId::parse(raw_id).map(Some).map_err(|e| {
            SdkError::from(ScriptError::Parse {
                fcall: "resume_waitpoint_id_from_suspension".into(),
                execution_id: None,
                message: format!(
                    "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
                ),
            })
        }),
    }
}
/// Parses a generic success/failure FCALL reply of shape `[status_code, ...]`.
///
/// `status_code == 1` is success. Otherwise index 1 holds an error code and
/// indices 2.. hold detail strings, which are mapped through the known
/// script-error codes with a parse-error fallback. `function_name` only
/// labels the error messages.
#[cfg_attr(not(feature = "direct-valkey-claim"), allow(dead_code))]
pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_success_result".into(),
                execution_id: None,
                message: format!(
                    "{function_name}: expected Array, got non-array"
                ),
            }));
        }
    };
    if arr.is_empty() {
        return Err(SdkError::from(ScriptError::Parse {
            fcall: "parse_success_result".into(),
            execution_id: None,
            message: format!(
                "{function_name}: empty result array"
            ),
        }));
    }
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_success_result".into(),
                execution_id: None,
                message: format!(
                    "{function_name}: expected Int at index 0"
                ),
            }));
        }
    };
    if status_code == 1 {
        Ok(())
    } else {
        // Lossy string extraction for the error code and detail fields.
        let field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        // Everything after the code is treated as detail strings.
        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
        let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
            .unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_success_result".into(),
                    execution_id: None,
                    message: format!("{function_name}: unknown error: {error_code}"),
                }
            });
        Err(SdkError::from(script_err))
    }
}
/// Parses the raw `ff_deliver_signal` FCALL reply into a [`SignalOutcome`].
///
/// Wire shape: `[status_code, sub_status, ...]`. On failure (`status_code !=
/// 1`) index 1 is an error code and index 2 a detail. On success,
/// `sub_status == "DUPLICATE"` carries the existing signal id at index 2;
/// otherwise index 2 is the new signal id and index 3 the engine's effect
/// string ("resume_condition_satisfied" maps to `TriggeredResume`).
//
// Refactor: the identical bulk/simple-string extraction closure was written
// out five times; it is now a single local helper, matching the style of
// `parse_success_result`. Behavior and messages are unchanged.
pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_signal_result".into(),
                execution_id: None,
                message: "ff_deliver_signal: expected Array".into(),
            }));
        }
    };
    // Lossy string extraction shared by every positional field below;
    // missing/other wire types become "".
    let field_str = |idx: usize| -> String {
        arr.get(idx)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_default()
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_signal_result".into(),
                execution_id: None,
                message: "ff_deliver_signal: bad status code".into(),
            }));
        }
    };
    if status_code != 1 {
        let error_code = {
            let s = field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = field_str(2);
        return Err(SdkError::from(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_signal_result".into(),
                    execution_id: None,
                    message: format!("ff_deliver_signal: {error_code}"),
                }
            }),
        ));
    }
    let sub_status = field_str(1);
    if sub_status == "DUPLICATE" {
        return Ok(SignalOutcome::Duplicate {
            existing_signal_id: field_str(2),
        });
    }
    let signal_id_str = field_str(2);
    let effect = field_str(3);
    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_signal_result".into(),
            execution_id: None,
            message: format!(
                "ff_deliver_signal: invalid signal_id from Lua: {e}"
            ),
        })
    })?;
    if effect == "resume_condition_satisfied" {
        Ok(SignalOutcome::TriggeredResume { signal_id })
    } else {
        Ok(SignalOutcome::Accepted { signal_id, effect })
    }
}
/// Parses the raw `ff_fail_execution` FCALL reply into a [`FailOutcome`].
///
/// Wire shape: `[status_code, _, sub_status, ...]`. On failure
/// (`status_code != 1`) index 1 is an error code and indices 2.. are details.
/// On success, `sub_status` at index 2 is either `retry_scheduled` (with the
/// retry timestamp at index 3) or `terminal_failed`.
//
// Refactor: the repeated bulk/simple-string extraction closures are unified
// into one local helper (matching `parse_success_result`); the delay field
// keeps its distinct Int-accepting extraction. Behavior is unchanged.
#[allow(dead_code)]
fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_fail_result".into(),
                execution_id: None,
                message: "ff_fail_execution: expected Array".into(),
            }));
        }
    };
    // Lossy string extraction for error-code/detail/sub-status fields.
    let field_str = |idx: usize| -> String {
        arr.get(idx)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_default()
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_fail_result".into(),
                execution_id: None,
                message: "ff_fail_execution: bad status code".into(),
            }));
        }
    };
    if status_code != 1 {
        let error_code = {
            let s = field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let details: Vec<String> = (2..arr.len()).map(|idx| field_str(idx)).collect();
        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
        return Err(SdkError::from(
            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_fail_result".into(),
                    execution_id: None,
                    message: format!("ff_fail_execution: {error_code}"),
                }
            }),
        ));
    }
    let sub_status = field_str(2);
    match sub_status.as_str() {
        "retry_scheduled" => {
            // The delay may arrive as a bulk string or a raw integer; any
            // other shape — or an unparsable string — falls back to 0
            // (best-effort, preserved from the original wire handling).
            let delay_until = arr
                .get(3)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => String::from_utf8_lossy(b).parse::<i64>().ok(),
                    Ok(Value::Int(n)) => Some(*n),
                    _ => None,
                })
                .unwrap_or(0);
            Ok(FailOutcome::RetryScheduled {
                delay_until: TimestampMs::from_millis(delay_until),
            })
        }
        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
        _ => Err(SdkError::from(ScriptError::Parse {
            fcall: "parse_fail_result".into(),
            execution_id: None,
            message: format!(
                "ff_fail_execution: unexpected sub-status: {sub_status}"
            ),
        })),
    }
}
/// Hard ceiling (30s) on how long a single `tail_stream` call may block.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;
pub use ff_core::contracts::STREAM_READ_HARD_CAP;
pub use ff_core::contracts::StreamFrames;
pub use ff_core::contracts::StreamCursor;
/// Rejects symbolic cursors (`Start`/`End`): a blocking tail maps to an
/// XREAD-style call, which requires a concrete entry id to resume after.
fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
    if after.is_concrete() {
        Ok(())
    } else {
        Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("after".into()),
            message: "XREAD cursor must be a concrete entry id; pass \
                      StreamCursor::from_beginning() to start from the \
                      beginning"
                .into(),
        })
    }
}
/// Validates a stream-read count: must be at least 1 and no greater than
/// `STREAM_READ_HARD_CAP`.
fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
    match count_limit {
        0 => Err(SdkError::Config {
            context: "read_stream_frames".into(),
            field: Some("count_limit".into()),
            message: "count_limit must be >= 1".into(),
        }),
        n if n > STREAM_READ_HARD_CAP => Err(SdkError::Config {
            context: "read_stream_frames".into(),
            field: Some("count_limit".into()),
            message: format!(
                "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
            ),
        }),
        _ => Ok(()),
    }
}
/// Free-function variant of [`ClaimedTask::read_stream`] for callers that
/// hold a backend but no claimed task. Validates `count_limit` before
/// delegating to the backend.
pub async fn read_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    validate_stream_read_count(count_limit)?;
    Ok(backend
        .read_stream(execution_id, attempt_index, from, to, count_limit)
        .await?)
}
/// Free-function blocking tail with all frames visible; see
/// [`tail_stream_with_visibility`].
pub async fn tail_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    tail_stream_with_visibility(
        backend,
        execution_id,
        attempt_index,
        after,
        block_ms,
        count_limit,
        ff_core::backend::TailVisibility::All,
    )
    .await
}
/// Free-function variant of [`ClaimedTask::tail_stream_with_visibility`].
///
/// Enforces the same invariants: `block_ms <= MAX_TAIL_BLOCK_MS`, a valid
/// `count_limit`, and a concrete `after` cursor — in that order, so callers
/// see the same error precedence as the method.
pub async fn tail_stream_with_visibility(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: ff_core::backend::TailVisibility,
) -> Result<StreamFrames, SdkError> {
    if block_ms > MAX_TAIL_BLOCK_MS {
        return Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("block_ms".into()),
            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
        });
    }
    validate_stream_read_count(count_limit)?;
    validate_tail_cursor(&after)?;
    Ok(backend
        .tail_stream(
            execution_id,
            attempt_index,
            after,
            block_ms,
            count_limit,
            visibility,
        )
        .await?)
}
// Unit tests for validate_tail_cursor: symbolic cursors rejected, concrete
// entry ids accepted.
#[cfg(test)]
mod tail_stream_boundary_tests {
    use super::*;
    #[test]
    fn rejects_start_cursor() {
        let err = validate_tail_cursor(&StreamCursor::Start)
            .expect_err("Start must be rejected");
        match err {
            SdkError::Config { field, context, .. } => {
                assert_eq!(field.as_deref(), Some("after"));
                assert_eq!(context, "tail_stream");
            }
            other => panic!("expected SdkError::Config, got {other:?}"),
        }
    }
    #[test]
    fn rejects_end_cursor() {
        let err = validate_tail_cursor(&StreamCursor::End)
            .expect_err("End must be rejected");
        assert!(matches!(err, SdkError::Config { .. }));
    }
    #[test]
    fn accepts_at_cursor() {
        // "0-0" (also what from_beginning() yields) and any concrete id pass.
        validate_tail_cursor(&StreamCursor::At("0-0".into()))
            .expect("At cursor must be accepted");
        validate_tail_cursor(&StreamCursor::from_beginning())
            .expect("from_beginning() must be accepted");
        validate_tail_cursor(&StreamCursor::At("123-0".into()))
            .expect("concrete id must be accepted");
    }
}
// Unit tests for parse_report_usage_result covering every sub-status and the
// malformed-reply error paths.
#[cfg(test)]
mod parse_report_usage_result_tests {
    use super::*;
    // Helpers building ferriskey wire values.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }
    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }
    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }
    #[test]
    fn ok_status() {
        let raw = arr(vec![int(1), s("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }
    #[test]
    fn already_applied_status() {
        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }
    #[test]
    fn soft_breach_status() {
        // Numeric fields arrive as strings here; the parser accepts both.
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }
    #[test]
    fn hard_breach_status() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }
    #[test]
    fn non_array_input_is_parse_error() {
        let raw = Value::SimpleString("OK".to_owned());
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }
    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = arr(vec![s("not_an_int"), s("OK")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"), s("100"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        // Reply truncated before the hard_limit field at index 4.
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}
/// Returns the first waitpoint key reachable inside a composite body, if any.
///
/// For `AllOf`, members are scanned in order: a `Single` condition yields its
/// key directly, while nested `Composite` conditions are searched recursively.
/// For `Count`, the first entry of the waitpoint list is used. Any other body
/// or member shape yields `None`.
fn composite_first_waitpoint_key(body: &CompositeBody) -> Option<String> {
    if let CompositeBody::Count { waitpoints, .. } = body {
        return waitpoints.first().cloned();
    }
    if let CompositeBody::AllOf { members } = body {
        for member in members {
            let candidate = match member {
                ResumeCondition::Single { waitpoint_key, .. } => Some(waitpoint_key.clone()),
                ResumeCondition::Composite(nested) => composite_first_waitpoint_key(nested),
                _ => None,
            };
            if candidate.is_some() {
                return candidate;
            }
        }
    }
    None
}
#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    /// Builds a suspension-record map from literal key/value pairs.
    fn record(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut map = HashMap::new();
        for (key, value) in pairs {
            map.insert((*key).to_owned(), (*value).to_owned());
        }
        map
    }

    #[test]
    fn empty_suspension_returns_none() {
        let susp = record(&[]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        // Record was written by attempt 0, but we query as attempt 1.
        let wp = WaitpointId::new().to_string();
        let susp = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new().to_string();
        // Only "resumed" should yield a waitpoint; every other close reason is ignored.
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = record(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp),
            ]);
            let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let wp_str = wp.to_string();
        let susp = record(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp_str),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
        assert_eq!(out, Some(wp));
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        let rendered = format!("{err}");
        assert!(
            rendered.contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // An empty id is treated as absent, not as a parse failure.
        let susp = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }
}
#[cfg(test)]
mod terminal_replay_parsing_tests {
    use super::*;
    use ferriskey::Value;

    /// Wraps a string as the simple-string value used in these reply arrays.
    fn text(s: &str) -> Value {
        Value::SimpleString(s.to_owned())
    }

    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        // Slots after the "execution_not_active" marker:
        // terminal_outcome, lease_epoch, lifecycle_phase, attempt_id.
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(text("execution_not_active")),
            Ok(text("success")),
            Ok(text("42")),
            Ok(text("terminal")),
            Ok(text("11111111-1111-1111-1111-111111111111")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        match err {
            SdkError::Engine(boxed) => match *boxed {
                crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                }) => {
                    assert_eq!(terminal_outcome, "success");
                    assert_eq!(lease_epoch, "42");
                    assert_eq!(lifecycle_phase, "terminal");
                    assert_eq!(attempt_id, "11111111-1111-1111-1111-111111111111");
                }
                other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
            },
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(text("execution_not_active")),
            Ok(text("none")),
            Ok(text("7")),
            Ok(text("runnable")),
            Ok(text("22222222-2222-2222-2222-222222222222")),
        ]);
        let err = parse_fail_result(&raw).unwrap_err();
        match err {
            SdkError::Engine(boxed) => match *boxed {
                crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                }) => {
                    assert_eq!(terminal_outcome, "none");
                    assert_eq!(lease_epoch, "7");
                    assert_eq!(lifecycle_phase, "runnable");
                    assert_eq!(attempt_id, "22222222-2222-2222-2222-222222222222");
                }
                other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
            },
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        // Only the status code and marker are present; all detail slots absent.
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(text("execution_not_active")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        match err {
            SdkError::Engine(boxed) => match *boxed {
                crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                }) => {
                    assert_eq!(terminal_outcome, "");
                    assert_eq!(lease_epoch, "");
                    assert_eq!(lifecycle_phase, "");
                    assert_eq!(attempt_id, "");
                }
                other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
            },
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }
}