1use crate::error::ScriptError;
5use ff_core::contracts::*;
6use ff_core::keys::{ExecKeyContext, IndexKeys};
7use ff_core::state::PublicState;
8use ff_core::types::*;
9
10use crate::result::{FcallResult, FromFcallResult};
11
12pub struct SuspendOpKeys<'a> {
15 pub ctx: &'a ExecKeyContext,
16 pub idx: &'a IndexKeys,
17 pub lane_id: &'a LaneId,
18 pub worker_instance_id: &'a WorkerInstanceId,
19}
20
21ff_function! {
36 pub ff_suspend_execution(args: SuspendExecutionArgs) -> SuspendExecutionResult {
37 keys(k: &SuspendOpKeys<'_>) {
38 k.ctx.core(), k.ctx.attempt_hash(args.attempt_index), k.ctx.lease_current(), k.ctx.lease_history(), k.idx.lease_expiry(), k.idx.worker_leases(k.worker_instance_id), k.ctx.suspension_current(), k.ctx.waitpoint(&args.waitpoint_id), k.ctx.waitpoint_signals(&args.waitpoint_id), k.idx.suspension_timeout(), k.idx.pending_waitpoint_expiry(), k.idx.lane_active(k.lane_id), k.idx.lane_suspended(k.lane_id), k.ctx.waitpoints(), k.ctx.waitpoint_condition(&args.waitpoint_id), k.idx.attempt_timeout(), k.idx.waitpoint_hmac_secrets(), }
56 argv {
59 args.execution_id.to_string(), args.attempt_index.to_string(), args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(), args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(), args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(), args.suspension_id.to_string(), args.waitpoint_id.to_string(), args.waitpoint_key.clone(), args.reason_code.clone(), args.requested_by.clone(), args.timeout_at.map(|t| t.to_string()).unwrap_or_default(), args.resume_condition_json.clone(), args.resume_policy_json.clone(), args.continuation_metadata_pointer.clone().unwrap_or_default(), if args.use_pending_waitpoint { "1".into() } else { String::new() }, args.timeout_behavior.clone(), "1000".to_string(), }
77 }
78}
79
80impl FromFcallResult for SuspendExecutionResult {
81 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
82 let r = FcallResult::parse(raw)?;
83 if r.status == "ALREADY_SATISFIED" {
85 let sid = SuspensionId::parse(&r.field_str(0))
86 .map_err(|e| ScriptError::Parse {
87 fcall: "ff_suspend_execution".into(),
88 execution_id: None,
89 message: format!("bad suspension_id: {e}"),
90 })?;
91 let wid = WaitpointId::parse(&r.field_str(1))
92 .map_err(|e| ScriptError::Parse {
93 fcall: "ff_suspend_execution".into(),
94 execution_id: None,
95 message: format!("bad waitpoint_id: {e}"),
96 })?;
97 let wkey = r.field_str(2);
98 let token = WaitpointToken::new(r.field_str(3));
99 return Ok(SuspendExecutionResult::AlreadySatisfied {
100 suspension_id: sid,
101 waitpoint_id: wid,
102 waitpoint_key: wkey,
103 waitpoint_token: token,
104 });
105 }
106 let r = r.into_success()?;
107 let sid = SuspensionId::parse(&r.field_str(0))
109 .map_err(|e| ScriptError::Parse {
110 fcall: "ff_suspend_execution".into(),
111 execution_id: None,
112 message: format!("bad suspension_id: {e}"),
113 })?;
114 let wid = WaitpointId::parse(&r.field_str(1))
115 .map_err(|e| ScriptError::Parse {
116 fcall: "ff_suspend_execution".into(),
117 execution_id: None,
118 message: format!("bad waitpoint_id: {e}"),
119 })?;
120 let wkey = r.field_str(2);
121 let token = WaitpointToken::new(r.field_str(3));
122 Ok(SuspendExecutionResult::Suspended {
123 suspension_id: sid,
124 waitpoint_id: wid,
125 waitpoint_key: wkey,
126 waitpoint_token: token,
127 })
128 }
129}
130
131pub struct ResumeOpKeys<'a> {
141 pub ctx: &'a ExecKeyContext,
142 pub idx: &'a IndexKeys,
143 pub lane_id: &'a LaneId,
144 pub waitpoint_id: &'a WaitpointId,
145}
146
147ff_function! {
148 pub ff_resume_execution(args: ResumeExecutionArgs) -> ResumeExecutionResult {
149 keys(k: &ResumeOpKeys<'_>) {
150 k.ctx.core(), k.ctx.suspension_current(), k.ctx.waitpoint(k.waitpoint_id), k.ctx.waitpoint_signals(k.waitpoint_id), k.idx.suspension_timeout(), k.idx.lane_eligible(k.lane_id), k.idx.lane_delayed(k.lane_id), k.idx.lane_suspended(k.lane_id), }
159 argv {
160 args.execution_id.to_string(), args.trigger_type.clone(), args.resume_delay_ms.to_string(), }
164 }
165}
166
167impl FromFcallResult for ResumeExecutionResult {
168 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
169 let r = FcallResult::parse(raw)?.into_success()?;
170 let ps_str = r.field_str(0);
172 let public_state = parse_public_state(&ps_str)?;
173 Ok(ResumeExecutionResult::Resumed { public_state })
174 }
175}
176
177pub struct WaitpointOpKeys<'a> {
185 pub ctx: &'a ExecKeyContext,
186 pub idx: &'a IndexKeys,
187}
188
189ff_function! {
190 pub ff_create_pending_waitpoint(args: CreatePendingWaitpointArgs) -> CreatePendingWaitpointResult {
191 keys(k: &WaitpointOpKeys<'_>) {
192 k.ctx.core(), k.ctx.waitpoint(&args.waitpoint_id), k.idx.pending_waitpoint_expiry(), }
196 argv {
197 args.execution_id.to_string(), args.attempt_index.to_string(), args.waitpoint_id.to_string(), args.waitpoint_key.clone(), {
202 let now_ms = TimestampMs::now();
203 TimestampMs::from_millis(now_ms.0 + args.expires_in_ms as i64).to_string()
204 }, }
206 }
207}
208
209impl FromFcallResult for CreatePendingWaitpointResult {
210 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
211 let r = FcallResult::parse(raw)?.into_success()?;
212 let wid = WaitpointId::parse(&r.field_str(0))
214 .map_err(|e| ScriptError::Parse {
215 fcall: "ff_create_pending_waitpoint".into(),
216 execution_id: None,
217 message: format!("bad waitpoint_id: {e}"),
218 })?;
219 let wkey = r.field_str(1);
220 let token = WaitpointToken::new(r.field_str(2));
221 Ok(CreatePendingWaitpointResult::Created {
222 waitpoint_id: wid,
223 waitpoint_key: wkey,
224 waitpoint_token: token,
225 })
226 }
227}
228
229pub struct ExpireSuspensionOpKeys<'a> {
240 pub ctx: &'a ExecKeyContext,
241 pub idx: &'a IndexKeys,
242 pub lane_id: &'a LaneId,
243 pub waitpoint_id: &'a WaitpointId,
244 pub attempt_index: AttemptIndex,
245}
246
247ff_function! {
248 pub ff_expire_suspension(args: ExpireSuspensionArgs) -> ExpireSuspensionResult {
249 keys(k: &ExpireSuspensionOpKeys<'_>) {
250 k.ctx.core(), k.ctx.suspension_current(), k.ctx.waitpoint(k.waitpoint_id), k.ctx.waitpoint_condition(k.waitpoint_id), k.ctx.attempt_hash(k.attempt_index), k.ctx.stream_meta(k.attempt_index), k.idx.suspension_timeout(), k.idx.lane_suspended(k.lane_id), k.idx.lane_terminal(k.lane_id), k.idx.lane_eligible(k.lane_id), k.idx.lane_delayed(k.lane_id), k.ctx.lease_history(), }
263 argv {
264 args.execution_id.to_string(), }
266 }
267}
268
269impl FromFcallResult for ExpireSuspensionResult {
270 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
271 let r = FcallResult::parse(raw)?.into_success()?;
272 let sub = r.field_str(0);
274 match sub.as_str() {
275 "not_found_cleaned"
276 | "not_suspended_cleaned"
277 | "no_active_suspension_cleaned"
278 | "not_yet_due" => Ok(ExpireSuspensionResult::AlreadySatisfied { reason: sub }),
279 "auto_resume" => Ok(ExpireSuspensionResult::Expired {
280 behavior_applied: "auto_resume".into(),
281 }),
282 "escalate" => Ok(ExpireSuspensionResult::Expired {
283 behavior_applied: "escalate".into(),
284 }),
285 _ => Ok(ExpireSuspensionResult::Expired {
286 behavior_applied: sub,
287 }),
288 }
289 }
290}
291
292ff_function! {
298 pub ff_close_waitpoint(args: CloseWaitpointArgs) -> CloseWaitpointResult {
299 keys(k: &WaitpointOpKeys<'_>) {
300 k.ctx.core(), k.ctx.waitpoint(&args.waitpoint_id), k.idx.pending_waitpoint_expiry(), }
304 argv {
305 args.waitpoint_id.to_string(), args.reason.clone(), }
308 }
309}
310
311impl FromFcallResult for CloseWaitpointResult {
312 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
313 let _r = FcallResult::parse(raw)?.into_success()?;
314 Ok(CloseWaitpointResult::Closed)
316 }
317}
318
319ff_function! {
325 pub ff_rotate_waitpoint_hmac_secret(args: RotateWaitpointHmacSecretArgs) -> RotateWaitpointHmacSecretOutcome {
336 keys(k: &IndexKeys) {
337 k.waitpoint_hmac_secrets(), }
339 argv {
340 args.new_kid.clone(), args.new_secret_hex.clone(), args.grace_ms.to_string(), }
344 }
345}
346
347impl FromFcallResult for RotateWaitpointHmacSecretOutcome {
348 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
349 let r = FcallResult::parse(raw)?.into_success()?;
350 let variant = r.field_str(0);
354 match variant.as_str() {
355 "rotated" => {
356 let prev = r.field_str(1);
357 let new_kid = r.field_str(2);
358 let gc_count = r
359 .field_str(3)
360 .parse::<u32>()
361 .map_err(|e| ScriptError::Parse {
362 fcall: "ff_rotate_waitpoint_hmac_secret_outcome".into(),
363 execution_id: None,
364 message: format!("bad gc_count: {e}"),
365 })?;
366 Ok(RotateWaitpointHmacSecretOutcome::Rotated {
367 previous_kid: if prev.is_empty() { None } else { Some(prev) },
368 new_kid,
369 gc_count,
370 })
371 }
372 "noop" => Ok(RotateWaitpointHmacSecretOutcome::Noop {
373 kid: r.field_str(1),
374 }),
375 other => Err(ScriptError::Parse {
376 fcall: "ff_rotate_waitpoint_hmac_secret_outcome".into(),
377 execution_id: None,
378 message: format!(
379 "unexpected rotation outcome: {other}"
380 ),
381 }),
382 }
383 }
384}
385
386ff_function! {
392 pub ff_list_waitpoint_hmac_kids(_args: ListWaitpointHmacKidsArgs) -> WaitpointHmacKids {
395 keys(k: &IndexKeys) {
396 k.waitpoint_hmac_secrets(), }
398 argv {}
399 }
400}
401
402impl FromFcallResult for WaitpointHmacKids {
403 fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
404 let r = FcallResult::parse(raw)?.into_success()?;
405 let current = r.field_str(0);
407 let n = r
408 .field_str(1)
409 .parse::<usize>()
410 .map_err(|e| ScriptError::Parse {
411 fcall: "ff_waitpoint_hmac_kids".into(),
412 execution_id: None,
413 message: format!("bad verifying count: {e}"),
414 })?;
415 let mut verifying = Vec::with_capacity(n);
416 for i in 0..n {
417 let kid = r.field_str(2 + 2 * i);
418 let exp = r
419 .field_str(2 + 2 * i + 1)
420 .parse::<i64>()
421 .map_err(|e| ScriptError::Parse {
422 fcall: "ff_waitpoint_hmac_kids".into(),
423 execution_id: None,
424 message: format!("bad expires_at_ms for kid {kid}: {e}"),
425 })?;
426 verifying.push(VerifyingKid {
427 kid,
428 expires_at_ms: exp,
429 });
430 }
431 Ok(WaitpointHmacKids {
432 current_kid: if current.is_empty() {
433 None
434 } else {
435 Some(current)
436 },
437 verifying,
438 })
439 }
440}
441
442fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
445 match s {
446 "waiting" => Ok(PublicState::Waiting),
447 "delayed" => Ok(PublicState::Delayed),
448 "rate_limited" => Ok(PublicState::RateLimited),
449 "waiting_children" => Ok(PublicState::WaitingChildren),
450 "active" => Ok(PublicState::Active),
451 "suspended" => Ok(PublicState::Suspended),
452 "completed" => Ok(PublicState::Completed),
453 "failed" => Ok(PublicState::Failed),
454 "cancelled" => Ok(PublicState::Cancelled),
455 "expired" => Ok(PublicState::Expired),
456 "skipped" => Ok(PublicState::Skipped),
457 _ => Err(ScriptError::Parse {
458 fcall: "parse_public_state".into(),
459 execution_id: None,
460 message: format!("unknown public_state: {s}"),
461 }),
462 }
463}