use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use ferriskey::{Client, Value};
use ff_core::backend::{Handle, HandleKind};
use ff_core::contracts::ReportUsageResult;
use ff_core::engine_backend::EngineBackend;
use ff_script::error::ScriptError;
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{execution_partition, PartitionConfig};
use ff_core::types::*;
use tokio::sync::{Notify, OwnedSemaphorePermit};
use tokio::task::JoinHandle;
use crate::SdkError;
/// What the engine should do when a suspension's timeout elapses.
///
/// Each variant has a stable wire name, produced by [`TimeoutBehavior::as_str`]
/// (and `Display`) and accepted by the `FromStr` impl.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutBehavior {
    /// Wire name `"fail"`.
    Fail,
    /// Wire name `"cancel"`.
    Cancel,
    /// Wire name `"expire"`.
    Expire,
    /// Wire name `"auto_resume_with_timeout_signal"`; parsing also accepts `"auto_resume"`.
    AutoResume,
    /// Wire name `"escalate"`.
    Escalate,
}

impl TimeoutBehavior {
    /// Returns the wire name of this behavior.
    ///
    /// The returned string is a literal, so it is `'static` and may outlive `self`.
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::Fail => "fail",
            Self::Cancel => "cancel",
            Self::Expire => "expire",
            Self::AutoResume => "auto_resume_with_timeout_signal",
            Self::Escalate => "escalate",
        }
    }
}

impl std::fmt::Display for TimeoutBehavior {
    /// Writes the wire name (same text as [`TimeoutBehavior::as_str`]).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl std::str::FromStr for TimeoutBehavior {
    type Err = String;

    /// Parses a wire name. Matching is exact (case-sensitive, no trimming).
    ///
    /// # Errors
    /// Returns `"unknown timeout behavior: <input>"` for any unrecognised name.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "fail" => Ok(Self::Fail),
            "cancel" => Ok(Self::Cancel),
            "expire" => Ok(Self::Expire),
            // The short alias is accepted on input only; `as_str` emits the long form.
            "auto_resume_with_timeout_signal" | "auto_resume" => Ok(Self::AutoResume),
            "escalate" => Ok(Self::Escalate),
            other => Err(format!("unknown timeout behavior: {other}")),
        }
    }
}
/// One resume condition for `ClaimedTask::suspend`: the name of a signal to wait for.
#[derive(Clone, Debug)]
pub struct ConditionMatcher {
    /// Signal name placed into the suspension's `required_signal_names` list.
    pub signal_name: String,
}
/// Result of a successful `ClaimedTask::suspend` call.
///
/// Both variants carry the same identifiers; they differ only in the sub-status the
/// suspend script reported.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SuspendOutcome {
    /// The execution was suspended and is now waiting on the new waitpoint.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        /// Generated as `"wpk:<waitpoint_id>"`.
        waitpoint_key: String,
        /// Token returned by the script; signals are delivered with it.
        waitpoint_token: WaitpointToken,
    },
    /// The script reported `ALREADY_SATISFIED`: the resume condition was already met.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        /// Generated as `"wpk:<waitpoint_id>"`.
        waitpoint_key: String,
        /// Token returned by the script.
        waitpoint_token: WaitpointToken,
    },
}
/// A signal to deliver to a suspended execution's waitpoint.
///
/// NOTE(review): the fields presumably mirror the `ff_deliver_signal` arguments; the
/// construction site is not visible in this module.
#[derive(Clone, Debug)]
pub struct Signal {
    /// Name matched against the waitpoint's required signal names.
    pub signal_name: String,
    pub signal_category: String,
    /// Optional opaque payload bytes.
    pub payload: Option<Vec<u8>>,
    pub source_type: String,
    pub source_identity: String,
    /// Optional key for de-duplicating repeated deliveries.
    pub idempotency_key: Option<String>,
    /// Token identifying (and presumably authorising access to) the target waitpoint.
    pub waitpoint_token: WaitpointToken,
}
/// What happened when a signal was delivered (decoded from an `ff_deliver_signal` reply).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal stored; `effect` is the script-reported effect string.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal stored and the script reported `resume_condition_satisfied`.
    TriggeredResume { signal_id: SignalId },
    /// The script reported `DUPLICATE`; carries the id it returned (may be empty).
    Duplicate { existing_signal_id: String },
}

