use crate::error::ScriptError;
use ff_core::contracts::*;
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::state::PublicState;
use ff_core::types::*;
use crate::result::{FcallResult, FromFcallResult};
/// Borrowed key sources that `ff_suspend_execution` reads to build its key list.
pub struct SuspendOpKeys<'a> {
/// Source of the execution-scoped keys (core, attempt hash, lease, suspension, waitpoint).
pub ctx: &'a ExecKeyContext,
/// Source of the index keys (lease expiry, worker leases, timeouts, lane sets, HMAC secrets).
pub idx: &'a IndexKeys,
/// Lane passed to `lane_active` and `lane_suspended`.
pub lane_id: &'a LaneId,
/// Worker instance passed to `worker_leases`.
pub worker_instance_id: &'a WorkerInstanceId,
}
// Wrapper definition for the `ff_suspend_execution` script function.
//
// Declares 17 keys (from `SuspendOpKeys` plus the attempt index and waitpoint
// id in `args`) and 17 arguments. The order of both lists is preserved exactly
// as originally written; presumably the script reads them positionally, so do
// not reorder without checking the script side.
ff_function! {
    pub ff_suspend_execution(args: SuspendExecutionArgs) -> SuspendExecutionResult {
        keys(k: &SuspendOpKeys<'_>) {
            k.ctx.core(),                                   // 1
            k.ctx.attempt_hash(args.attempt_index),         // 2
            k.ctx.lease_current(),                          // 3
            k.ctx.lease_history(),                          // 4
            k.idx.lease_expiry(),                           // 5
            k.idx.worker_leases(k.worker_instance_id),      // 6
            k.ctx.suspension_current(),                     // 7
            k.ctx.waitpoint(&args.waitpoint_id),            // 8
            k.ctx.waitpoint_signals(&args.waitpoint_id),    // 9
            k.idx.suspension_timeout(),                     // 10
            k.idx.pending_waitpoint_expiry(),               // 11
            k.idx.lane_active(k.lane_id),                   // 12
            k.idx.lane_suspended(k.lane_id),                // 13
            k.ctx.waitpoints(),                             // 14
            k.ctx.waitpoint_condition(&args.waitpoint_id),  // 15
            k.idx.attempt_timeout(),                        // 16
            k.idx.waitpoint_hmac_secrets(),                 // 17
        }
        argv {
            // 1-2: which execution / attempt is being suspended.
            args.execution_id.to_string(),
            args.attempt_index.to_string(),
            // 3-5: fence triple; each is sent as an empty string when no fence is given.
            args.fence.as_ref().map_or_else(String::new, |fence| fence.attempt_id.to_string()),
            args.fence.as_ref().map_or_else(String::new, |fence| fence.lease_id.to_string()),
            args.fence.as_ref().map_or_else(String::new, |fence| fence.lease_epoch.to_string()),
            // 6-8: suspension and waitpoint identity.
            args.suspension_id.to_string(),
            args.waitpoint_id.to_string(),
            args.waitpoint_key.clone(),
            // 9-10: why and by whom.
            args.reason_code.clone(),
            args.requested_by.clone(),
            // 11: optional timeout; empty string when absent.
            args.timeout_at.map_or_else(String::new, |at| at.to_string()),
            // 12-13: resume condition / policy, passed through as given.
            args.resume_condition_json.clone(),
            args.resume_policy_json.clone(),
            // 14: optional pointer; empty string when absent.
            args.continuation_metadata_pointer.clone().unwrap_or_default(),
            // 15: flag encoded as "1" when set, empty string otherwise.
            if args.use_pending_waitpoint { String::from("1") } else { String::new() },
            // 16: timeout behavior, passed through as given.
            args.timeout_behavior.clone(),
            // 17: fixed literal. NOTE(review): its meaning is defined by the
            // script and is not visible in this file — confirm before changing.
            String::from("1000"),
        }
    }
}
/// Decodes the reply of `ff_suspend_execution`.
impl FromFcallResult for SuspendExecutionResult {
    /// Turns the raw reply into `Suspended` or `AlreadySatisfied`.
    ///
    /// A reply whose status is `ALREADY_SATISFIED` is read as-is; any other
    /// reply must first pass `into_success`. In both cases fields 0..=3 are
    /// read as suspension id, waitpoint id, waitpoint key and waitpoint token.
    ///
    /// # Errors
    /// Propagates failures from `FcallResult::parse` / `into_success`, and
    /// returns `ScriptError::Parse` when either id field does not parse.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        // Shared constructor for the two id-parsing failures below.
        fn parse_failure(message: String) -> ScriptError {
            ScriptError::Parse {
                fcall: "ff_suspend_execution".into(),
                execution_id: None,
                message,
            }
        }

