use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use ferriskey::{Client, Value};
use ff_core::contracts::ReportUsageResult;
use ff_script::error::ScriptError;
use ff_core::keys::{usage_dedup_key, BudgetKeyContext, ExecKeyContext, IndexKeys};
use ff_core::partition::{budget_partition, execution_partition, PartitionConfig};
use ff_core::types::*;
use tokio::sync::{Notify, OwnedSemaphorePermit};
use tokio::task::JoinHandle;
use crate::SdkError;
/// What the server-side machinery should do when a suspension's timeout
/// elapses. Wire strings are defined by [`TimeoutBehavior::as_str`] and
/// parsed back by the `FromStr` impl below.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutBehavior {
    /// Wire string: "fail".
    Fail,
    /// Wire string: "cancel".
    Cancel,
    /// Wire string: "expire".
    Expire,
    /// Wire string: "auto_resume_with_timeout_signal" (alias "auto_resume" accepted on parse).
    AutoResume,
    /// Wire string: "escalate".
    Escalate,
}
impl TimeoutBehavior {
pub fn as_str(&self) -> &str {
match self {
Self::Fail => "fail",
Self::Cancel => "cancel",
Self::Expire => "expire",
Self::AutoResume => "auto_resume_with_timeout_signal",
Self::Escalate => "escalate",
}
}
}
impl std::fmt::Display for TimeoutBehavior {
    /// Formats as the wire string, delegating to [`TimeoutBehavior::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
impl std::str::FromStr for TimeoutBehavior {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"fail" => Ok(Self::Fail),
"cancel" => Ok(Self::Cancel),
"expire" => Ok(Self::Expire),
"auto_resume_with_timeout_signal" | "auto_resume" => Ok(Self::AutoResume),
"escalate" => Ok(Self::Escalate),
other => Err(format!("unknown timeout behavior: {other}")),
}
}
}
/// One entry in a suspension's resume condition: the execution resumes when a
/// signal with this name arrives (see `ClaimedTask::suspend`).
#[derive(Clone, Debug)]
pub struct ConditionMatcher {
    /// Name of the signal that satisfies this matcher.
    pub signal_name: String,
}
/// Result of `ClaimedTask::suspend`. Both variants carry the same identifiers;
/// `AlreadySatisfied` is returned when the server reports the wait condition
/// was met at suspension time (sub-status "ALREADY_SATISFIED"), so the caller
/// need not actually wait.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SuspendOutcome {
    /// The execution is now suspended, waiting on the waitpoint.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        waitpoint_token: WaitpointToken,
    },
    /// The wait condition was already satisfied when the suspend ran.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        waitpoint_token: WaitpointToken,
    },
}
/// An outbound signal to deliver against a waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    /// Signal name matched against a suspension's condition matchers.
    pub signal_name: String,
    pub signal_category: String,
    /// Optional opaque payload bytes.
    pub payload: Option<Vec<u8>>,
    /// Kind of sender (e.g. worker/operator — exact vocabulary defined server-side).
    pub source_type: String,
    pub source_identity: String,
    /// Optional key for server-side duplicate suppression.
    pub idempotency_key: Option<String>,
    /// Token authorizing delivery to the target waitpoint.
    pub waitpoint_token: WaitpointToken,
}
/// Server response to a signal delivery.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal stored; `effect` describes what it did (server-defined string).
    Accepted { signal_id: SignalId, effect: String },
    /// Signal satisfied the waitpoint and the execution is resuming.
    TriggeredResume { signal_id: SignalId },
    /// A signal with the same idempotency key was already recorded.
    Duplicate { existing_signal_id: String },
}
impl SignalOutcome {
    /// Decodes a raw FCALL reply into a `SignalOutcome`; thin public wrapper
    /// over [`parse_signal_result`].
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}
/// A signal that satisfied a suspension's condition, read back by
/// `ClaimedTask::resume_signals` after the execution is re-claimed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResumeSignal {
    pub signal_id: SignalId,
    pub signal_name: String,
    pub signal_category: String,
    pub source_type: String,
    pub source_identity: String,
    pub correlation_id: String,
    /// When the server accepted the signal; 0 if the stored field was missing/unparseable.
    pub accepted_at: TimestampMs,
    /// Payload bytes, if a payload blob was stored alongside the signal.
    pub payload: Option<Vec<u8>>,
}
/// Result of appending a frame to the attempt's output stream.
#[derive(Clone, Debug)]
pub struct AppendFrameOutcome {
    /// Stream entry id assigned by the server.
    pub stream_id: String,
    /// Total frame count in the stream after the append.
    pub frame_count: u64,
}
/// Result of `ClaimedTask::fail`: either the server scheduled a retry per the
/// stored retry policy, or the execution is terminally failed.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FailOutcome {
    /// A retry was scheduled; the execution becomes eligible at `delay_until`.
    RetryScheduled {
        delay_until: TimestampMs,
    },
    /// No (more) retries; the execution is terminally failed.
    TerminalFailed,
}
/// A claimed execution attempt with an active lease.
///
/// Construction spawns a background lease-renewal task; terminal operations
/// (`complete`, `fail`, `cancel`, `delay_execution`, `suspend`, …) consume
/// `self` and stop the renewal loop. Dropping without a terminal op logs a
/// warning and lets the lease expire server-side.
pub struct ClaimedTask {
    client: Client,
    partition_config: PartitionConfig,
    execution_id: ExecutionId,
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    lease_id: LeaseId,
    // Fencing token: the server rejects operations carrying a stale epoch.
    lease_epoch: LeaseEpoch,
    lease_ttl_ms: u64,
    lane_id: LaneId,
    worker_instance_id: WorkerInstanceId,
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    // Background renewal loop; aborted in Drop.
    renewal_handle: JoinHandle<()>,
    // Notified by stop_renewal() to shut the loop down cleanly.
    renewal_stop: Arc<Notify>,
    // Consecutive renewal failures; reset to 0 on success (see is_lease_healthy).
    renewal_failures: Arc<AtomicU32>,
    // Set once any terminal operation ran; checked in Drop.
    terminal_op_called: AtomicBool,
    // Held so lane concurrency is released when the task is dropped.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
impl ClaimedTask {
    /// Builds a claimed task and immediately spawns the background
    /// lease-renewal loop. The loop stops when a terminal operation calls
    /// `stop_renewal`, or is aborted in `Drop`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        client: Client,
        partition_config: PartitionConfig,
        execution_id: ExecutionId,
        attempt_index: AttemptIndex,
        attempt_id: AttemptId,
        lease_id: LeaseId,
        lease_epoch: LeaseEpoch,
        lease_ttl_ms: u64,
        lane_id: LaneId,
        worker_instance_id: WorkerInstanceId,
        input_payload: Vec<u8>,
        execution_kind: String,
        tags: HashMap<String, String>,
    ) -> Self {
        let renewal_stop = Arc::new(Notify::new());
        let renewal_failures = Arc::new(AtomicU32::new(0));
        // partition_config is passed by value here and reused below, so it
        // must be Copy; the renewal task gets clones of the id types.
        let renewal_handle = spawn_renewal_task(
            client.clone(),
            partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id.clone(),
            lease_id.clone(),
            lease_epoch,
            lease_ttl_ms,
            renewal_stop.clone(),
            renewal_failures.clone(),
        );
        Self {
            client,
            partition_config,
            execution_id,
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            lease_ttl_ms,
            lane_id,
            worker_instance_id,
            input_payload,
            execution_kind,
            tags,
            renewal_handle,
            renewal_stop,
            renewal_failures,
            terminal_op_called: AtomicBool::new(false),
            _concurrency_permit: None,
        }
    }
    /// Attaches a lane-concurrency permit so it is released when this task is
    /// dropped (the permit is only held, never read).
    #[allow(dead_code)]
    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
        self._concurrency_permit = Some(permit);
    }
    /// Id of the claimed execution.
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }
    /// Zero-or-more-based index of this attempt within the execution.
    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }
    /// Id of this specific attempt.
    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }
    /// Id of the currently held lease.
    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }
    /// Fencing epoch of the lease; sent with every server operation.
    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }
    /// Raw input payload bytes for this execution.
    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }
    /// Execution kind string as stored server-side.
    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }
    /// User-supplied tags attached to the execution.
    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }
    /// Lane this execution is scheduled on.
    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }
    /// True while fewer than 3 consecutive background renewals have failed.
    /// NOTE(review): if the renewal loop exits on a terminal error it may stop
    /// with the counter below 3 — see spawn_renewal_task.
    pub fn is_lease_healthy(&self) -> bool {
        self.renewal_failures.load(Ordering::Relaxed) < 3
    }
    /// Current count of consecutive failed background renewals (0 after a success).
    pub fn consecutive_renewal_failures(&self) -> u32 {
        self.renewal_failures.load(Ordering::Relaxed)
    }
    /// Releases the lease and reschedules the execution to become eligible at
    /// `delay_until`. Terminal for this claim: consumes `self` and stops the
    /// renewal loop after the server call.
    ///
    /// KEYS/ARGV order is a wire contract with the server-side
    /// `ff_delay_execution` function — do not reorder.
    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            idx.lane_active(&self.lane_id),
            idx.lane_delayed(&self.lane_id),
            idx.attempt_timeout(),
        ];
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            self.attempt_id.to_string(),
            delay_until.to_string(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_delay_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        self.stop_renewal();
        parse_success_result(&raw, "ff_delay_execution")
    }
    /// Releases the lease and parks the execution in the lane's
    /// blocked-on-dependencies set until its children finish. Terminal for
    /// this claim: consumes `self` and stops the renewal loop.
    ///
    /// KEYS/ARGV order is a wire contract with `ff_move_to_waiting_children`.
    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            idx.lane_active(&self.lane_id),
            idx.lane_blocked_dependencies(&self.lane_id),
            idx.attempt_timeout(),
        ];
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            self.attempt_id.to_string(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_move_to_waiting_children", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        self.stop_renewal();
        parse_success_result(&raw, "ff_move_to_waiting_children")
    }
    /// Marks the execution completed, storing `result_payload` (empty string
    /// if `None`). Terminal: consumes `self` and stops the renewal loop.
    ///
    /// KEYS/ARGV order is a wire contract with `ff_complete_execution`.
    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            idx.lane_terminal(&self.lane_id),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lane_active(&self.lane_id),
            ctx.stream_meta(self.attempt_index),
            ctx.result(),
            idx.attempt_timeout(),
            idx.execution_deadline(),
        ];
        let result_bytes = result_payload.unwrap_or_default();
        // NOTE(review): lossy UTF-8 conversion — non-UTF-8 result bytes are
        // replaced with U+FFFD before being sent. Confirm payloads are
        // expected to be text, or binary results will be corrupted.
        let result_str = String::from_utf8_lossy(&result_bytes);
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            self.attempt_id.to_string(),
            result_str.into_owned(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_complete_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        self.stop_renewal();
        parse_success_result(&raw, "ff_complete_execution")
    }
    /// Reports the attempt as failed. The server consults the stored retry
    /// policy (read here and passed through as JSON) and either schedules a
    /// retry or fails the execution terminally. Terminal for this claim:
    /// consumes `self` and stops the renewal loop.
    ///
    /// KEYS/ARGV order is a wire contract with `ff_fail_execution`.
    pub async fn fail(
        self,
        reason: &str,
        error_category: &str,
    ) -> Result<FailOutcome, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            idx.lane_terminal(&self.lane_id),
            idx.lane_delayed(&self.lane_id),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lane_active(&self.lane_id),
            ctx.stream_meta(self.attempt_index),
            idx.attempt_timeout(),
            idx.execution_deadline(),
        ];
        // Empty string when no policy is stored or it is malformed.
        let retry_policy_json = self.read_retry_policy_json(&ctx).await?;
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            self.attempt_id.to_string(),
            reason.to_owned(),
            error_category.to_owned(),
            retry_policy_json,
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_fail_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        self.stop_renewal();
        parse_fail_result(&raw)
    }
    /// Cancels the execution with the given reason. Terminal: consumes `self`
    /// and stops the renewal loop. Delegates to `cancel_inner`.
    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
        self.cancel_inner(reason).await
    }
    /// Cancellation body: reads any current waitpoint id from exec core so the
    /// server can clean up waitpoint state, then calls `ff_cancel_execution`.
    ///
    /// KEYS/ARGV order is a wire contract with `ff_cancel_execution`.
    async fn cancel_inner(self, reason: &str) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);
        let wp_id_str: Option<String> = self
            .client
            .hget(&ctx.core(), "current_waitpoint_id")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "read current_waitpoint_id".into() })?;
        // Missing/empty field -> default id; corrupt field -> fresh placeholder id
        // so the cancel still proceeds (the resulting waitpoint keys simply won't
        // match any live data).
        // NOTE(review): the corrupt path uses WaitpointId::new() while the missing
        // path uses WaitpointId::default() — confirm the asymmetry is intentional.
        let wp_id = match wp_id_str.as_deref().filter(|s| !s.is_empty()) {
            Some(s) => match WaitpointId::parse(s) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        execution_id = %self.execution_id,
                        raw = %s,
                        error = %e,
                        "corrupt waitpoint_id in exec_core, using placeholder"
                    );
                    WaitpointId::new()
                }
            },
            None => WaitpointId::default(),
        };
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            ctx.stream_meta(self.attempt_index),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            ctx.suspension_current(),
            ctx.waitpoint(&wp_id),
            ctx.waitpoint_condition(&wp_id),
            idx.suspension_timeout(),
            idx.lane_terminal(&self.lane_id),
            idx.attempt_timeout(),
            idx.execution_deadline(),
            idx.lane_eligible(&self.lane_id),
            idx.lane_delayed(&self.lane_id),
            idx.lane_blocked_dependencies(&self.lane_id),
            idx.lane_blocked_budget(&self.lane_id),
            idx.lane_blocked_quota(&self.lane_id),
            idx.lane_blocked_route(&self.lane_id),
            idx.lane_blocked_operator(&self.lane_id),
        ];
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            reason.to_owned(),
            // Cancellation source recorded server-side.
            "worker".to_owned(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_cancel_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        self.stop_renewal();
        parse_success_result(&raw, "ff_cancel_execution")
    }
    /// Manually renews the lease once; the same routine the background renewal
    /// loop runs on its interval.
    pub async fn renew_lease(&self) -> Result<(), SdkError> {
        renew_lease_inner(
            &self.client,
            &self.partition_config,
            &self.execution_id,
            self.attempt_index,
            &self.attempt_id,
            &self.lease_id,
            self.lease_epoch,
            self.lease_ttl_ms,
        )
        .await
    }
    /// Records a progress update (percentage + message) on the execution core
    /// hash. Non-terminal: the claim and renewal loop stay active.
    ///
    /// ARGV order is a wire contract with `ff_update_progress`.
    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let keys: Vec<String> = vec![ctx.core()];
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            pct.to_string(),
            message.to_owned(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_update_progress", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        parse_success_result(&raw, "ff_update_progress")
    }
    /// Reports usage deltas against a budget and checks soft/hard limits.
    ///
    /// ARGV layout (contract with `ff_report_usage_and_check`):
    /// dimension count, then all dimension names, then all deltas in the same
    /// order, then the current timestamp, then the dedup key ("" when absent).
    /// An empty `dedup_key` is treated the same as `None`.
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        dimensions: &[(&str, u64)],
        dedup_key: Option<&str>,
    ) -> Result<ReportUsageResult, SdkError> {
        let partition = budget_partition(budget_id, &self.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);
        let keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
        let now = TimestampMs::now();
        let dim_count = dimensions.len();
        let mut argv: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
        argv.push(dim_count.to_string());
        // Names first, deltas second — the server pairs them up by position.
        for (dim, _) in dimensions {
            argv.push((*dim).to_string());
        }
        for (_, delta) in dimensions {
            argv.push(delta.to_string());
        }
        argv.push(now.to_string());
        let dedup_key_val = dedup_key
            .filter(|k| !k.is_empty())
            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
            .unwrap_or_default();
        argv.push(dedup_key_val);
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_report_usage_and_check", &key_refs, &argv_refs)
            .await
            .map_err(SdkError::Valkey)?;
        parse_report_usage_result(&raw)
    }
    /// Creates a pending waitpoint (id generated client-side) that expires
    /// `expires_in_ms` from now, returning the id and the server-issued token.
    ///
    /// KEYS/ARGV order is a wire contract with `ff_create_pending_waitpoint`.
    pub async fn create_pending_waitpoint(
        &self,
        waitpoint_key: &str,
        expires_in_ms: u64,
    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);
        let waitpoint_id = WaitpointId::new();
        // NOTE(review): `expires_in_ms as i64` wraps for values > i64::MAX —
        // callers are expected to pass sane durations.
        let expires_at = TimestampMs::from_millis(TimestampMs::now().0 + expires_in_ms as i64);
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.waitpoint(&waitpoint_id),
            idx.pending_waitpoint_expiry(),
            idx.waitpoint_hmac_secrets(),
        ];
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.attempt_index.to_string(),
            waitpoint_id.to_string(),
            waitpoint_key.to_owned(),
            expires_at.to_string(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_create_pending_waitpoint", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        let token = extract_pending_waitpoint_token(&raw)?;
        Ok((waitpoint_id, token))
    }
    /// Appends one frame to this attempt's output stream.
    ///
    /// ARGV order is a wire contract with `ff_append_frame`. The payload is
    /// sent as lossy UTF-8 with encoding tag "utf8".
    /// NOTE(review): "10000" and "65536" are passed as positional args — they
    /// look like stream/frame size limits; confirm meanings against the
    /// server-side ff_append_frame definition before changing.
    pub async fn append_frame(
        &self,
        frame_type: &str,
        payload: &[u8],
        metadata: Option<&str>,
    ) -> Result<AppendFrameOutcome, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let now = TimestampMs::now();
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.stream(self.attempt_index),
            ctx.stream_meta(self.attempt_index),
        ];
        let payload_str = String::from_utf8_lossy(payload);
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.attempt_index.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            frame_type.to_owned(),
            now.to_string(),
            payload_str.into_owned(),
            "utf8".to_owned(),
            metadata.unwrap_or("").to_owned(),
            "worker".to_owned(),
            "10000".to_owned(),
            self.attempt_id.to_string(),
            "65536".to_owned(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_append_frame", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        parse_append_frame_result(&raw)
    }
    /// Suspends the execution on a freshly generated waitpoint until the given
    /// signal condition is met (or the optional timeout elapses, handled per
    /// `timeout_behavior`). Terminal for this claim: consumes `self` and stops
    /// the renewal loop.
    ///
    /// KEYS/ARGV order is a wire contract with `ff_suspend_execution`.
    pub async fn suspend(
        self,
        reason_code: &str,
        condition_matchers: &[ConditionMatcher],
        timeout_ms: Option<u64>,
        timeout_behavior: TimeoutBehavior,
    ) -> Result<SuspendOutcome, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);
        let suspension_id = SuspensionId::new();
        let waitpoint_id = WaitpointId::new();
        let waitpoint_key = format!("wpk:{}", waitpoint_id);
        let timeout_at = timeout_ms.map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));
        let required_signal_names: Vec<&str> = condition_matchers
            .iter()
            .map(|m| m.signal_name.as_str())
            .collect();
        // NOTE(review): single (or zero) matcher -> "any", several -> "all",
        // while minimum_signal_count is always 1 — confirm this combination is
        // what the server's condition evaluator expects.
        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
        let resume_condition_json = serde_json::json!({
            "condition_type": "signal_set",
            "required_signal_names": required_signal_names,
            "signal_match_mode": match_mode,
            "minimum_signal_count": 1,
            "timeout_behavior": timeout_behavior.as_str(),
            "allow_operator_override": true,
        }).to_string();
        let resume_policy_json = serde_json::json!({
            "resume_target": "runnable",
            "close_waitpoint_on_resume": true,
            "consume_matched_signals": true,
            "retain_signal_buffer_until_closed": true,
        }).to_string();
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            ctx.suspension_current(),
            ctx.waitpoint(&waitpoint_id),
            ctx.waitpoint_signals(&waitpoint_id),
            idx.suspension_timeout(),
            idx.pending_waitpoint_expiry(),
            idx.lane_active(&self.lane_id),
            idx.lane_suspended(&self.lane_id),
            ctx.waitpoints(),
            ctx.waitpoint_condition(&waitpoint_id),
            idx.attempt_timeout(),
            idx.waitpoint_hmac_secrets(),
        ];
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.attempt_index.to_string(),
            self.attempt_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            suspension_id.to_string(),
            waitpoint_id.to_string(),
            waitpoint_key.clone(),
            reason_code.to_owned(),
            // Suspension source recorded server-side.
            "worker".to_owned(),
            // Empty string means "no timeout".
            timeout_at.map_or(String::new(), |t| t.to_string()),
            resume_condition_json,
            resume_policy_json,
            String::new(),
            String::new(),
            timeout_behavior.as_str().to_owned(),
            "1000".to_owned(),
        ];
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
        let raw: Value = self
            .client
            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;
        self.stop_renewal();
        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
    }
    /// After a resume, reads back the signals that satisfied the previous
    /// suspension's condition matchers.
    ///
    /// Returns an empty Vec when there is no suspension record, it belongs to
    /// a different attempt, or it was not closed with reason "resumed".
    /// Matcher slots with unparsable signal ids and signal hashes that have
    /// since disappeared are skipped with a warning rather than erroring.
    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let susp: HashMap<String, String> = self
            .client
            .hgetall(&ctx.suspension_current())
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "HGETALL suspension_current".into(),
            })?;
        let Some(waitpoint_id) =
            resume_waitpoint_id_from_suspension(&susp, self.attempt_index)?
        else {
            return Ok(Vec::new());
        };
        let wp_cond_key = ctx.waitpoint_condition(&waitpoint_id);
        // Matcher slots are stored as matcher:{i}:* fields, 0..total_matchers.
        let total_str: Option<String> = self
            .client
            .hget(&wp_cond_key, "total_matchers")
            .await
            .map_err(|e| SdkError::ValkeyContext {
                source: e,
                context: "HGET total_matchers".into(),
            })?;
        let total: usize = total_str
            .as_deref()
            .and_then(|s| s.parse().ok())
            .unwrap_or(0);
        let mut signal_ids: Vec<SignalId> = Vec::new();
        for i in 0..total {
            let fields: Vec<Option<String>> = self
                .client
                .cmd("HMGET")
                .arg(&wp_cond_key)
                .arg(format!("matcher:{i}:satisfied"))
                .arg(format!("matcher:{i}:signal_id"))
                .execute()
                .await
                .map_err(|e| SdkError::ValkeyContext {
                    source: e,
                    context: "HMGET matcher slot".into(),
                })?;
            // Only satisfied matchers ("1") carry a signal worth reading.
            let satisfied = fields.first().and_then(|o| o.as_deref());
            if satisfied != Some("1") {
                continue;
            }
            let Some(raw) = fields.get(1).and_then(|o| o.as_deref()).filter(|s| !s.is_empty())
            else {
                continue;
            };
            match SignalId::parse(raw) {
                Ok(sid) => signal_ids.push(sid),
                Err(e) => {
                    tracing::warn!(
                        execution_id = %self.execution_id,
                        waitpoint_id = %waitpoint_id,
                        raw = %raw,
                        error = %e,
                        "resume_signals: matcher signal_id failed to parse, skipping"
                    );
                }
            }
        }
        // Hydrate each signal: metadata from its hash, payload from a separate blob key.
        let mut out: Vec<ResumeSignal> = Vec::with_capacity(signal_ids.len());
        for signal_id in signal_ids {
            let sig: HashMap<String, String> = self
                .client
                .hgetall(&ctx.signal(&signal_id))
                .await
                .map_err(|e| SdkError::ValkeyContext {
                    source: e,
                    context: "HGETALL signal_hash".into(),
                })?;
            if sig.is_empty() {
                continue;
            }
            let payload_raw: Option<Value> = self
                .client
                .cmd("GET")
                .arg(ctx.signal_payload(&signal_id))
                .execute()
                .await
                .map_err(|e| SdkError::ValkeyContext {
                    source: e,
                    context: "GET signal_payload".into(),
                })?;
            let payload: Option<Vec<u8>> = match payload_raw {
                Some(Value::BulkString(b)) => Some(b.to_vec()),
                Some(Value::SimpleString(s)) => Some(s.into_bytes()),
                _ => None,
            };
            // Missing/unparseable timestamp degrades to epoch 0 rather than failing.
            let accepted_at = sig
                .get("accepted_at")
                .and_then(|s| s.parse::<i64>().ok())
                .map(TimestampMs::from_millis)
                .unwrap_or_else(|| TimestampMs::from_millis(0));
            out.push(ResumeSignal {
                signal_id,
                signal_name: sig.get("signal_name").cloned().unwrap_or_default(),
                signal_category: sig.get("signal_category").cloned().unwrap_or_default(),
                source_type: sig.get("source_type").cloned().unwrap_or_default(),
                source_identity: sig.get("source_identity").cloned().unwrap_or_default(),
                correlation_id: sig.get("correlation_id").cloned().unwrap_or_default(),
                accepted_at,
                payload,
            });
        }
        Ok(out)
    }
    /// Marks that a terminal operation ran (so Drop stays quiet) and signals
    /// the background renewal loop to exit.
    fn stop_renewal(&self) {
        self.terminal_op_called.store(true, Ordering::Release);
        self.renewal_stop.notify_one();
    }
