1use ff_core::contracts::*;
5use crate::error::ScriptError;
6use ff_core::keys::{ExecKeyContext, IndexKeys};
7use ff_core::state::PublicState;
8use ff_core::types::*;
9
10use crate::result::{FcallResult, FromFcallResult};
11
12pub struct SuspendOpKeys<'a> {
15 pub ctx: &'a ExecKeyContext,
16 pub idx: &'a IndexKeys,
17 pub lane_id: &'a LaneId,
18 pub worker_instance_id: &'a WorkerInstanceId,
19}
20
21ff_function! {
36 pub ff_suspend_execution(args: SuspendExecutionArgs) -> SuspendExecutionResult {
37 keys(k: &SuspendOpKeys<'_>) {
38 k.ctx.core(), k.ctx.attempt_hash(args.attempt_index), k.ctx.lease_current(), k.ctx.lease_history(), k.idx.lease_expiry(), k.idx.worker_leases(k.worker_instance_id), k.ctx.suspension_current(), k.ctx.waitpoint(&args.waitpoint_id), k.ctx.waitpoint_signals(&args.waitpoint_id), k.idx.suspension_timeout(), k.idx.pending_waitpoint_expiry(), k.idx.lane_active(k.lane_id), k.idx.lane_suspended(k.lane_id), k.ctx.waitpoints(), k.ctx.waitpoint_condition(&args.waitpoint_id), k.idx.attempt_timeout(), k.idx.waitpoint_hmac_secrets(), }
56 argv {
57 args.execution_id.to_string(), args.attempt_index.to_string(), args.attempt_id.to_string(), args.lease_id.to_string(), args.lease_epoch.to_string(), args.suspension_id.to_string(), args.waitpoint_id.to_string(), args.waitpoint_key.clone(), args.reason_code.clone(), args.requested_by.clone(), args.timeout_at.map(|t| t.to_string()).unwrap_or_default(), args.resume_condition_json.clone(), args.resume_policy_json.clone(), args.continuation_metadata_pointer.clone().unwrap_or_default(), if args.use_pending_waitpoint { "1".into() } else { String::new() }, args.timeout_behavior.clone(), "1000".to_string(), }
75 }
76}
77
78impl FromFcallResult for SuspendExecutionResult {
79 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
80 let r = FcallResult::parse(raw)?;
81 if r.status == "ALREADY_SATISFIED" {
83 let sid = SuspensionId::parse(&r.field_str(0))
84 .map_err(|e| ScriptError::Parse(format!("bad suspension_id: {e}")))?;
85 let wid = WaitpointId::parse(&r.field_str(1))
86 .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
87 let wkey = r.field_str(2);
88 let token = WaitpointToken::new(r.field_str(3));
89 return Ok(SuspendExecutionResult::AlreadySatisfied {
90 suspension_id: sid,
91 waitpoint_id: wid,
92 waitpoint_key: wkey,
93 waitpoint_token: token,
94 });
95 }
96 let r = r.into_success()?;
97 let sid = SuspensionId::parse(&r.field_str(0))
99 .map_err(|e| ScriptError::Parse(format!("bad suspension_id: {e}")))?;
100 let wid = WaitpointId::parse(&r.field_str(1))
101 .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
102 let wkey = r.field_str(2);
103 let token = WaitpointToken::new(r.field_str(3));
104 Ok(SuspendExecutionResult::Suspended {
105 suspension_id: sid,
106 waitpoint_id: wid,
107 waitpoint_key: wkey,
108 waitpoint_token: token,
109 })
110 }
111}
112
113pub struct ResumeOpKeys<'a> {
123 pub ctx: &'a ExecKeyContext,
124 pub idx: &'a IndexKeys,
125 pub lane_id: &'a LaneId,
126 pub waitpoint_id: &'a WaitpointId,
127}
128
129ff_function! {
130 pub ff_resume_execution(args: ResumeExecutionArgs) -> ResumeExecutionResult {
131 keys(k: &ResumeOpKeys<'_>) {
132 k.ctx.core(), k.ctx.suspension_current(), k.ctx.waitpoint(k.waitpoint_id), k.ctx.waitpoint_signals(k.waitpoint_id), k.idx.suspension_timeout(), k.idx.lane_eligible(k.lane_id), k.idx.lane_delayed(k.lane_id), k.idx.lane_suspended(k.lane_id), }
141 argv {
142 args.execution_id.to_string(), args.trigger_type.clone(), args.resume_delay_ms.to_string(), }
146 }
147}
148
149impl FromFcallResult for ResumeExecutionResult {
150 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
151 let r = FcallResult::parse(raw)?.into_success()?;
152 let ps_str = r.field_str(0);
154 let public_state = parse_public_state(&ps_str)?;
155 Ok(ResumeExecutionResult::Resumed { public_state })
156 }
157}
158
159pub struct WaitpointOpKeys<'a> {
167 pub ctx: &'a ExecKeyContext,
168 pub idx: &'a IndexKeys,
169}
170
171ff_function! {
172 pub ff_create_pending_waitpoint(args: CreatePendingWaitpointArgs) -> CreatePendingWaitpointResult {
173 keys(k: &WaitpointOpKeys<'_>) {
174 k.ctx.core(), k.ctx.waitpoint(&args.waitpoint_id), k.idx.pending_waitpoint_expiry(), }
178 argv {
179 args.execution_id.to_string(), args.attempt_index.to_string(), args.waitpoint_id.to_string(), args.waitpoint_key.clone(), {
184 let now_ms = TimestampMs::now();
185 TimestampMs::from_millis(now_ms.0 + args.expires_in_ms as i64).to_string()
186 }, }
188 }
189}
190
191impl FromFcallResult for CreatePendingWaitpointResult {
192 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
193 let r = FcallResult::parse(raw)?.into_success()?;
194 let wid = WaitpointId::parse(&r.field_str(0))
196 .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
197 let wkey = r.field_str(1);
198 let token = WaitpointToken::new(r.field_str(2));
199 Ok(CreatePendingWaitpointResult::Created {
200 waitpoint_id: wid,
201 waitpoint_key: wkey,
202 waitpoint_token: token,
203 })
204 }
205}
206
207pub struct ExpireSuspensionOpKeys<'a> {
218 pub ctx: &'a ExecKeyContext,
219 pub idx: &'a IndexKeys,
220 pub lane_id: &'a LaneId,
221 pub waitpoint_id: &'a WaitpointId,
222 pub attempt_index: AttemptIndex,
223}
224
225ff_function! {
226 pub ff_expire_suspension(args: ExpireSuspensionArgs) -> ExpireSuspensionResult {
227 keys(k: &ExpireSuspensionOpKeys<'_>) {
228 k.ctx.core(), k.ctx.suspension_current(), k.ctx.waitpoint(k.waitpoint_id), k.ctx.waitpoint_condition(k.waitpoint_id), k.ctx.attempt_hash(k.attempt_index), k.ctx.stream_meta(k.attempt_index), k.idx.suspension_timeout(), k.idx.lane_suspended(k.lane_id), k.idx.lane_terminal(k.lane_id), k.idx.lane_eligible(k.lane_id), k.idx.lane_delayed(k.lane_id), k.ctx.lease_history(), }
241 argv {
242 args.execution_id.to_string(), }
244 }
245}
246
247impl FromFcallResult for ExpireSuspensionResult {
248 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
249 let r = FcallResult::parse(raw)?.into_success()?;
250 let sub = r.field_str(0);
252 match sub.as_str() {
253 "not_found_cleaned" | "not_suspended_cleaned" | "no_active_suspension_cleaned"
254 | "not_yet_due" => Ok(ExpireSuspensionResult::AlreadySatisfied {
255 reason: sub,
256 }),
257 "auto_resume" => Ok(ExpireSuspensionResult::Expired {
258 behavior_applied: "auto_resume".into(),
259 }),
260 "escalate" => Ok(ExpireSuspensionResult::Expired {
261 behavior_applied: "escalate".into(),
262 }),
263 _ => Ok(ExpireSuspensionResult::Expired {
264 behavior_applied: sub,
265 }),
266 }
267 }
268}
269
270ff_function! {
276 pub ff_close_waitpoint(args: CloseWaitpointArgs) -> CloseWaitpointResult {
277 keys(k: &WaitpointOpKeys<'_>) {
278 k.ctx.core(), k.ctx.waitpoint(&args.waitpoint_id), k.idx.pending_waitpoint_expiry(), }
282 argv {
283 args.waitpoint_id.to_string(), args.reason.clone(), }
286 }
287}
288
289impl FromFcallResult for CloseWaitpointResult {
290 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
291 let _r = FcallResult::parse(raw)?.into_success()?;
292 Ok(CloseWaitpointResult::Closed)
294 }
295}
296
297fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
300 match s {
301 "waiting" => Ok(PublicState::Waiting),
302 "delayed" => Ok(PublicState::Delayed),
303 "rate_limited" => Ok(PublicState::RateLimited),
304 "waiting_children" => Ok(PublicState::WaitingChildren),
305 "active" => Ok(PublicState::Active),
306 "suspended" => Ok(PublicState::Suspended),
307 "completed" => Ok(PublicState::Completed),
308 "failed" => Ok(PublicState::Failed),
309 "cancelled" => Ok(PublicState::Cancelled),
310 "expired" => Ok(PublicState::Expired),
311 "skipped" => Ok(PublicState::Skipped),
312 _ => Err(ScriptError::Parse(format!("unknown public_state: {s}"))),
313 }
314}