        let reply = FcallResult::parse(raw)?;

        if reply.status == "ALREADY_SATISFIED" {
            // This status is handled before the success check.
            let suspension_id = SuspensionId::parse(&reply.field_str(0))
                .map_err(|e| parse_failure(format!("bad suspension_id: {e}")))?;
            let waitpoint_id = WaitpointId::parse(&reply.field_str(1))
                .map_err(|e| parse_failure(format!("bad waitpoint_id: {e}")))?;
            Ok(SuspendExecutionResult::AlreadySatisfied {
                suspension_id,
                waitpoint_id,
                waitpoint_key: reply.field_str(2),
                waitpoint_token: WaitpointToken::new(reply.field_str(3)),
            })
        } else {
            let reply = reply.into_success()?;
            let suspension_id = SuspensionId::parse(&reply.field_str(0))
                .map_err(|e| parse_failure(format!("bad suspension_id: {e}")))?;
            let waitpoint_id = WaitpointId::parse(&reply.field_str(1))
                .map_err(|e| parse_failure(format!("bad waitpoint_id: {e}")))?;
            Ok(SuspendExecutionResult::Suspended {
                suspension_id,
                waitpoint_id,
                waitpoint_key: reply.field_str(2),
                waitpoint_token: WaitpointToken::new(reply.field_str(3)),
            })
        }
    }
}
/// Borrowed key sources that `ff_resume_execution` reads to build its key list.
pub struct ResumeOpKeys<'a> {
/// Source of the execution-scoped keys (core, current suspension, waitpoint, waitpoint signals).
pub ctx: &'a ExecKeyContext,
/// Source of the index keys (suspension timeout and the lane sets).
pub idx: &'a IndexKeys,
/// Lane passed to `lane_eligible`, `lane_delayed` and `lane_suspended`.
pub lane_id: &'a LaneId,
/// Waitpoint passed to `waitpoint` and `waitpoint_signals`.
pub waitpoint_id: &'a WaitpointId,
}
// Wrapper definition for the `ff_resume_execution` script function.
//
// Declares 8 keys taken from `ResumeOpKeys` and 3 arguments taken from
// `args`. The order of both lists is preserved exactly as originally written.
ff_function! {
    pub ff_resume_execution(args: ResumeExecutionArgs) -> ResumeExecutionResult {
        keys(k: &ResumeOpKeys<'_>) {
            k.ctx.core(),                               // 1
            k.ctx.suspension_current(),                 // 2
            k.ctx.waitpoint(k.waitpoint_id),            // 3
            k.ctx.waitpoint_signals(k.waitpoint_id),    // 4
            k.idx.suspension_timeout(),                 // 5
            k.idx.lane_eligible(k.lane_id),             // 6
            k.idx.lane_delayed(k.lane_id),              // 7
            k.idx.lane_suspended(k.lane_id),            // 8
        }
        argv {
            args.execution_id.to_string(),      // 1
            args.trigger_type.clone(),          // 2
            args.resume_delay_ms.to_string(),   // 3
        }
    }
}
/// Decodes the reply of `ff_resume_execution`.
impl FromFcallResult for ResumeExecutionResult {
    /// Reads field 0 of a successful reply as a public-state name and wraps
    /// the parsed state in `Resumed`.
    ///
    /// # Errors
    /// Propagates failures from `FcallResult::parse` / `into_success` and
    /// from `parse_public_state` when the state name is not recognized.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let reply = FcallResult::parse(raw)?.into_success()?;
        let state_name = reply.field_str(0);
        parse_public_state(&state_name)
            .map(|public_state| ResumeExecutionResult::Resumed { public_state })
    }
}
/// Borrowed key sources shared by `ff_create_pending_waitpoint` and `ff_close_waitpoint`.
pub struct WaitpointOpKeys<'a> {
/// Source of the execution-scoped keys (core and the per-waitpoint key).
pub ctx: &'a ExecKeyContext,
/// Source of the index keys (pending-waitpoint expiry).
pub idx: &'a IndexKeys,
}
// Wrapper definition for the `ff_create_pending_waitpoint` script function.
//
// Declares 3 keys and 5 arguments; the order of both lists is preserved
// exactly as originally written. The fifth argument is an absolute expiry
// timestamp computed here as "now + args.expires_in_ms".
ff_function! {
    pub ff_create_pending_waitpoint(args: CreatePendingWaitpointArgs) -> CreatePendingWaitpointResult {
        keys(k: &WaitpointOpKeys<'_>) {
            k.ctx.core(),                       // 1
            k.ctx.waitpoint(&args.waitpoint_id),// 2
            k.idx.pending_waitpoint_expiry(),   // 3
        }
        argv {
            args.execution_id.to_string(),      // 1
            args.attempt_index.to_string(),     // 2
            args.waitpoint_id.to_string(),      // 3
            args.waitpoint_key.clone(),         // 4
            // 5: absolute expiry.
            {
                let now_ms = TimestampMs::now();
                // A plain `as i64` cast would wrap a very large duration to a
                // negative number and yield an expiry in the past; clamp to
                // i64::MAX instead. The fully-qualified path avoids depending
                // on `TryFrom` being in the prelude.
                // NOTE(review): assumes `expires_in_ms` is a primitive integer.
                let ttl_ms = <i64 as std::convert::TryFrom<_>>::try_from(args.expires_in_ms)
                    .unwrap_or(i64::MAX);
                // Saturate rather than overflow when the sum exceeds i64::MAX.
                TimestampMs::from_millis(now_ms.0.saturating_add(ttl_ms)).to_string()
            },
        }
    }
}
/// Decodes the reply of `ff_create_pending_waitpoint`.
impl FromFcallResult for CreatePendingWaitpointResult {
    /// Reads fields 0..=2 of a successful reply as waitpoint id, waitpoint key
    /// and waitpoint token, and returns them as `Created`.
    ///
    /// # Errors
    /// Propagates failures from `FcallResult::parse` / `into_success`, and
    /// returns `ScriptError::Parse` when the waitpoint id does not parse.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let reply = FcallResult::parse(raw)?.into_success()?;

