1use ff_core::contracts::*;
5use crate::error::ScriptError;
6use ff_core::keys::{ExecKeyContext, IndexKeys};
7use ff_core::state::PublicState;
8use ff_core::types::*;
9
10use crate::result::{FcallResult, FromFcallResult};
11
/// Borrowed inputs used to build the key list of `ff_suspend_execution`.
pub struct SuspendOpKeys<'a> {
    /// Execution-scoped key builder (core, attempt, lease, suspension and
    /// waitpoint keys are derived from it).
    pub ctx: &'a ExecKeyContext,
    /// Index key builder (lease-expiry, worker-lease, timeout, per-lane and
    /// HMAC-secret keys are derived from it).
    pub idx: &'a IndexKeys,
    /// Lane passed to the `lane_active` / `lane_suspended` index keys.
    pub lane_id: &'a LaneId,
    /// Worker instance passed to the `worker_leases` index key.
    pub worker_instance_id: &'a WorkerInstanceId,
}
20
// Declares `ff_suspend_execution` through `ff_function!` (macro defined
// elsewhere): takes `SuspendExecutionArgs`, yields `SuspendExecutionResult`.
// NOTE(review): both lists below are presumably positional on the callee
// side — confirm before reordering, inserting or removing entries.
ff_function! {
    pub ff_suspend_execution(args: SuspendExecutionArgs) -> SuspendExecutionResult {
        keys(k: &SuspendOpKeys<'_>) {
            // Execution / attempt / lease keys.
            k.ctx.core(),
            k.ctx.attempt_hash(args.attempt_index),
            k.ctx.lease_current(),
            k.ctx.lease_history(),
            k.idx.lease_expiry(),
            k.idx.worker_leases(k.worker_instance_id),
            // Suspension and waitpoint keys for `args.waitpoint_id`.
            k.ctx.suspension_current(),
            k.ctx.waitpoint(&args.waitpoint_id),
            k.ctx.waitpoint_signals(&args.waitpoint_id),
            k.idx.suspension_timeout(),
            k.idx.pending_waitpoint_expiry(),
            // Per-lane index keys for `k.lane_id`.
            k.idx.lane_active(k.lane_id),
            k.idx.lane_suspended(k.lane_id),
            k.ctx.waitpoints(),
            k.ctx.waitpoint_condition(&args.waitpoint_id),
            k.idx.attempt_timeout(),
            k.idx.waitpoint_hmac_secrets(),
        }
        argv {
            args.execution_id.to_string(),
            args.attempt_index.to_string(),
            args.attempt_id.to_string(),
            args.lease_id.to_string(),
            args.lease_epoch.to_string(),
            args.suspension_id.to_string(),
            args.waitpoint_id.to_string(),
            args.waitpoint_key.clone(),
            args.reason_code.clone(),
            args.requested_by.clone(),
            // A missing timeout is sent as the empty string.
            args.timeout_at.map(|t| t.to_string()).unwrap_or_default(),
            args.resume_condition_json.clone(),
            args.resume_policy_json.clone(),
            // A missing pointer is sent as the empty string.
            args.continuation_metadata_pointer.clone().unwrap_or_default(),
            // Boolean flag encoded as "1" (set) or "" (unset).
            if args.use_pending_waitpoint { "1".into() } else { String::new() },
            args.timeout_behavior.clone(),
            // NOTE(review): hard-coded literal; its meaning is not visible in
            // this file — confirm against the callee and consider naming it.
            "1000".to_string(),
        }
    }
}
77
78impl FromFcallResult for SuspendExecutionResult {
79 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
80 let r = FcallResult::parse(raw)?;
81 if r.status == "ALREADY_SATISFIED" {
83 let sid = SuspensionId::parse(&r.field_str(0))
84 .map_err(|e| ScriptError::Parse(format!("bad suspension_id: {e}")))?;
85 let wid = WaitpointId::parse(&r.field_str(1))
86 .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
87 let wkey = r.field_str(2);
88 let token = WaitpointToken::new(r.field_str(3));
89 return Ok(SuspendExecutionResult::AlreadySatisfied {
90 suspension_id: sid,
91 waitpoint_id: wid,
92 waitpoint_key: wkey,
93 waitpoint_token: token,
94 });
95 }
96 let r = r.into_success()?;
97 let sid = SuspensionId::parse(&r.field_str(0))
99 .map_err(|e| ScriptError::Parse(format!("bad suspension_id: {e}")))?;
100 let wid = WaitpointId::parse(&r.field_str(1))
101 .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
102 let wkey = r.field_str(2);
103 let token = WaitpointToken::new(r.field_str(3));
104 Ok(SuspendExecutionResult::Suspended {
105 suspension_id: sid,
106 waitpoint_id: wid,
107 waitpoint_key: wkey,
108 waitpoint_token: token,
109 })
110 }
111}
112
/// Borrowed inputs used to build the key list of `ff_resume_execution`.
pub struct ResumeOpKeys<'a> {
    /// Execution-scoped key builder (core, suspension and waitpoint keys).
    pub ctx: &'a ExecKeyContext,
    /// Index key builder (suspension-timeout and per-lane index keys).
    pub idx: &'a IndexKeys,
    /// Lane passed to the `lane_eligible` / `lane_delayed` / `lane_suspended`
    /// index keys.
    pub lane_id: &'a LaneId,
    /// Waitpoint passed to the `waitpoint` / `waitpoint_signals` keys.
    pub waitpoint_id: &'a WaitpointId,
}
128
// Declares `ff_resume_execution` through `ff_function!` (macro defined
// elsewhere): takes `ResumeExecutionArgs`, yields `ResumeExecutionResult`.
// NOTE(review): both lists below are presumably positional on the callee
// side — confirm before reordering.
ff_function! {
    pub ff_resume_execution(args: ResumeExecutionArgs) -> ResumeExecutionResult {
        keys(k: &ResumeOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.suspension_current(),
            // The waitpoint comes from the key struct, not from `args`.
            k.ctx.waitpoint(k.waitpoint_id),
            k.ctx.waitpoint_signals(k.waitpoint_id),
            k.idx.suspension_timeout(),
            // Per-lane index keys for `k.lane_id`.
            k.idx.lane_eligible(k.lane_id),
            k.idx.lane_delayed(k.lane_id),
            k.idx.lane_suspended(k.lane_id),
        }
        argv {
            args.execution_id.to_string(),
            args.trigger_type.clone(),
            args.resume_delay_ms.to_string(),
        }
    }
}
148
149impl FromFcallResult for ResumeExecutionResult {
150 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
151 let r = FcallResult::parse(raw)?.into_success()?;
152 let ps_str = r.field_str(0);
154 let public_state = parse_public_state(&ps_str)?;
155 Ok(ResumeExecutionResult::Resumed { public_state })
156 }
157}
158
/// Borrowed inputs used to build the key lists of
/// `ff_create_pending_waitpoint` and `ff_close_waitpoint`.
pub struct WaitpointOpKeys<'a> {
    /// Execution-scoped key builder (core and waitpoint keys).
    pub ctx: &'a ExecKeyContext,
    /// Index key builder (pending-waitpoint-expiry key).
    pub idx: &'a IndexKeys,
}
170
171ff_function! {
172 pub ff_create_pending_waitpoint(args: CreatePendingWaitpointArgs) -> CreatePendingWaitpointResult {
173 keys(k: &WaitpointOpKeys<'_>) {
174 k.ctx.core(), k.ctx.waitpoint(&args.waitpoint_id), k.idx.pending_waitpoint_expiry(), }
178 argv {
179 args.execution_id.to_string(), args.attempt_index.to_string(), args.waitpoint_id.to_string(), args.waitpoint_key.clone(), {
184 let now_ms = TimestampMs::now();
185 TimestampMs::from_millis(now_ms.0 + args.expires_in_ms as i64).to_string()
186 }, }
188 }
189}
190
191impl FromFcallResult for CreatePendingWaitpointResult {
192 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
193 let r = FcallResult::parse(raw)?.into_success()?;
194 let wid = WaitpointId::parse(&r.field_str(0))
196 .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
197 let wkey = r.field_str(1);
198 let token = WaitpointToken::new(r.field_str(2));
199 Ok(CreatePendingWaitpointResult::Created {
200 waitpoint_id: wid,
201 waitpoint_key: wkey,
202 waitpoint_token: token,
203 })
204 }
205}
206
/// Inputs used to build the key list of `ff_expire_suspension`.
pub struct ExpireSuspensionOpKeys<'a> {
    /// Execution-scoped key builder (core, suspension, waitpoint, attempt,
    /// stream-meta and lease-history keys).
    pub ctx: &'a ExecKeyContext,
    /// Index key builder (suspension-timeout and per-lane index keys).
    pub idx: &'a IndexKeys,
    /// Lane passed to the `lane_suspended` / `lane_terminal` /
    /// `lane_eligible` / `lane_delayed` index keys.
    pub lane_id: &'a LaneId,
    /// Waitpoint passed to the `waitpoint` / `waitpoint_condition` keys.
    pub waitpoint_id: &'a WaitpointId,
    /// Attempt passed to the `attempt_hash` / `stream_meta` keys.
    pub attempt_index: AttemptIndex,
}
224
// Declares `ff_expire_suspension` through `ff_function!` (macro defined
// elsewhere): takes `ExpireSuspensionArgs`, yields `ExpireSuspensionResult`.
// NOTE(review): the key list is presumably positional on the callee side —
// confirm before reordering.
ff_function! {
    pub ff_expire_suspension(args: ExpireSuspensionArgs) -> ExpireSuspensionResult {
        keys(k: &ExpireSuspensionOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.suspension_current(),
            // Waitpoint and attempt come from the key struct, not from `args`.
            k.ctx.waitpoint(k.waitpoint_id),
            k.ctx.waitpoint_condition(k.waitpoint_id),
            k.ctx.attempt_hash(k.attempt_index),
            k.ctx.stream_meta(k.attempt_index),
            k.idx.suspension_timeout(),
            // Per-lane index keys for `k.lane_id`.
            k.idx.lane_suspended(k.lane_id),
            k.idx.lane_terminal(k.lane_id),
            k.idx.lane_eligible(k.lane_id),
            k.idx.lane_delayed(k.lane_id),
            k.ctx.lease_history(),
        }
        argv {
            // The execution id is the only argument.
            args.execution_id.to_string(),
        }
    }
}
246
247impl FromFcallResult for ExpireSuspensionResult {
248 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
249 let r = FcallResult::parse(raw)?.into_success()?;
250 let sub = r.field_str(0);
252 match sub.as_str() {
253 "not_found_cleaned" | "not_suspended_cleaned" | "no_active_suspension_cleaned"
254 | "not_yet_due" => Ok(ExpireSuspensionResult::AlreadySatisfied {
255 reason: sub,
256 }),
257 "auto_resume" => Ok(ExpireSuspensionResult::Expired {
258 behavior_applied: "auto_resume".into(),
259 }),
260 "escalate" => Ok(ExpireSuspensionResult::Expired {
261 behavior_applied: "escalate".into(),
262 }),
263 _ => Ok(ExpireSuspensionResult::Expired {
264 behavior_applied: sub,
265 }),
266 }
267 }
268}
269
// Declares `ff_close_waitpoint` through `ff_function!` (macro defined
// elsewhere): takes `CloseWaitpointArgs`, yields `CloseWaitpointResult`.
// NOTE(review): both lists below are presumably positional on the callee
// side — confirm before reordering.
ff_function! {
    pub ff_close_waitpoint(args: CloseWaitpointArgs) -> CloseWaitpointResult {
        keys(k: &WaitpointOpKeys<'_>) {
            k.ctx.core(),
            k.ctx.waitpoint(&args.waitpoint_id),
            k.idx.pending_waitpoint_expiry(),
        }
        argv {
            args.waitpoint_id.to_string(),
            args.reason.clone(),
        }
    }
}
288
289impl FromFcallResult for CloseWaitpointResult {
290 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
291 let _r = FcallResult::parse(raw)?.into_success()?;
292 Ok(CloseWaitpointResult::Closed)
294 }
295}
296
// Declares `ff_rotate_waitpoint_hmac_secret` through `ff_function!` (macro
// defined elsewhere): takes `RotateWaitpointHmacSecretArgs`, yields
// `RotateWaitpointHmacSecretOutcome`.
// NOTE(review): the argument order is presumably positional on the callee
// side — confirm before reordering.
ff_function! {
    pub ff_rotate_waitpoint_hmac_secret(args: RotateWaitpointHmacSecretArgs) -> RotateWaitpointHmacSecretOutcome {
        // Keys are built directly from `IndexKeys`; a single key is used.
        keys(k: &IndexKeys) {
            k.waitpoint_hmac_secrets(),
        }
        argv {
            args.new_kid.clone(),
            args.new_secret_hex.clone(),
            args.grace_ms.to_string(),
        }
    }
}
324
325impl FromFcallResult for RotateWaitpointHmacSecretOutcome {
326 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
327 let r = FcallResult::parse(raw)?.into_success()?;
328 let variant = r.field_str(0);
332 match variant.as_str() {
333 "rotated" => {
334 let prev = r.field_str(1);
335 let new_kid = r.field_str(2);
336 let gc_count = r.field_str(3).parse::<u32>().map_err(|e| {
337 ScriptError::Parse(format!("bad gc_count: {e}"))
338 })?;
339 Ok(RotateWaitpointHmacSecretOutcome::Rotated {
340 previous_kid: if prev.is_empty() { None } else { Some(prev) },
341 new_kid,
342 gc_count,
343 })
344 }
345 "noop" => Ok(RotateWaitpointHmacSecretOutcome::Noop {
346 kid: r.field_str(1),
347 }),
348 other => Err(ScriptError::Parse(format!(
349 "unexpected rotation outcome: {other}"
350 ))),
351 }
352 }
353}
354
// Declares `ff_list_waitpoint_hmac_kids` through `ff_function!` (macro defined
// elsewhere): takes `ListWaitpointHmacKidsArgs` (unused, hence `_args`),
// yields `WaitpointHmacKids`.
ff_function! {
    pub ff_list_waitpoint_hmac_kids(_args: ListWaitpointHmacKidsArgs) -> WaitpointHmacKids {
        // Keys are built directly from `IndexKeys`; a single key is used.
        keys(k: &IndexKeys) {
            k.waitpoint_hmac_secrets(),
        }
        // No arguments are sent.
        argv {}
    }
}
370
371impl FromFcallResult for WaitpointHmacKids {
372 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
373 let r = FcallResult::parse(raw)?.into_success()?;
374 let current = r.field_str(0);
376 let n = r.field_str(1).parse::<usize>().map_err(|e| {
377 ScriptError::Parse(format!("bad verifying count: {e}"))
378 })?;
379 let mut verifying = Vec::with_capacity(n);
380 for i in 0..n {
381 let kid = r.field_str(2 + 2 * i);
382 let exp = r.field_str(2 + 2 * i + 1).parse::<i64>().map_err(|e| {
383 ScriptError::Parse(format!("bad expires_at_ms for kid {kid}: {e}"))
384 })?;
385 verifying.push(VerifyingKid { kid, expires_at_ms: exp });
386 }
387 Ok(WaitpointHmacKids {
388 current_kid: if current.is_empty() { None } else { Some(current) },
389 verifying,
390 })
391 }
392}
393
394fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
397 match s {
398 "waiting" => Ok(PublicState::Waiting),
399 "delayed" => Ok(PublicState::Delayed),
400 "rate_limited" => Ok(PublicState::RateLimited),
401 "waiting_children" => Ok(PublicState::WaitingChildren),
402 "active" => Ok(PublicState::Active),
403 "suspended" => Ok(PublicState::Suspended),
404 "completed" => Ok(PublicState::Completed),
405 "failed" => Ok(PublicState::Failed),
406 "cancelled" => Ok(PublicState::Cancelled),
407 "expired" => Ok(PublicState::Expired),
408 "skipped" => Ok(PublicState::Skipped),
409 _ => Err(ScriptError::Parse(format!("unknown public_state: {s}"))),
410 }
411}