async fn read_retry_policy_json(&self, ctx: &ExecKeyContext) -> Result<String, SdkError> {
let policy_str: Option<String> = self
.client
.get(&ctx.policy())
.await
.map_err(|e| SdkError::ValkeyContext { source: e, context: "read retry policy".into() })?;
match policy_str {
Some(json) => {
match serde_json::from_str::<serde_json::Value>(&json) {
Ok(policy) => {
if let Some(retry) = policy.get("retry_policy") {
return Ok(serde_json::to_string(retry).unwrap_or_default());
}
Ok(String::new())
}
Err(e) => {
tracing::warn!(
execution_id = %self.execution_id,
error = %e,
"malformed retry policy JSON, treating as no policy"
);
Ok(String::new())
}
}
}
None => Ok(String::new()),
}
}
}
impl Drop for ClaimedTask {
    /// Aborts the background renewal task. Warns if the task is dropped
    /// without any terminal operation, since the lease will then simply
    /// expire server-side.
    fn drop(&mut self) {
        if !self.terminal_op_called.load(Ordering::Acquire) {
            tracing::warn!(
                execution_id = %self.execution_id,
                "ClaimedTask dropped without terminal operation — lease will expire"
            );
        }
        self.renewal_handle.abort();
    }
}
/// Performs one lease renewal via `ff_renew_lease`, extending the lease by
/// `lease_ttl_ms` if the (lease_id, lease_epoch) fencing pair is still current.
///
/// ARGV order is a wire contract with the server-side function — do not reorder.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
    name = "renew_lease",
    skip_all,
    fields(execution_id = %execution_id)
)]
async fn renew_lease_inner(
    client: &Client,
    partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    attempt_id: &AttemptId,
    lease_id: &LeaseId,
    lease_epoch: LeaseEpoch,
    lease_ttl_ms: u64,
) -> Result<(), SdkError> {
    let partition = execution_partition(execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, execution_id);
    let idx = IndexKeys::new(&partition);
    let keys: Vec<String> = vec![
        ctx.core(),
        ctx.lease_current(),
        ctx.lease_history(),
        idx.lease_expiry(),
    ];
    // How long the server keeps superseded lease-history entries around.
    let lease_history_grace_ms = 5000_u64;
    let args: Vec<String> = vec![
        execution_id.to_string(),
        attempt_index.to_string(),
        attempt_id.to_string(),
        lease_id.to_string(),
        lease_epoch.to_string(),
        lease_ttl_ms.to_string(),
        lease_history_grace_ms.to_string(),
    ];
    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
    let raw: Value = client
        .fcall("ff_renew_lease", &key_refs, &arg_refs)
        .await
        .map_err(SdkError::Valkey)?;
    parse_success_result(&raw, "ff_renew_lease")
}
/// Spawns the background loop that renews the lease at roughly one third of
/// its TTL until stopped.
///
/// The loop exits when `stop_signal` is notified (terminal op) or when a
/// renewal fails with a terminal script error (stale/expired/revoked lease,
/// execution gone). Transient failures bump `failure_counter` and retry on
/// the next tick; a success resets it to 0.
///
/// Fixes over the previous version:
/// - `lease_ttl_ms / 3` is clamped to at least 1ms: `tokio::time::interval`
///   panics on a zero period, which a `lease_ttl_ms < 3` would produce.
/// - On a terminal renewal error the counter is saturated to the health
///   threshold (3) instead of incremented by 1, so `is_lease_healthy()`
///   (which checks `< 3`) correctly reports the lease as dead after the
///   loop has permanently stopped.
#[allow(clippy::too_many_arguments, dead_code)]
fn spawn_renewal_task(
    client: Client,
    partition_config: PartitionConfig,
    execution_id: ExecutionId,
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Renew at ~TTL/3; never zero (tokio::time::interval panics on Duration::ZERO).
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));
    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // First tick fires immediately; consume it so we don't renew right after claim.
        tick.tick().await;
        loop {
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_lease_inner(
                        &client,
                        &partition_config,
                        &execution_id,
                        attempt_index,
                        &attempt_id,
                        &lease_id,
                        lease_epoch,
                        lease_ttl_ms,
                    )
                    .await
                    {
                        Ok(()) => {
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        Err(SdkError::Script(ref e)) if is_terminal_renewal_error(e) => {
                            // The lease is definitively gone; saturate past the
                            // is_lease_healthy threshold so callers see it as dead.
                            failure_counter.fetch_max(3, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}
/// True for script errors after which renewing again can never succeed
/// (lease fencing lost or the execution itself is gone); the renewal loop
/// stops permanently on these.
#[allow(dead_code)]
fn is_terminal_renewal_error(err: &ScriptError) -> bool {
    matches!(
        err,
        ScriptError::StaleLease
            | ScriptError::LeaseExpired
            | ScriptError::LeaseRevoked
            | ScriptError::ExecutionNotActive
            | ScriptError::ExecutionNotFound
    )
}
/// Decodes the `ff_report_usage_and_check` reply.
///
/// Wire format: `[status:int, ...]` — status 1 means accepted, anything else
/// carries `[_, error_code, detail]`. On success index 1 is a sub-status:
/// "OK", "ALREADY_APPLIED" (dedup hit), or "SOFT_BREACH"/"HARD_BREACH" with
/// `[_, _, dimension, current_usage, limit]`.
pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_report_usage_and_check: expected Array".into(),
            )));
        }
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_report_usage_and_check: expected Int status code".into(),
            )));
        }
    };
    if status_code != 1 {
        let error_code = usage_field_str(arr, 1);
        let detail = usage_field_str(arr, 2);
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_report_usage_and_check: {error_code}"))
            }),
        ));
    }
    let sub_status = usage_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        "SOFT_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
        }
        "HARD_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
            Ok(ReportUsageResult::HardBreach {
                dimension: dim,
                current_usage: current,
                hard_limit: limit,
            })
        }
        _ => Err(SdkError::Script(ScriptError::Parse(format!(
            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
        )))),
    }
}
/// Best-effort stringification of `arr[index]`: bulk/simple strings pass
/// through, ints are formatted, everything else (missing, error, other
/// variants) becomes an empty string.
fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
    let Some(Ok(value)) = arr.get(index) else {
        return String::new();
    };
    match value {
        Value::BulkString(b) => String::from_utf8_lossy(b).into_owned(),
        Value::SimpleString(s) => s.clone(),
        Value::Int(n) => n.to_string(),
        _ => String::new(),
    }
}
/// Reads a u64 out of `arr[index]`, accepting an Int or a numeric string.
/// Any other shape (negative int, non-numeric string, wrong type, missing
/// element) produces a descriptive parse error naming `sub_status` and
/// `field_name`.
fn parse_usage_u64(
    arr: &[Result<Value, ferriskey::Error>],
    index: usize,
    sub_status: &str,
    field_name: &str,
) -> Result<u64, SdkError> {
    // Shared error constructor so every branch reports the same way.
    let fail = |detail: String| SdkError::Script(ScriptError::Parse(detail));
    let Some(entry) = arr.get(index) else {
        return Err(fail(format!(
            "ff_report_usage_and_check {sub_status}: {field_name} (index {index}) missing from response"
        )));
    };
    match entry {
        Ok(Value::Int(n)) => u64::try_from(*n).map_err(|_| {
            fail(format!(
                "ff_report_usage_and_check {sub_status}: {field_name} (index {index}) negative int {n} cannot be u64"
            ))
        }),
        Ok(Value::BulkString(b)) => {
            let s = String::from_utf8_lossy(b);
            s.parse::<u64>().map_err(|_| {
                fail(format!(
                    "ff_report_usage_and_check {sub_status}: {field_name} (index {index}) not a u64 string: {s:?}"
                ))
            })
        }
        Ok(Value::SimpleString(s)) => s.parse::<u64>().map_err(|_| {
            fail(format!(
                "ff_report_usage_and_check {sub_status}: {field_name} (index {index}) not a u64 string: {s:?}"
            ))
        }),
        _ => Err(fail(format!(
            "ff_report_usage_and_check {sub_status}: {field_name} (index {index}) wrong wire type (expected Int or String)"
        ))),
    }
}
/// Validates a `ff_create_pending_waitpoint` reply and extracts the waitpoint
/// token stored at index 4 of the response array.
fn extract_pending_waitpoint_token(raw: &Value) -> Result<WaitpointToken, SdkError> {
    // Surfaces any error reply first; guarantees `raw` is an array afterwards.
    parse_success_result(raw, "ff_create_pending_waitpoint")?;
    let Value::Array(arr) = raw else {
        unreachable!("parse_success_result would have rejected non-array")
    };
    match arr.get(4) {
        Some(Ok(Value::BulkString(b))) => {
            Ok(WaitpointToken::new(String::from_utf8_lossy(b).into_owned()))
        }
        Some(Ok(Value::SimpleString(s))) => Ok(WaitpointToken::new(s.clone())),
        _ => Err(SdkError::Script(ScriptError::Parse(
            "ff_create_pending_waitpoint: missing waitpoint_token in response".into(),
        ))),
    }
}
/// Extracts the waitpoint id from a suspension_current hash, but only when
/// that suspension belongs to `claimed_attempt` and was closed by an actual
/// resume; returns `Ok(None)` otherwise. A present-but-invalid waitpoint id
/// is a hard parse error.
fn resume_waitpoint_id_from_suspension(
    susp: &HashMap<String, String>,
    claimed_attempt: AttemptIndex,
) -> Result<Option<WaitpointId>, SdkError> {
    if susp.is_empty() {
        return Ok(None);
    }
    // A missing/garbled attempt_index can never equal a real attempt.
    let recorded_attempt = susp
        .get("attempt_index")
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(u32::MAX);
    if recorded_attempt != claimed_attempt.0 {
        return Ok(None);
    }
    // Only a suspension closed by a resume carries matched signals to read.
    if susp.get("close_reason").map(String::as_str) != Some("resumed") {
        return Ok(None);
    }
    let wp_id_str = susp
        .get("waitpoint_id")
        .map(String::as_str)
        .unwrap_or_default();
    if wp_id_str.is_empty() {
        return Ok(None);
    }
    match WaitpointId::parse(wp_id_str) {
        Ok(id) => Ok(Some(id)),
        Err(e) => Err(SdkError::Script(ScriptError::Parse(format!(
            "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
        )))),
    }
}
/// Checks the common `[status:int, error_code?, detail?]` reply shape shared
/// by the ff_* functions: status 1 is success; anything else is mapped to a
/// `ScriptError` via `from_code_with_detail`, falling back to a parse error
/// tagged with `function_name`.
pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
    let parse_err = |msg: String| SdkError::Script(ScriptError::Parse(msg));
    let Value::Array(arr) = raw else {
        return Err(parse_err(format!("{function_name}: expected Array, got non-array")));
    };
    if arr.is_empty() {
        return Err(parse_err(format!("{function_name}: empty result array")));
    }
    let Some(Ok(Value::Int(status_code))) = arr.first() else {
        return Err(parse_err(format!("{function_name}: expected Int at index 0")));
    };
    if *status_code == 1 {
        return Ok(());
    }
    // Failure path: index 1 is the error code, index 2 an optional detail.
    let field_str = |idx: usize| -> String {
        match arr.get(idx) {
            Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
            Some(Ok(Value::SimpleString(s))) => s.clone(),
            _ => String::new(),
        }
    };
    let mut error_code = field_str(1);
    if error_code.is_empty() {
        error_code = "unknown".to_owned();
    }
    let detail = field_str(2);
    let script_err = ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
        ScriptError::Parse(format!("{function_name}: unknown error: {error_code}"))
    });
    Err(SdkError::Script(script_err))
}
/// Decodes the `ff_suspend_execution` reply into a [`SuspendOutcome`].
///
/// Reply shape on success: `[1, sub_status, ..., token@5]` where sub_status
/// "ALREADY_SATISFIED" means the wait condition was met at suspend time. On
/// failure: `[status, error_code, detail]`.
fn parse_suspend_result(
    raw: &Value,
    suspension_id: SuspensionId,
    waitpoint_id: WaitpointId,
    waitpoint_key: String,
) -> Result<SuspendOutcome, SdkError> {
    let Value::Array(arr) = raw else {
        return Err(SdkError::Script(ScriptError::Parse(
            "ff_suspend_execution: expected Array".into(),
        )));
    };
    // Shared extractor: the bulk/simple string at `idx`, if present.
    let field = |idx: usize| -> Option<String> {
        arr.get(idx).and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_suspend_execution: bad status code".into(),
            )));
        }
    };
    if status_code != 1 {
        let error_code = field(1)
            .filter(|s| !s.is_empty())
            .unwrap_or_else(|| "unknown".to_owned());
        let detail = field(2).unwrap_or_default();
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_suspend_execution: {error_code}"))
            }),
        ));
    }
    let sub_status = field(1).unwrap_or_default();
    let waitpoint_token = field(5).map(WaitpointToken::new).ok_or_else(|| {
        SdkError::Script(ScriptError::Parse(
            "ff_suspend_execution: missing waitpoint_token in response".into(),
        ))
    })?;
    if sub_status == "ALREADY_SATISFIED" {
        Ok(SuspendOutcome::AlreadySatisfied {
            suspension_id,
            waitpoint_id,
            waitpoint_key,
            waitpoint_token,
        })
    } else {
        Ok(SuspendOutcome::Suspended {
            suspension_id,
            waitpoint_id,
            waitpoint_key,
            waitpoint_token,
        })
    }
}
/// Parses the reply from the `ff_deliver_signal` Lua function.
///
/// Success shape: `[1, sub_status, id, effect]`. `sub_status == "DUPLICATE"`
/// means the idempotency key was already seen and index 2 carries the
/// previously-delivered signal's id; otherwise index 2 is the new signal id
/// (must be a valid UUID) and index 3 the delivery effect. Failure shape:
/// `[status != 1, error_code, detail]`.
pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_deliver_signal: expected Array".into(),
            )));
        }
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_deliver_signal: bad status code".into(),
            )));
        }
    };
    // One shared string-field extractor; the original duplicated this match
    // five times across the error, sub-status, id, and effect fields.
    let field = |idx: usize| -> Option<String> {
        arr.get(idx).and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
    };
    if status_code != 1 {
        // Missing or empty error code degrades to "unknown".
        let error_code = field(1)
            .filter(|s| !s.is_empty())
            .unwrap_or_else(|| "unknown".to_owned());
        let detail = field(2).unwrap_or_default();
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_deliver_signal: {error_code}"))
            }),
        ));
    }
    if field(1).unwrap_or_default() == "DUPLICATE" {
        return Ok(SignalOutcome::Duplicate {
            existing_signal_id: field(2).unwrap_or_default(),
        });
    }
    let signal_id_str = field(2).unwrap_or_default();
    let effect = field(3).unwrap_or_default();
    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
        SdkError::Script(ScriptError::Parse(format!(
            "ff_deliver_signal: invalid signal_id from Lua: {e}"
        )))
    })?;
    if effect == "resume_condition_satisfied" {
        Ok(SignalOutcome::TriggeredResume { signal_id })
    } else {
        Ok(SignalOutcome::Accepted { signal_id, effect })
    }
}
/// Parses the reply from the `ff_append_frame` Lua function.
///
/// Success shape: `[1, _, stream_id, frame_count]` — index 1 is not read
/// here. `frame_count` may arrive as an Int or a stringified number.
/// Failure shape: `[status != 1, error_code, detail]`.
fn parse_append_frame_result(raw: &Value) -> Result<AppendFrameOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_append_frame: expected Array".into(),
            )));
        }
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_append_frame: bad status code".into(),
            )));
        }
    };
    // One shared string-field extractor; the original duplicated this match
    // for the error fields and the stream id.
    let field = |idx: usize| -> Option<String> {
        arr.get(idx).and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
    };
    if status_code != 1 {
        // Missing or empty error code degrades to "unknown".
        let error_code = field(1)
            .filter(|s| !s.is_empty())
            .unwrap_or_else(|| "unknown".to_owned());
        let detail = field(2).unwrap_or_default();
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_append_frame: {error_code}"))
            }),
        ));
    }
    let stream_id = field(2).unwrap_or_default();
    // Anything unparseable falls back to 0. Fixed: a negative Int previously
    // wrapped to a huge value via `as u64`; `try_from` now maps it to 0 like
    // every other malformed count.
    let frame_count = arr
        .get(3)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => String::from_utf8_lossy(b).parse::<u64>().ok(),
            Ok(Value::SimpleString(s)) => s.parse::<u64>().ok(),
            Ok(Value::Int(n)) => u64::try_from(*n).ok(),
            _ => None,
        })
        .unwrap_or(0);
    Ok(AppendFrameOutcome {
        stream_id,
        frame_count,
    })
}
/// Parses the reply from the `ff_fail_execution` Lua function.
///
/// Success shape: `[1, _, sub_status, ...]` — index 1 is not read here.
/// `sub_status` is `"retry_scheduled"` (index 3 carries the retry timestamp
/// in epoch milliseconds) or `"terminal_failed"`; anything else is a protocol
/// error. Failure shape: `[status != 1, error_code, detail]`.
fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_fail_execution: expected Array".into(),
            )));
        }
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_fail_execution: bad status code".into(),
            )));
        }
    };
    // One shared string-field extractor; the original duplicated this match
    // for the error fields and the sub-status.
    let field = |idx: usize| -> Option<String> {
        arr.get(idx).and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
    };
    if status_code != 1 {
        // Missing or empty error code degrades to "unknown".
        let error_code = field(1)
            .filter(|s| !s.is_empty())
            .unwrap_or_else(|| "unknown".to_owned());
        let detail = field(2).unwrap_or_default();
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_fail_execution: {error_code}"))
            }),
        ));
    }
    let sub_status = field(2).unwrap_or_default();
    match sub_status.as_str() {
        "retry_scheduled" => {
            // Fixed: the delay field now also accepts SimpleString replies,
            // consistent with every other string field in this module
            // (previously only BulkString and Int were handled). An
            // unparseable delay still degrades to 0.
            let delay_until = arr
                .get(3)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default()
                .parse::<i64>()
                .unwrap_or(0);
            Ok(FailOutcome::RetryScheduled {
                delay_until: TimestampMs::from_millis(delay_until),
            })
        }
        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
        _ => Err(SdkError::Script(ScriptError::Parse(format!(
            "ff_fail_execution: unexpected sub-status: {sub_status}"
        )))),
    }
}
/// Upper bound (milliseconds) accepted for `tail_stream`'s `block_ms` argument.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;
// Re-export stream-read contract items so callers of this module don't need
// to depend on ff_core directly.
pub use ff_core::contracts::STREAM_READ_HARD_CAP;
pub use ff_core::contracts::StreamFrames;
/// Rejects a stream-read `count_limit` outside `[1, STREAM_READ_HARD_CAP]`
/// with a `SdkError::Config`.
fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
    match count_limit {
        0 => Err(SdkError::Config("count_limit must be >= 1".to_owned())),
        n if n > STREAM_READ_HARD_CAP => Err(SdkError::Config(format!(
            "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
        ))),
        _ => Ok(()),
    }
}
/// Reads a bounded range of frames from one attempt's stream.
///
/// Validates `count_limit` before touching the client, resolves the
/// execution's partition, then issues the `ff_read_attempt_stream` script
/// call over `[from_id, to_id]`.
pub async fn read_stream(
    client: &Client,
    partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from_id: &str,
    to_id: &str,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
    validate_stream_read_count(count_limit)?;
    let partition = execution_partition(execution_id, partition_config);
    let key_ctx = ExecKeyContext::new(&partition, execution_id);
    let op_keys = ff_script::functions::stream::StreamOpKeys { ctx: &key_ctx };
    let read_args = ReadFramesArgs {
        execution_id: execution_id.clone(),
        attempt_index,
        from_id: from_id.to_owned(),
        to_id: to_id.to_owned(),
        count_limit,
    };
    let result = ff_script::functions::stream::ff_read_attempt_stream(client, &op_keys, &read_args)
        .await
        .map_err(SdkError::Script)?;
    let ReadFramesResult::Frames(frames) = result;
    Ok(frames)
}
/// Blocks on one attempt's stream, waiting up to `block_ms` for frames
/// newer than `last_id`.
///
/// `block_ms` above `MAX_TAIL_BLOCK_MS` or a `count_limit` outside the
/// accepted window yields `SdkError::Config` without touching the client.
pub async fn tail_stream(
    client: &Client,
    partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    last_id: &str,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    if block_ms > MAX_TAIL_BLOCK_MS {
        return Err(SdkError::Config(format!(
            "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
        )));
    }
    validate_stream_read_count(count_limit)?;
    let partition = execution_partition(execution_id, partition_config);
    let key_ctx = ExecKeyContext::new(&partition, execution_id);
    let frames = ff_script::stream_tail::xread_block(
        client,
        &key_ctx.stream(attempt_index),
        &key_ctx.stream_meta(attempt_index),
        last_id,
        block_ms,
        count_limit,
    )
    .await;
    frames.map_err(SdkError::Script)
}
#[cfg(test)]
mod parse_report_usage_result_tests {
    use super::*;
    // Helper: wrap a &str as a SimpleString reply element.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }
    // Helper: wrap an i64 as an Int reply element.
    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }
    // Helper: assemble a reply Array from its elements.
    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }
    // [1, "OK"] parses to the plain success outcome.
    #[test]
    fn ok_status() {
        let raw = arr(vec![int(1), s("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }
    // [1, "ALREADY_APPLIED"] marks an idempotent replay.
    #[test]
    fn already_applied_status() {
        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }
    // SOFT_BREACH carries dimension, current usage, and soft limit
    // (as stringified numbers) at indexes 2-4.
    #[test]
    fn soft_breach_status() {
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }
    // HARD_BREACH mirrors SOFT_BREACH with a hard limit at index 4.
    #[test]
    fn hard_breach_status() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }
    // A non-Array reply must fail with a parse error naming the expected shape.
    #[test]
    fn non_array_input_is_parse_error() {
        let raw = Value::SimpleString("OK".to_owned());
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }
    // The status code at index 0 must be an Int.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = arr(vec![s("not_an_int"), s("OK")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }
    // A non-numeric current_usage inside SOFT_BREACH must produce an error
    // that names the sub-status, the field, and the expected numeric type.
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"), s("100"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }
    // A HARD_BREACH reply truncated before the hard_limit field must produce
    // an error that names the missing field.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}
#[cfg(test)]
mod resume_signals_tests {
    use super::*;
    /// Builds a suspension-record hash from `(field, value)` string pairs.
    fn record(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut map = HashMap::new();
        for &(field, value) in pairs {
            map.insert(field.to_owned(), value.to_owned());
        }
        map
    }
    #[test]
    fn empty_suspension_returns_none() {
        let rec = record(&[]);
        let out = resume_waitpoint_id_from_suspension(&rec, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }
    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let wp_str = wp.to_string();
        let rec = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp_str),
        ]);
        // Claimed attempt 1 vs. recorded attempt 0 → record is stale.
        let out = resume_waitpoint_id_from_suspension(&rec, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }
    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        let wp_str = wp.to_string();
        // Every close reason other than "resumed" must yield None.
        for reason in ["timeout", "cancelled", "", "expired"] {
            let rec = record(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp_str),
            ]);
            let out = resume_waitpoint_id_from_suspension(&rec, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }
    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let wp_str = wp.to_string();
        let rec = record(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp_str),
        ]);
        let out = resume_waitpoint_id_from_suspension(&rec, AttemptIndex::new(2)).unwrap();
        assert_eq!(out, Some(wp));
    }
    #[test]
    fn malformed_waitpoint_id_is_error() {
        let rec = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err = resume_waitpoint_id_from_suspension(&rec, AttemptIndex::new(0)).unwrap_err();
        let rendered = format!("{err}");
        assert!(
            rendered.contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }
    #[test]
    fn empty_waitpoint_id_returns_none() {
        let rec = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&rec, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }
}