        let waitpoint_id = match WaitpointId::parse(&reply.field_str(0)) {
            Ok(id) => id,
            Err(e) => {
                return Err(ScriptError::Parse {
                    fcall: "ff_create_pending_waitpoint".into(),
                    execution_id: None,
                    message: format!("bad waitpoint_id: {e}"),
                });
            }
        };

        Ok(CreatePendingWaitpointResult::Created {
            waitpoint_id,
            waitpoint_key: reply.field_str(1),
            waitpoint_token: WaitpointToken::new(reply.field_str(2)),
        })
    }
}
/// Key sources that `ff_expire_suspension` reads to build its key list.
pub struct ExpireSuspensionOpKeys<'a> {
/// Source of the execution-scoped keys (core, suspension, waitpoint, attempt, stream meta, lease history).
pub ctx: &'a ExecKeyContext,
/// Source of the index keys (suspension timeout and the lane sets).
pub idx: &'a IndexKeys,
/// Lane passed to `lane_suspended`, `lane_terminal`, `lane_eligible` and `lane_delayed`.
pub lane_id: &'a LaneId,
/// Waitpoint passed to `waitpoint` and `waitpoint_condition`.
pub waitpoint_id: &'a WaitpointId,
/// Attempt passed to `attempt_hash` and `stream_meta`.
pub attempt_index: AttemptIndex,
}
// Wrapper definition for the `ff_expire_suspension` script function.
//
// Declares 12 keys taken from `ExpireSuspensionOpKeys` and a single argument
// (the execution id). The key order is preserved exactly as originally written.
ff_function! {
    pub ff_expire_suspension(args: ExpireSuspensionArgs) -> ExpireSuspensionResult {
        keys(k: &ExpireSuspensionOpKeys<'_>) {
            k.ctx.core(),                               // 1
            k.ctx.suspension_current(),                 // 2
            k.ctx.waitpoint(k.waitpoint_id),            // 3
            k.ctx.waitpoint_condition(k.waitpoint_id),  // 4
            k.ctx.attempt_hash(k.attempt_index),        // 5
            k.ctx.stream_meta(k.attempt_index),         // 6
            k.idx.suspension_timeout(),                 // 7
            k.idx.lane_suspended(k.lane_id),            // 8
            k.idx.lane_terminal(k.lane_id),             // 9
            k.idx.lane_eligible(k.lane_id),             // 10
            k.idx.lane_delayed(k.lane_id),              // 11
            k.ctx.lease_history(),                      // 12
        }
        argv {
            args.execution_id.to_string(),  // 1
        }
    }
}
/// Decodes the reply of `ff_expire_suspension`.
impl FromFcallResult for ExpireSuspensionResult {
    /// Classifies field 0 of a successful reply.
    ///
    /// The four strings `not_found_cleaned`, `not_suspended_cleaned`,
    /// `no_active_suspension_cleaned` and `not_yet_due` are returned as
    /// `AlreadySatisfied` with that string as the reason. Every other string
    /// (including `auto_resume` and `escalate`) is returned as `Expired` with
    /// the string as `behavior_applied`.
    ///
    /// # Errors
    /// Propagates failures from `FcallResult::parse` / `into_success`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        let reply = FcallResult::parse(raw)?.into_success()?;
        let outcome = reply.field_str(0);