impl SignalOutcome {
    /// Decodes a raw `ff_deliver_signal` FCALL reply.
    ///
    /// # Errors
    /// Same as `parse_signal_result`: malformed replies or script error statuses.
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        let outcome = parse_signal_result(raw)?;
        Ok(outcome)
    }
}
pub use ff_core::backend::ResumeSignal;
pub use ff_core::backend::AppendFrameOutcome;
pub use ff_core::backend::FailOutcome;
/// A task this worker has claimed, together with the lease that authorises work on it.
///
/// While alive it keeps a background renewal task running. Terminal operations
/// (`complete`, `fail`, `cancel`, ...) consume it. Dropping it without one logs a warning.
///
/// Field order matters: fields drop in declaration order, so the concurrency permit
/// (if any) is released last.
pub struct ClaimedTask {
    /// Raw client used for FCALLs not yet routed through `backend` (e.g. suspend).
    client: Client,
    /// Backend used for lease, progress, stream and terminal operations.
    backend: Arc<dyn EngineBackend>,
    partition_config: PartitionConfig,
    execution_id: ExecutionId,
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    /// Lease TTL in milliseconds; the renewal interval is derived from it.
    lease_ttl_ms: u64,
    lane_id: LaneId,
    worker_instance_id: WorkerInstanceId,
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    /// Background renewal loop; aborted on drop.
    renewal_handle: JoinHandle<()>,
    /// Notified to ask the renewal loop to exit.
    renewal_stop: Arc<Notify>,
    /// Renewal failure count written by the renewal loop (reset on success).
    renewal_failures: Arc<AtomicU32>,
    /// Set once a terminal operation has been issued; suppresses the drop warning.
    terminal_op_called: AtomicBool,
    /// Optional worker-concurrency slot held for this task's lifetime.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
impl ClaimedTask {
    /// Wraps a freshly claimed execution and starts its background lease renewal.
    ///
    /// A renewal loop is spawned immediately (via `spawn_renewal_task`). It uses a handle
    /// encoded from the same lease fields this task stores. It keeps renewing until
    /// `stop_renewal` notifies it, and the `Drop` impl aborts it.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        client: Client,
        backend: Arc<dyn EngineBackend>,
        partition_config: PartitionConfig,
        execution_id: ExecutionId,
        attempt_index: AttemptIndex,
        attempt_id: AttemptId,
        lease_id: LeaseId,
        lease_epoch: LeaseEpoch,
        lease_ttl_ms: u64,
        lane_id: LaneId,
        worker_instance_id: WorkerInstanceId,
        input_payload: Vec<u8>,
        execution_kind: String,
        tags: HashMap<String, String>,
    ) -> Self {
        let renewal_stop = Arc::new(Notify::new());
        let renewal_failures = Arc::new(AtomicU32::new(0));
        // `synth_handle` needs `&self`, which does not exist yet, so the renewal loop's
        // handle is encoded here from the same fields.
        let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
            execution_id.clone(),
            attempt_index,
            attempt_id.clone(),
            lease_id.clone(),
            lease_epoch,
            lease_ttl_ms,
            lane_id.clone(),
            worker_instance_id.clone(),
            HandleKind::Fresh,
        );
        let renewal_handle = spawn_renewal_task(
            backend.clone(),
            renewal_handle_cookie,
            execution_id.clone(),
            lease_ttl_ms,
            renewal_stop.clone(),
            renewal_failures.clone(),
        );
        Self {
            client,
            backend,
            partition_config,
            execution_id,
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            lease_ttl_ms,
            lane_id,
            worker_instance_id,
            input_payload,
            execution_kind,
            tags,
            renewal_handle,
            renewal_stop,
            renewal_failures,
            terminal_op_called: AtomicBool::new(false),
            _concurrency_permit: None,
        }
    }

    /// Attaches a concurrency permit that is released when this task is dropped.
    // NOTE(review): no caller is visible in this module, hence the allow.
    #[allow(dead_code)]
    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
        self._concurrency_permit = Some(permit);
    }

    /// The claimed execution's id.
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    /// Index of the attempt this claim belongs to.
    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }

    /// Id of the attempt this claim belongs to.
    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }

    /// Id of the lease held for this claim.
    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }

    /// Epoch of the lease held for this claim.
    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }

    /// Input bytes supplied with the execution.
    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }

    /// Execution kind string supplied at claim time.
    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }

    /// Tags supplied at claim time.
    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }

    /// Lane the execution was claimed from.
    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }

    /// Reads frames between `from` and `to` from this attempt's stream.
    ///
    /// # Errors
    /// `SdkError::Config` if `count_limit` is 0 or above `STREAM_READ_HARD_CAP`;
    /// otherwise whatever the backend returns.
    pub async fn read_stream(
        &self,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, SdkError> {
        // Calls the module-level `read_stream` (not this method), so validation lives in
        // one place.
        read_stream(
            self.backend.as_ref(),
            &self.execution_id,
            self.attempt_index,
            from,
            to,
            count_limit,
        )
        .await
    }

    /// Waits up to `block_ms` for frames after `after` on this attempt's stream.
    ///
    /// # Errors
    /// `SdkError::Config` if `block_ms` exceeds `MAX_TAIL_BLOCK_MS`, if `count_limit` is
    /// out of range, or if `after` is not a concrete cursor; otherwise backend errors.
    pub async fn tail_stream(
        &self,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
    ) -> Result<StreamFrames, SdkError> {
        // Calls the module-level `tail_stream` (not this method).
        tail_stream(
            self.backend.as_ref(),
            &self.execution_id,
            self.attempt_index,
            after,
            block_ms,
            count_limit,
        )
        .await
    }

    /// Re-encodes this task's lease identity into a backend `Handle` (always `Fresh`).
    fn synth_handle(&self) -> Handle {
        ff_backend_valkey::ValkeyBackend::encode_handle(
            self.execution_id.clone(),
            self.attempt_index,
            self.attempt_id.clone(),
            self.lease_id.clone(),
            self.lease_epoch,
            self.lease_ttl_ms,
            self.lane_id.clone(),
            self.worker_instance_id.clone(),
            HandleKind::Fresh,
        )
    }

    /// `true` while the renewal loop has recorded fewer than 3 consecutive failures.
    pub fn is_lease_healthy(&self) -> bool {
        self.renewal_failures.load(Ordering::Relaxed) < 3
    }

    /// Current value of the renewal loop's failure counter.
    pub fn consecutive_renewal_failures(&self) -> u32 {
        self.renewal_failures.load(Ordering::Relaxed)
    }

    /// Consumes the task and defers the execution until `delay_until`.
    ///
    /// Renewal is stopped whenever the call reached the engine (success or a
    /// non-transport error, per `fcall_landed`). On a transport error the task is
    /// dropped without that mark, so `Drop` warns and aborts renewal.
    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        let out = self.backend.delay(&handle, delay_until).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Consumes the task and parks the execution until its children finish.
    ///
    /// Renewal handling is the same as `delay_execution`.
    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        let out = self.backend.wait_children(&handle).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Consumes the task and completes the execution with an optional result payload.
    ///
    /// Renewal handling is the same as `delay_execution`.
    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        let out = self.backend.complete(&handle, result_payload).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Consumes the task and fails the current attempt.
    ///
    /// `error_category` is attached as the failure detail bytes. It is also mapped to a
    /// `FailureClass` via `error_category_to_class`, where unknown categories become
    /// `Transient`. Renewal handling is the same as `delay_execution`.
    pub async fn fail(
        self,
        reason: &str,
        error_category: &str,
    ) -> Result<FailOutcome, SdkError> {
        let handle = self.synth_handle();
        let failure_reason = ff_core::backend::FailureReason::with_detail(
            reason.to_owned(),
            error_category.as_bytes().to_vec(),
        );
        let classification = error_category_to_class(error_category);
        let out = self.backend.fail(&handle, failure_reason, classification).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Consumes the task and cancels the execution with `reason`.
    ///
    /// Renewal handling is the same as `delay_execution`.
    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        let out = self.backend.cancel(&handle, reason).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Renews the lease once, in addition to the background renewal loop.
    pub async fn renew_lease(&self) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        self.backend
            .renew(&handle)
            .await
            .map(|_renewal| ())
            .map_err(SdkError::from)
    }

    /// Reports progress as a percentage plus a message.
    ///
    /// NOTE(review): `pct` is not range-checked here; presumably the backend validates it.
    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        self.backend
            .progress(&handle, Some(pct), Some(message.to_owned()))
            .await
            .map_err(SdkError::from)
    }

    /// Reports usage against `budget_id` and returns the budget check result.
    ///
    /// Each `(name, delta)` pair goes into the custom dimensions via `insert`, so a
    /// repeated name keeps only its last delta. An empty `dedup_key` is treated as `None`.
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        dimensions: &[(&str, u64)],
        dedup_key: Option<&str>,
    ) -> Result<ReportUsageResult, SdkError> {
        let handle = self.synth_handle();
        let mut dims = ff_core::backend::UsageDimensions::default();
        for (name, delta) in dimensions {
            dims.custom.insert((*name).to_owned(), *delta);
        }
        dims.dedup_key = dedup_key
            .filter(|k| !k.is_empty())
            .map(|k| k.to_owned());
        self.backend
            .report_usage(&handle, budget_id, dims)
            .await
            .map_err(SdkError::from)
    }

    /// Creates a pending waitpoint that expires after `expires_in_ms`.
    ///
    /// Returns its id and the token from its HMAC token.
    pub async fn create_pending_waitpoint(
        &self,
        waitpoint_key: &str,
        expires_in_ms: u64,
    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
        let handle = self.synth_handle();
        let expires_in = std::time::Duration::from_millis(expires_in_ms);
        let pending = self
            .backend
            .create_waitpoint(&handle, waitpoint_key, expires_in)
            .await
            .map_err(SdkError::from)?;
        Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
    }

    /// Appends an `Event` frame of type `frame_type` to this attempt's stream.
    ///
    /// Despite its name, `metadata` is used as the frame's correlation id.
    pub async fn append_frame(
        &self,
        frame_type: &str,
        payload: &[u8],
        metadata: Option<&str>,
    ) -> Result<AppendFrameOutcome, SdkError> {
        let handle = self.synth_handle();
        let mut frame = ff_core::backend::Frame::new(
            payload.to_vec(),
            ff_core::backend::FrameKind::Event,
        )
        .with_frame_type(frame_type);
        if let Some(cid) = metadata {
            frame = frame.with_correlation_id(cid);
        }
        self.backend
            .append_frame(&handle, frame)
            .await
            .map_err(SdkError::from)
    }

    /// Consumes the task and suspends the execution on a new signal waitpoint.
    ///
    /// The waitpoint waits for the signal names in `condition_matchers`. It matches
    /// `"any"` when there is at most one name and `"all"` otherwise. `timeout_ms`
    /// (relative to now) becomes an absolute deadline, with `timeout_behavior` applied
    /// when it passes. Runs `ff_suspend_execution` directly on the client.
    ///
    /// Renewal is stopped once the FCALL returns a reply, including a script error
    /// reply; it is not stopped when the FCALL itself errors.
    ///
    /// # Errors
    /// Client errors from the FCALL, and the errors `parse_suspend_result` produces.
    pub async fn suspend(
        self,
        reason_code: &str,
        condition_matchers: &[ConditionMatcher],
        timeout_ms: Option<u64>,
        timeout_behavior: TimeoutBehavior,
    ) -> Result<SuspendOutcome, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);
        let suspension_id = SuspensionId::new();
        let waitpoint_id = WaitpointId::new();
        let waitpoint_key = format!("wpk:{}", waitpoint_id);
        // Absolute deadline. Saturate rather than wrap: `ms as i64` turns values above
        // i64::MAX into negative offsets (a deadline in the past), and the sum can overflow.
        let timeout_at = timeout_ms.map(|ms| {
            let offset = i64::try_from(ms).unwrap_or(i64::MAX);
            TimestampMs::from_millis(TimestampMs::now().0.saturating_add(offset))
        });
        let required_signal_names: Vec<&str> = condition_matchers
            .iter()
            .map(|m| m.signal_name.as_str())
            .collect();
        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
        let resume_condition_json = serde_json::json!({
            "condition_type": "signal_set",
            "required_signal_names": required_signal_names,
            "signal_match_mode": match_mode,
            "minimum_signal_count": 1,
            "timeout_behavior": timeout_behavior.as_str(),
            "allow_operator_override": true,
        })
        .to_string();
        let resume_policy_json = serde_json::json!({
            "resume_target": "runnable",
            "close_waitpoint_on_resume": true,
            "consume_matched_signals": true,
            "retain_signal_buffer_until_closed": true,
        })
        .to_string();
        // KEYS and ARGV are positional; their order must match the Lua script exactly.
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            ctx.suspension_current(),
            ctx.waitpoint(&waitpoint_id),
            ctx.waitpoint_signals(&waitpoint_id),
            idx.suspension_timeout(),
            idx.pending_waitpoint_expiry(),
            idx.lane_active(&self.lane_id),
            idx.lane_suspended(&self.lane_id),
            ctx.waitpoints(),
            ctx.waitpoint_condition(&waitpoint_id),
            idx.attempt_timeout(),
            idx.waitpoint_hmac_secrets(),
        ];
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.attempt_index.to_string(),
            self.attempt_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            suspension_id.to_string(),
            waitpoint_id.to_string(),
            waitpoint_key.clone(),
            reason_code.to_owned(),
            "worker".to_owned(),
            // Empty string means "no timeout".
            timeout_at.map_or(String::new(), |t| t.to_string()),
            resume_condition_json,
            resume_policy_json,
            // NOTE(review): the meaning of the two empty slots and the trailing "1000" is
            // defined by the Lua script, which is not visible here.
            String::new(),
            String::new(),
            timeout_behavior.as_str().to_owned(),
            "1000".to_owned(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;
        self.stop_renewal();
        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
    }

    /// Returns the resume signals the backend observes for this task's handle.
    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
        let handle = self.synth_handle();
        self.backend
            .observe_signals(&handle)
            .await
            .map_err(SdkError::from)
    }

    /// Marks that a terminal operation was issued and tells the renewal loop to exit.
    ///
    /// `notify_one` stores a permit when nobody is waiting. A stop issued while a
    /// renewal is in flight is therefore seen on the loop's next `notified()` wait.
    fn stop_renewal(&self) {
        self.terminal_op_called.store(true, Ordering::Release);
        self.renewal_stop.notify_one();
    }
}
/// Reports whether a backend call reached the engine.
///
/// Only `EngineError::Transport` counts as "did not land"; success and every other
/// error mean the engine processed the request.
fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
    !matches!(r, Err(crate::EngineError::Transport { .. }))
}
/// Maps a caller-supplied error category string to a `FailureClass`.
///
/// Matching is exact. `"transient"` and any unrecognised category map to `Transient`.
fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
    use ff_core::backend::FailureClass as Class;
    match s {
        "permanent" => Class::Permanent,
        "infra_crash" => Class::InfraCrash,
        "timeout" => Class::Timeout,
        "cancelled" => Class::Cancelled,
        // "transient" and everything else.
        _ => Class::Transient,
    }
}
impl Drop for ClaimedTask {
    /// Aborts the background renewal loop.
    ///
    /// Warns first if no terminal operation was issued, because the lease will then
    /// only go away by expiring.
    fn drop(&mut self) {
        let finished = self.terminal_op_called.load(Ordering::Acquire);
        if !finished {
            tracing::warn!(
                execution_id = %self.execution_id,
                "ClaimedTask dropped without terminal operation — lease will expire"
            );
        }
        self.renewal_handle.abort();
    }
}
/// Performs one lease renewal inside a `renew_lease` tracing span.
///
/// `execution_id` is used only as a span field. The renewal result is discarded.
#[tracing::instrument(
    name = "renew_lease",
    skip_all,
    fields(execution_id = %execution_id)
)]
async fn renew_once(
    backend: &dyn EngineBackend,
    handle: &Handle,
    execution_id: &ExecutionId,
) -> Result<(), crate::EngineError> {
    let _renewal = backend.renew(handle).await?;
    Ok(())
}
/// Spawns the background lease-renewal loop for one claimed task.
///
/// The loop renews every `lease_ttl_ms / 3` milliseconds (at least 1 ms). The first
/// renewal comes one full interval after spawning. It runs until `stop_signal` is
/// notified or a terminal renewal error occurs.
///
/// `failure_counter`:
/// - reset to 0 on every successful renewal;
/// - incremented on each non-terminal failure;
/// - set to `u32::MAX` on a terminal failure. The loop exits after that and nothing
///   resets the counter again, so readers such as `is_lease_healthy()` see the lost
///   lease as unhealthy.
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));
    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // A tokio interval's first tick completes immediately. Consume it so the first
        // renewal happens one interval after the claim.
        tick.tick().await;
        loop {
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        Ok(()) => {
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        Err(e) if is_terminal_renewal_error(&e) => {
                            // Previously a single fetch_add(1) here could leave the counter
                            // at 1 forever (below the health threshold). Saturate it
                            // instead, because this loop is about to exit and will never
                            // reset it.
                            failure_counter.store(u32::MAX, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}
/// Returns `true` for renewal errors that `spawn_renewal_task` treats as final.
///
/// These are the variants named `StaleLease`, `LeaseExpired`, `LeaseRevoked`,
/// `ContentionKind::ExecutionNotActive`, and `NotFound` for entity `"execution"`. The
/// renewal loop stops instead of retrying when it sees one.
// The former `#[allow(dead_code)]` was stale: the renewal loop calls this function.
fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
    use crate::{ContentionKind, EngineError, StateKind};
    matches!(
        err,
        EngineError::State(
            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
            | EngineError::NotFound { entity: "execution" }
    )
}
/// Decodes an `ff_report_usage_and_check` reply into a [`ReportUsageResult`].
///
/// Reply layout, as read below:
/// - `[1, "OK"]` or `[1, "ALREADY_APPLIED"]`;
/// - `[1, "SOFT_BREACH" | "HARD_BREACH", dimension, current, limit]`, where the numbers
///   may be Int or numeric strings;
/// - any status other than `1` is an error reply `[status, error_code, detail]`.
///
/// # Errors
/// - A `ScriptError` mapped from `error_code` for error replies. An empty or missing
///   code is reported as `"unknown"`, like the other reply parsers in this module.
/// - `ScriptError::Parse` for a non-array reply, a non-Int status, an unknown
///   sub-status, or a bad breach number.
pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
    let parse_error = |message: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_report_usage_result".into(),
            execution_id: None,
            message,
        })
    };
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(parse_error(
                "ff_report_usage_and_check: expected Array".to_owned(),
            ));
        }
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(parse_error(
                "ff_report_usage_and_check: expected Int status code".to_owned(),
            ));
        }
    };
    if status_code != 1 {
        let mut error_code = usage_field_str(arr, 1);
        if error_code.is_empty() {
            error_code = "unknown".to_owned();
        }
        let detail = usage_field_str(arr, 2);
        return Err(SdkError::from(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_report_usage_result".into(),
                    execution_id: None,
                    message: format!("ff_report_usage_and_check: {error_code}"),
                }
            }),
        ));
    }
    let sub_status = usage_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        "SOFT_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
            Ok(ReportUsageResult::SoftBreach {
                dimension: dim,
                current_usage: current,
                soft_limit: limit,
            })
        }
        "HARD_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
            Ok(ReportUsageResult::HardBreach {
                dimension: dim,
                current_usage: current,
                hard_limit: limit,
            })
        }
        _ => Err(parse_error(format!(
            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
        ))),
    }
}
/// Reads reply element `index` as text.
///
/// Bulk strings are decoded lossily as UTF-8, simple strings are copied, and Ints are
/// formatted in decimal. Any other type, an error element, or an out-of-range index
/// yields `""`.
fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
    arr.get(index)
        .and_then(|slot| slot.as_ref().ok())
        .map_or_else(String::new, |value| match value {
            Value::BulkString(bytes) => String::from_utf8_lossy(bytes).into_owned(),
            Value::SimpleString(text) => text.clone(),
            Value::Int(number) => number.to_string(),
            _ => String::new(),
        })
}
/// Reads reply element `index` as a `u64`.
///
/// Accepts a non-negative Int or a bulk/simple string holding a decimal `u64`.
/// `sub_status` and `field_name` only label error messages.
///
/// # Errors
/// `ScriptError::Parse` if the element is missing, has the wrong wire type, is a
/// negative Int, or is a string that does not parse as `u64`.
fn parse_usage_u64(
    arr: &[Result<Value, ferriskey::Error>],
    index: usize,
    sub_status: &str,
    field_name: &str,
) -> Result<u64, SdkError> {
    // Every message shares the same prefix; only the trailing explanation differs.
    let fail = |explanation: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_usage_u64".into(),
            execution_id: None,
            message: format!(
                "ff_report_usage_and_check {sub_status}: {field_name} (index {index}) {explanation}"
            ),
        })
    };
    let from_text = |text: &str| {
        text.parse::<u64>()
            .map_err(|_| fail(format!("not a u64 string: {text:?}")))
    };

    match arr.get(index) {
        None => Err(fail("missing from response".to_owned())),
        Some(Ok(Value::Int(n))) => {
            u64::try_from(*n).map_err(|_| fail(format!("negative int {n} cannot be u64")))
        }
        Some(Ok(Value::BulkString(bytes))) => from_text(&String::from_utf8_lossy(bytes)),
        Some(Ok(Value::SimpleString(text))) => from_text(text),
        Some(_) => Err(fail("wrong wire type (expected Int or String)".to_owned())),
    }
}
/// Extracts the waitpoint that resumed the claimed attempt from a `suspension_current` hash.
///
/// Returns `Ok(Some(id))` only when all of these hold:
/// - the hash is non-empty;
/// - its `attempt_index` parses as `u32` and equals `claimed_attempt`;
/// - its `close_reason` is `"resumed"`;
/// - it has a non-empty `waitpoint_id`.
///
/// Otherwise returns `Ok(None)`. A missing or unparsable `attempt_index` now never
/// matches; the old `u32::MAX` sentinel falsely matched an attempt index of `u32::MAX`.
///
/// # Errors
/// `ScriptError::Parse` if `waitpoint_id` is present but not a valid id.
// NOTE(review): no non-test caller is visible in this module, hence the allow.
#[allow(dead_code)]
fn resume_waitpoint_id_from_suspension(
    susp: &HashMap<String, String>,
    claimed_attempt: AttemptIndex,
) -> Result<Option<WaitpointId>, SdkError> {
    if susp.is_empty() {
        return Ok(None);
    }
    let susp_att: Option<u32> = susp.get("attempt_index").and_then(|s| s.parse().ok());
    if susp_att != Some(claimed_attempt.0) {
        return Ok(None);
    }
    if susp.get("close_reason").map(String::as_str) != Some("resumed") {
        return Ok(None);
    }
    let wp_id_str = match susp.get("waitpoint_id") {
        Some(s) if !s.is_empty() => s.as_str(),
        _ => return Ok(None),
    };
    let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
        SdkError::from(ScriptError::Parse {
            fcall: "resume_waitpoint_id_from_suspension".into(),
            execution_id: None,
            message: format!(
                "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
            ),
        })
    })?;
    Ok(Some(waitpoint_id))
}
/// Checks a generic `[status, error_code, details...]` FCALL reply for success.
///
/// Returns `Ok(())` when element 0 is the Int `1`. For any other status, the error code
/// is read from index 1 (empty becomes `"unknown"`) and every later element is a detail.
/// These are mapped through `ScriptError::from_code_with_details`, falling back to a
/// `ScriptError::Parse` naming `function_name` when the code is not recognised.
///
/// # Errors
/// Also `ScriptError::Parse` when the reply is not an array, is empty, or does not
/// start with an Int.
pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
    let shape_error = |message: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_success_result".into(),
            execution_id: None,
            message,
        })
    };

    let items = match raw {
        Value::Array(items) => items,
        _ => {
            return Err(shape_error(format!(
                "{function_name}: expected Array, got non-array"
            )));
        }
    };

    let status = match items.first() {
        None => {
            return Err(shape_error(format!("{function_name}: empty result array")));
        }
        Some(Ok(Value::Int(code))) => *code,
        Some(_) => {
            return Err(shape_error(format!("{function_name}: expected Int at index 0")));
        }
    };

    if status == 1 {
        return Ok(());
    }

    // Non-string elements read as "", the same as a missing slot.
    let text_at = |i: usize| -> String {
        match items.get(i) {
            Some(Ok(Value::BulkString(bytes))) => String::from_utf8_lossy(bytes).into_owned(),
            Some(Ok(Value::SimpleString(text))) => text.clone(),
            _ => String::new(),
        }
    };

    let error_code = Some(text_at(1))
        .filter(|code| !code.is_empty())
        .unwrap_or_else(|| "unknown".to_owned());
    let details: Vec<String> = (2..items.len()).map(text_at).collect();
    let detail_refs: Vec<&str> = details.iter().map(String::as_str).collect();

    let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
        .unwrap_or_else(|| ScriptError::Parse {
            fcall: "parse_success_result".into(),
            execution_id: None,
            message: format!("{function_name}: unknown error: {error_code}"),
        });
    Err(SdkError::from(script_err))
}
fn parse_suspend_result(
raw: &Value,
suspension_id: SuspensionId,
waitpoint_id: WaitpointId,
waitpoint_key: String,
) -> Result<SuspendOutcome, SdkError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => {
return Err(SdkError::from(ScriptError::Parse {
fcall: "parse_suspend_result".into(),
execution_id: None,
message: "ff_suspend_execution: expected Array".into(),
}));
}
};
let status_code = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => {
return Err(SdkError::from(ScriptError::Parse {
fcall: "parse_suspend_result".into(),
execution_id: None,
message: "ff_suspend_execution: bad status code".into(),
}));
}
};
if status_code != 1 {
let err_field_str = |idx: usize| -> String {
arr.get(idx)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default()
};
let error_code = {
let s = err_field_str(1);
if s.is_empty() { "unknown".to_owned() } else { s }
};
let detail = err_field_str(2);
return Err(SdkError::from(
ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
ScriptError::Parse {
fcall: "parse_suspend_result".into(),
execution_id: None,
message: format!("ff_suspend_execution: {error_code}"),
}
}),
));
}
let sub_status = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default();
let waitpoint_token = arr
.get(5)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.map(WaitpointToken::new)
.ok_or_else(|| {
SdkError::from(ScriptError::Parse {
fcall: "parse_suspend_result".into(),
execution_id: None,
message: "ff_suspend_execution: missing waitpoint_token in response".into(),
})
})?;
if sub_status == "ALREADY_SATISFIED" {
Ok(SuspendOutcome::AlreadySatisfied {
suspension_id,
waitpoint_id,
waitpoint_key,
waitpoint_token,
})
} else {
Ok(SuspendOutcome::Suspended {
suspension_id,
waitpoint_id,
waitpoint_key,
waitpoint_token,
})
}
}
pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
let arr = match raw {
Value::Array(arr) => arr,
_ => {
return Err(SdkError::from(ScriptError::Parse {
fcall: "parse_signal_result".into(),
execution_id: None,
message: "ff_deliver_signal: expected Array".into(),
}));
}
};
let status_code = match arr.first() {
Some(Ok(Value::Int(n))) => *n,
_ => {
return Err(SdkError::from(ScriptError::Parse {
fcall: "parse_signal_result".into(),
execution_id: None,
message: "ff_deliver_signal: bad status code".into(),
}));
}
};
if status_code != 1 {
let err_field_str = |idx: usize| -> String {
arr.get(idx)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default()
};
let error_code = {
let s = err_field_str(1);
if s.is_empty() { "unknown".to_owned() } else { s }
};
let detail = err_field_str(2);
return Err(SdkError::from(
ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
ScriptError::Parse {
fcall: "parse_signal_result".into(),
execution_id: None,
message: format!("ff_deliver_signal: {error_code}"),
}
}),
));
}
let sub_status = arr
.get(1)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default();
if sub_status == "DUPLICATE" {
let existing_id = arr
.get(2)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default();
return Ok(SignalOutcome::Duplicate {
existing_signal_id: existing_id,
});
}
let signal_id_str = arr
.get(2)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default();
let effect = arr
.get(3)
.and_then(|v| match v {
Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
Ok(Value::SimpleString(s)) => Some(s.clone()),
_ => None,
})
.unwrap_or_default();
let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
SdkError::from(ScriptError::Parse {
fcall: "parse_signal_result".into(),
execution_id: None,
message: format!(
"ff_deliver_signal: invalid signal_id from Lua: {e}"
),
})
})?;
if effect == "resume_condition_satisfied" {
Ok(SignalOutcome::TriggeredResume { signal_id })
} else {
Ok(SignalOutcome::Accepted { signal_id, effect })
}
}
/// Decodes an `ff_fail_execution` reply into a [`FailOutcome`].
///
/// Error replies (status not `1`) are mapped through `ScriptError::from_code_with_details`.
/// The code is at index 1 (empty becomes `"unknown"`) and every element from index 2
/// on is a detail. On success the sub-status is read from index 2.
/// NOTE(review): the other parsers here read it from index 1; confirm against the
/// Lua reply layout.
///
/// # Errors
/// `ScriptError::Parse` when:
/// - the reply is not an array, or the status is not an Int;
/// - the sub-status is unrecognised;
/// - for `retry_scheduled`, the `delay_until` at index 3 is missing or not an integer.
///   This used to silently become timestamp 0.
// NOTE(review): no caller is visible in this module, hence the allow.
#[allow(dead_code)]
fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
    let parse_error = |message: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_fail_result".into(),
            execution_id: None,
            message,
        })
    };
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(parse_error("ff_fail_execution: expected Array".to_owned())),
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(parse_error("ff_fail_execution: bad status code".to_owned())),
    };
    // String-typed reply slot; any other type, or absence, reads as "".
    let field_str = |idx: usize| -> String {
        match arr.get(idx) {
            Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
            Some(Ok(Value::SimpleString(s))) => s.clone(),
            _ => String::new(),
        }
    };
    if status_code != 1 {
        let mut error_code = field_str(1);
        if error_code.is_empty() {
            error_code = "unknown".to_owned();
        }
        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
        return Err(SdkError::from(
            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_fail_result".into(),
                    execution_id: None,
                    message: format!("ff_fail_execution: {error_code}"),
                }
            }),
        ));
    }
    let sub_status = field_str(2);
    match sub_status.as_str() {
        "retry_scheduled" => {
            let delay_until = match arr.get(3) {
                Some(Ok(Value::Int(n))) => Some(*n),
                Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).parse::<i64>().ok(),
                Some(Ok(Value::SimpleString(s))) => s.parse::<i64>().ok(),
                _ => None,
            }
            .ok_or_else(|| {
                parse_error(
                    "ff_fail_execution: retry_scheduled reply has missing or non-integer \
                     delay_until (index 3)"
                        .to_owned(),
                )
            })?;
            Ok(FailOutcome::RetryScheduled {
                delay_until: TimestampMs::from_millis(delay_until),
            })
        }
        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
        _ => Err(parse_error(format!(
            "ff_fail_execution: unexpected sub-status: {sub_status}"
        ))),
    }
}
/// Upper bound, in milliseconds, on the `block_ms` argument of `tail_stream` (30 seconds).
pub const MAX_TAIL_BLOCK_MS: u64 = 30 * 1_000;
pub use ff_core::contracts::STREAM_READ_HARD_CAP;
pub use ff_core::contracts::StreamFrames;
pub use ff_core::contracts::StreamCursor;
/// Rejects tail cursors that are not a concrete stream entry id.
///
/// # Errors
/// `SdkError::Config` (context `tail_stream`, field `after`) when
/// `after.is_concrete()` is false.
fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
    if after.is_concrete() {
        return Ok(());
    }
    Err(SdkError::Config {
        context: "tail_stream".into(),
        field: Some("after".into()),
        message: "XREAD cursor must be a concrete entry id; pass StreamCursor::from_beginning() to start from the beginning"
            .into(),
    })
}
/// Checks that a stream read's `count_limit` lies in `1..=STREAM_READ_HARD_CAP`.
///
/// # Errors
/// `SdkError::Config` (context `read_stream_frames`, field `count_limit`) when it does not.
fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
    let problem = if count_limit == 0 {
        Some("count_limit must be >= 1".to_owned())
    } else if count_limit > STREAM_READ_HARD_CAP {
        Some(format!(
            "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
        ))
    } else {
        None
    };
    match problem {
        None => Ok(()),
        Some(message) => Err(SdkError::Config {
            context: "read_stream_frames".into(),
            field: Some("count_limit".into()),
            message,
        }),
    }
}
pub async fn read_stream(
backend: &dyn EngineBackend,
execution_id: &ExecutionId,
attempt_index: AttemptIndex,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Result<StreamFrames, SdkError> {
validate_stream_read_count(count_limit)?;
Ok(backend
.read_stream(execution_id, attempt_index, from, to, count_limit)
.await?)
}
/// Waits up to `block_ms` for frames after `after` on one attempt's stream.
///
/// # Errors
/// `SdkError::Config` when `block_ms` exceeds `MAX_TAIL_BLOCK_MS`, when `count_limit`
/// is out of range, or when `after` is not concrete. The checks run in that order, so
/// the first failing one decides the error. Otherwise returns backend errors.
pub async fn tail_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    if block_ms > MAX_TAIL_BLOCK_MS {
        let message = format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling");
        return Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("block_ms".into()),
            message,
        });
    }
    validate_stream_read_count(count_limit)?;
    validate_tail_cursor(&after)?;
    backend
        .tail_stream(execution_id, attempt_index, after, block_ms, count_limit)
        .await
        .map_err(SdkError::from)
}
#[cfg(test)]
mod tail_stream_boundary_tests {
    //! `validate_tail_cursor` must accept only concrete entry ids.
    use super::*;