        let nothing_expired = matches!(
            outcome.as_str(),
            "not_found_cleaned"
                | "not_suspended_cleaned"
                | "no_active_suspension_cleaned"
                | "not_yet_due"
        );

        if nothing_expired {
            Ok(ExpireSuspensionResult::AlreadySatisfied { reason: outcome })
        } else {
            // The reply string itself is what gets reported as the applied behavior.
            Ok(ExpireSuspensionResult::Expired {
                behavior_applied: outcome,
            })
        }
    }
}
// Wrapper definition for the `ff_close_waitpoint` script function.
//
// Declares 3 keys (from `WaitpointOpKeys` plus the waitpoint id in `args`)
// and 2 arguments. The order is preserved exactly as originally written.
ff_function! {
    pub ff_close_waitpoint(args: CloseWaitpointArgs) -> CloseWaitpointResult {
        keys(k: &WaitpointOpKeys<'_>) {
            k.ctx.core(),                           // 1
            k.ctx.waitpoint(&args.waitpoint_id),    // 2
            k.idx.pending_waitpoint_expiry(),       // 3
        }
        argv {
            args.waitpoint_id.to_string(),  // 1
            args.reason.clone(),            // 2
        }
    }
}
/// Decodes the reply of `ff_close_waitpoint`.
impl FromFcallResult for CloseWaitpointResult {
    /// Returns `Closed` for any reply that parses and passes `into_success`;
    /// no reply fields are read.
    ///
    /// # Errors
    /// Propagates failures from `FcallResult::parse` / `into_success`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        // Only the success check matters here; the payload is discarded.
        let _ = FcallResult::parse(raw)?.into_success()?;
        Ok(CloseWaitpointResult::Closed)
    }
}
// Wrapper definition for the `ff_rotate_waitpoint_hmac_secret` script function.
//
// Declares a single key (the waitpoint HMAC secrets key from `IndexKeys`) and
// 3 arguments. The argument order is preserved exactly as originally written.
ff_function! {
    pub ff_rotate_waitpoint_hmac_secret(args: RotateWaitpointHmacSecretArgs) -> RotateWaitpointHmacSecretOutcome {
        keys(k: &IndexKeys) {
            k.waitpoint_hmac_secrets(), // 1
        }
        argv {
            args.new_kid.clone(),           // 1
            args.new_secret_hex.clone(),    // 2
            args.grace_ms.to_string(),      // 3
        }
    }
}
/// Decodes the reply of `ff_rotate_waitpoint_hmac_secret`.
impl FromFcallResult for RotateWaitpointHmacSecretOutcome {
    /// Dispatches on field 0 of a successful reply.
    ///
    /// * `rotated` — fields 1..=3 are read as previous kid (an empty string
    ///   becomes `None`), new kid, and a `u32` count stored as `gc_count`.
    /// * `noop` — field 1 is read as the kid.
    ///
    /// # Errors
    /// Propagates failures from `FcallResult::parse` / `into_success`, and
    /// returns `ScriptError::Parse` when the count is not a valid `u32` or
    /// field 0 is neither `rotated` nor `noop`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        // Errors are labelled with the actual function name, matching how the
        // other decoders in this file fill `fcall`.
        fn parse_failure(message: String) -> ScriptError {
            ScriptError::Parse {
                fcall: "ff_rotate_waitpoint_hmac_secret".into(),
                execution_id: None,
                message,
            }
        }