    #[test]
    fn rejects_start_cursor() {
        let err = validate_tail_cursor(&StreamCursor::Start)
            .expect_err("Start must be rejected");
        let (field, context) = match err {
            SdkError::Config { field, context, .. } => (field, context),
            other => panic!("expected SdkError::Config, got {other:?}"),
        };
        assert_eq!(context, "tail_stream");
        assert_eq!(field.as_deref(), Some("after"));
    }

    #[test]
    fn rejects_end_cursor() {
        let result = validate_tail_cursor(&StreamCursor::End);
        assert!(
            matches!(result, Err(SdkError::Config { .. })),
            "End must be rejected with SdkError::Config"
        );
    }

    #[test]
    fn accepts_at_cursor() {
        let concrete = [
            (StreamCursor::At("0-0".into()), "At cursor must be accepted"),
            (StreamCursor::from_beginning(), "from_beginning() must be accepted"),
            (StreamCursor::At("123-0".into()), "concrete id must be accepted"),
        ];
        for (cursor, why) in concrete.iter() {
            validate_tail_cursor(cursor).expect(why);
        }
    }
}
#[cfg(test)]
mod parse_report_usage_result_tests {
    //! Wire-shape tests for `parse_report_usage_result`.
    use super::*;

    type Slot = Result<Value, ferriskey::Error>;

    /// A simple-string reply element.
    fn text(v: &str) -> Slot {
        Ok(Value::SimpleString(v.to_owned()))
    }

    /// An integer reply element.
    fn num(n: i64) -> Slot {
        Ok(Value::Int(n))
    }

    /// Wraps elements into an array reply.
    fn reply(items: Vec<Slot>) -> Value {
        Value::Array(items)
    }

    /// Parses `raw`, expecting failure, and returns the rendered error.
    fn failure_message(raw: &Value) -> String {
        parse_report_usage_result(raw).unwrap_err().to_string()
    }

    #[test]
    fn ok_status() {
        let raw = reply(vec![num(1), text("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let raw = reply(vec![num(1), text("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }

    #[test]
    fn soft_breach_status() {
        let raw = reply(vec![num(1), text("SOFT_BREACH"), text("tokens"), text("150"), text("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!((current_usage, soft_limit), (150, 100));
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }

    #[test]
    fn hard_breach_status() {
        let raw = reply(vec![
            num(1),
            text("HARD_BREACH"),
            text("requests"),
            text("10001"),
            text("10000"),
        ]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!((current_usage, hard_limit), (10001, 10000));
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    #[test]
    fn non_array_input_is_parse_error() {
        let msg = failure_message(&Value::SimpleString("OK".to_owned()));
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    #[test]
    fn first_element_non_int_is_parse_error() {
        let msg = failure_message(&reply(vec![text("not_an_int"), text("OK")]));
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let msg = failure_message(&reply(vec![
            num(1),
            text("SOFT_BREACH"),
            text("tokens"),
            text("not_a_number"),
            text("100"),
        ]));
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let msg = failure_message(&reply(vec![
            num(1),
            text("HARD_BREACH"),
            text("requests"),
            text("10001"),
        ]));
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}
/// Tests for `resume_waitpoint_id_from_suspension`. The cases below pin that
/// it returns `Some(waitpoint)` for a suspension record of the same attempt
/// closed with `close_reason = "resumed"` and a non-empty `waitpoint_id`. They
/// also pin that it returns `None` for an empty record, a different attempt,
/// any other close reason, or an empty id. Finally, it errors when the id is
/// not a valid UUID.
#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    /// Builds an owned suspension-record map from borrowed key/value pairs.
    fn suspension(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut record = HashMap::with_capacity(pairs.len());
        for &(key, value) in pairs {
            record.insert(key.to_owned(), value.to_owned());
        }
        record
    }

    #[test]
    fn empty_suspension_returns_none() {
        let record = suspension(&[]);
        let resumed =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(0)).unwrap();
        assert!(resumed.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp_text = WaitpointId::new().to_string();
        let record = suspension(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", wp_text.as_str()),
        ]);
        // The record belongs to attempt 0 but we ask about attempt 1.
        let resumed =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(1)).unwrap();
        assert!(resumed.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp_text = WaitpointId::new().to_string();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let record = suspension(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", wp_text.as_str()),
            ]);
            let resumed =
                resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(0)).unwrap();
            assert!(resumed.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let wp_text = wp.to_string();
        let record = suspension(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", wp_text.as_str()),
        ]);
        let resumed =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(2)).unwrap();
        assert_eq!(resumed, Some(wp));
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let record = suspension(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(0)).unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        let record = suspension(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let resumed =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(0)).unwrap();
        assert!(resumed.is_none());
    }
}
/// Tests that `parse_success_result` and `parse_fail_result` turn an
/// `execution_not_active` reply into
/// `EngineError::Contention(ContentionKind::ExecutionNotActive { .. })`.
/// The reply's positional detail slots map onto `terminal_outcome`,
/// `lease_epoch`, `lifecycle_phase` and `attempt_id`, and missing slots
/// default to empty strings.
#[cfg(test)]
mod terminal_replay_parsing_tests {
    use super::*;
    use ferriskey::Value;

    /// Builds a `Value::SimpleString` reply element. Despite the name, this
    /// does not produce a bulk-string value.
    fn bulk(s: &str) -> Value {
        Value::SimpleString(s.to_owned())
    }

    /// Builds `[Int(0), "execution_not_active", details...]`, the reply shape
    /// fed to the parsers in every test below.
    fn not_active_reply(details: &[&str]) -> Value {
        let mut items: Vec<Result<Value, ferriskey::Error>> =
            vec![Ok(Value::Int(0)), Ok(bulk("execution_not_active"))];
        items.extend(details.iter().copied().map(|d| Ok(bulk(d))));
        Value::Array(items)
    }

    /// Asserts that `err` is `SdkError::Engine` wrapping
    /// `EngineError::Contention(ContentionKind::ExecutionNotActive { .. })`
    /// whose detail fields equal `expected`, given in the order
    /// `[terminal_outcome, lease_epoch, lifecycle_phase, attempt_id]`.
    /// `#[track_caller]` reports failures at the calling test.
    #[track_caller]
    fn assert_execution_not_active(err: SdkError, expected: [&str; 4]) {
        let [want_outcome, want_epoch, want_phase, want_attempt] = expected;
        let engine_err = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected SdkError::Engine, got {other:?}"),
        };
        match engine_err {
            crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                terminal_outcome,
                lease_epoch,
                lifecycle_phase,
                attempt_id,
            }) => {
                assert_eq!(terminal_outcome, want_outcome);
                assert_eq!(lease_epoch, want_epoch);
                assert_eq!(lifecycle_phase, want_phase);
                assert_eq!(attempt_id, want_attempt);
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        let details = [
            "success",
            "42",
            "terminal",
            "11111111-1111-1111-1111-111111111111",
        ];
        let raw = not_active_reply(&details);
        let err = parse_success_result(&raw, "test").unwrap_err();
        assert_execution_not_active(err, details);
    }

    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let details = [
            "none",
            "7",
            "runnable",
            "22222222-2222-2222-2222-222222222222",
        ];
        let raw = not_active_reply(&details);
        let err = parse_fail_result(&raw).unwrap_err();
        assert_execution_not_active(err, details);
    }

    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        // Only the status code and the error name; no detail slots at all.
        let raw = not_active_reply(&[]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        assert_execution_not_active(err, ["", "", "", ""]);
    }
}