        let r = FcallResult::parse(raw)?.into_success()?;
        let variant = r.field_str(0);
        match variant.as_str() {
            "rotated" => {
                let prev = r.field_str(1);
                let new_kid = r.field_str(2);
                let gc_count = r
                    .field_str(3)
                    .parse::<u32>()
                    .map_err(|e| parse_failure(format!("bad gc_count: {e}")))?;
                Ok(RotateWaitpointHmacSecretOutcome::Rotated {
                    // An empty string means there was no previous kid.
                    previous_kid: if prev.is_empty() { None } else { Some(prev) },
                    new_kid,
                    gc_count,
                })
            }
            "noop" => Ok(RotateWaitpointHmacSecretOutcome::Noop {
                kid: r.field_str(1),
            }),
            other => Err(parse_failure(format!(
                "unexpected rotation outcome: {other}"
            ))),
        }
    }
}
// Wrapper definition for the `ff_list_waitpoint_hmac_kids` script function.
//
// Declares a single key (the waitpoint HMAC secrets key from `IndexKeys`) and
// no arguments; `_args` is accepted but not read here.
ff_function! {
    pub ff_list_waitpoint_hmac_kids(_args: ListWaitpointHmacKidsArgs) -> WaitpointHmacKids {
        keys(k: &IndexKeys) {
            k.waitpoint_hmac_secrets(), // 1
        }
        argv {}
    }
}
/// Decodes the reply of `ff_list_waitpoint_hmac_kids`.
impl FromFcallResult for WaitpointHmacKids {
    /// Reads a successful reply laid out as:
    /// field 0 = current kid (an empty string becomes `None`),
    /// field 1 = number `n` of verifying entries,
    /// then `n` pairs `(kid, expires_at_ms)` starting at field 2.
    ///
    /// # Errors
    /// Propagates failures from `FcallResult::parse` / `into_success`, and
    /// returns `ScriptError::Parse` when the count is not a valid `usize` or
    /// an `expires_at_ms` field is not a valid `i64`.
    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
        // Errors are labelled with the actual function name, matching how the
        // other decoders in this file fill `fcall`.
        fn parse_failure(message: String) -> ScriptError {
            ScriptError::Parse {
                fcall: "ff_list_waitpoint_hmac_kids".into(),
                execution_id: None,
                message,
            }
        }

        let r = FcallResult::parse(raw)?.into_success()?;
        let current = r.field_str(0);
        let declared = r
            .field_str(1)
            .parse::<usize>()
            .map_err(|e| parse_failure(format!("bad verifying count: {e}")))?;

        // Deliberately not `Vec::with_capacity(declared)`: the count comes off
        // the wire, and a bogus value would panic or abort in the allocator
        // instead of surfacing as an `Err`. Growing on demand costs nothing
        // for realistic sizes.
        // NOTE(review): presumably a count larger than the real number of
        // pairs ends in a parse error on the first missing `expires_at_ms`
        // field — confirm how `field_str` treats an out-of-range index.
        let mut verifying = Vec::new();
        for i in 0..declared {
            let kid = r.field_str(2 + 2 * i);
            let expires_at_ms = r
                .field_str(2 + 2 * i + 1)
                .parse::<i64>()
                .map_err(|e| parse_failure(format!("bad expires_at_ms for kid {kid}: {e}")))?;
            verifying.push(VerifyingKid { kid, expires_at_ms });
        }

        Ok(WaitpointHmacKids {
            current_kid: if current.is_empty() {
                None
            } else {
                Some(current)
            },
            verifying,
        })
    }
}
/// Converts a public-state name into the matching `PublicState` variant.
///
/// Twelve exact, lowercase snake_case names are recognized (`waiting`,
/// `delayed`, `rate_limited`, `waiting_children`, `active`, `suspended`,
/// `resumable`, `completed`, `failed`, `cancelled`, `expired`, `skipped`).
///
/// # Errors
/// Returns `ScriptError::Parse` naming the offending string for any other input.
fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
    let state = match s {
        "waiting" => PublicState::Waiting,
        "delayed" => PublicState::Delayed,
        "rate_limited" => PublicState::RateLimited,
        "waiting_children" => PublicState::WaitingChildren,
        "active" => PublicState::Active,
        "suspended" => PublicState::Suspended,
        "resumable" => PublicState::Resumable,
        "completed" => PublicState::Completed,
        "failed" => PublicState::Failed,
        "cancelled" => PublicState::Cancelled,
        "expired" => PublicState::Expired,
        "skipped" => PublicState::Skipped,
        unknown => {
            return Err(ScriptError::Parse {
                fcall: "parse_public_state".into(),
                execution_id: None,
                message: format!("unknown public_state: {unknown}"),
            });
        }
    };
    Ok(state)
}