Skip to main content

ff_script/functions/
suspension.rs

1//! Typed FCALL wrappers for suspension and waitpoint functions
2//! (lua/suspension.lua).
3
4use ff_core::contracts::*;
5use crate::error::ScriptError;
6use ff_core::keys::{ExecKeyContext, IndexKeys};
7use ff_core::state::PublicState;
8use ff_core::types::*;
9
10use crate::result::{FcallResult, FromFcallResult};
11
12/// Key context for suspension operations.
13/// Needs exec keys + index keys + lane + worker_instance_id (for lease release).
14pub struct SuspendOpKeys<'a> {
15    pub ctx: &'a ExecKeyContext,
16    pub idx: &'a IndexKeys,
17    pub lane_id: &'a LaneId,
18    pub worker_instance_id: &'a WorkerInstanceId,
19}
20
21// ─── ff_suspend_execution ─────────────────────────────────────────────
22//
23// Lua KEYS (17): exec_core, attempt_record, lease_current, lease_history,
24//                lease_expiry_zset, worker_leases, suspension_current,
25//                waitpoint_hash, waitpoint_signals, suspension_timeout_zset,
26//                pending_wp_expiry_zset, active_index, suspended_zset,
27//                waitpoint_history, wp_condition, attempt_timeout_zset,
28//                hmac_secrets
29// Lua ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
30//                lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
31//                reason_code, requested_by, timeout_at, resume_condition_json,
32//                resume_policy_json, continuation_metadata_pointer,
33//                use_pending_waitpoint, timeout_behavior, lease_history_maxlen
34
35ff_function! {
36    pub ff_suspend_execution(args: SuspendExecutionArgs) -> SuspendExecutionResult {
37        keys(k: &SuspendOpKeys<'_>) {
38            k.ctx.core(),                                              // 1
39            k.ctx.attempt_hash(args.attempt_index),                    // 2
40            k.ctx.lease_current(),                                     // 3
41            k.ctx.lease_history(),                                     // 4
42            k.idx.lease_expiry(),                                      // 5
43            k.idx.worker_leases(k.worker_instance_id),                 // 6
44            k.ctx.suspension_current(),                                // 7
45            k.ctx.waitpoint(&args.waitpoint_id),                       // 8
46            k.ctx.waitpoint_signals(&args.waitpoint_id),               // 9
47            k.idx.suspension_timeout(),                                // 10
48            k.idx.pending_waitpoint_expiry(),                          // 11
49            k.idx.lane_active(k.lane_id),                              // 12
50            k.idx.lane_suspended(k.lane_id),                           // 13
51            k.ctx.waitpoints(),                                        // 14
52            k.ctx.waitpoint_condition(&args.waitpoint_id),             // 15
53            k.idx.attempt_timeout(),                                   // 16
54            k.idx.waitpoint_hmac_secrets(),                            // 17
55        }
56        argv {
57            args.execution_id.to_string(),                             // 1
58            args.attempt_index.to_string(),                            // 2
59            args.attempt_id.to_string(),                               // 3
60            args.lease_id.to_string(),                                 // 4
61            args.lease_epoch.to_string(),                              // 5
62            args.suspension_id.to_string(),                            // 6
63            args.waitpoint_id.to_string(),                             // 7
64            args.waitpoint_key.clone(),                                // 8
65            args.reason_code.clone(),                                  // 9
66            args.requested_by.clone(),                                 // 10
67            args.timeout_at.map(|t| t.to_string()).unwrap_or_default(), // 11
68            args.resume_condition_json.clone(),                        // 12
69            args.resume_policy_json.clone(),                           // 13
70            args.continuation_metadata_pointer.clone().unwrap_or_default(), // 14
71            if args.use_pending_waitpoint { "1".into() } else { String::new() }, // 15
72            args.timeout_behavior.clone(),                             // 16
73            "1000".to_string(),                                        // 17 lease_history_maxlen
74        }
75    }
76}
77
78impl FromFcallResult for SuspendExecutionResult {
79    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
80        let r = FcallResult::parse(raw)?;
81        // ALREADY_SATISFIED: {1, "ALREADY_SATISFIED", suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
82        if r.status == "ALREADY_SATISFIED" {
83            let sid = SuspensionId::parse(&r.field_str(0))
84                .map_err(|e| ScriptError::Parse(format!("bad suspension_id: {e}")))?;
85            let wid = WaitpointId::parse(&r.field_str(1))
86                .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
87            let wkey = r.field_str(2);
88            let token = WaitpointToken::new(r.field_str(3));
89            return Ok(SuspendExecutionResult::AlreadySatisfied {
90                suspension_id: sid,
91                waitpoint_id: wid,
92                waitpoint_key: wkey,
93                waitpoint_token: token,
94            });
95        }
96        let r = r.into_success()?;
97        // ok(suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)
98        let sid = SuspensionId::parse(&r.field_str(0))
99            .map_err(|e| ScriptError::Parse(format!("bad suspension_id: {e}")))?;
100        let wid = WaitpointId::parse(&r.field_str(1))
101            .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
102        let wkey = r.field_str(2);
103        let token = WaitpointToken::new(r.field_str(3));
104        Ok(SuspendExecutionResult::Suspended {
105            suspension_id: sid,
106            waitpoint_id: wid,
107            waitpoint_key: wkey,
108            waitpoint_token: token,
109        })
110    }
111}
112
113// ─── ff_resume_execution ──────────────────────────────────────────────
114//
115// Lua KEYS (8): exec_core, suspension_current, waitpoint_hash,
116//               waitpoint_signals, suspension_timeout_zset,
117//               eligible_zset, delayed_zset, suspended_zset
118// Lua ARGV (3): execution_id, trigger_type, resume_delay_ms
119
120/// Key context for resume — caller must pre-read current_waitpoint_id from
121/// exec_core so the correct waitpoint keys can be passed to the Lua.
122pub struct ResumeOpKeys<'a> {
123    pub ctx: &'a ExecKeyContext,
124    pub idx: &'a IndexKeys,
125    pub lane_id: &'a LaneId,
126    pub waitpoint_id: &'a WaitpointId,
127}
128
129ff_function! {
130    pub ff_resume_execution(args: ResumeExecutionArgs) -> ResumeExecutionResult {
131        keys(k: &ResumeOpKeys<'_>) {
132            k.ctx.core(),                                              // 1
133            k.ctx.suspension_current(),                                // 2
134            k.ctx.waitpoint(k.waitpoint_id),                           // 3
135            k.ctx.waitpoint_signals(k.waitpoint_id),                   // 4
136            k.idx.suspension_timeout(),                                // 5
137            k.idx.lane_eligible(k.lane_id),                            // 6
138            k.idx.lane_delayed(k.lane_id),                             // 7
139            k.idx.lane_suspended(k.lane_id),                           // 8
140        }
141        argv {
142            args.execution_id.to_string(),                             // 1
143            args.trigger_type.clone(),                                 // 2
144            args.resume_delay_ms.to_string(),                          // 3
145        }
146    }
147}
148
149impl FromFcallResult for ResumeExecutionResult {
150    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
151        let r = FcallResult::parse(raw)?.into_success()?;
152        // ok(public_state)
153        let ps_str = r.field_str(0);
154        let public_state = parse_public_state(&ps_str)?;
155        Ok(ResumeExecutionResult::Resumed { public_state })
156    }
157}
158
159// ─── ff_create_pending_waitpoint ──────────────────────────────────────
160//
161// Lua KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset
162// Lua ARGV (5): execution_id, attempt_index, waitpoint_id, waitpoint_key,
163//               expires_at
164
165/// Minimal key context for create_pending_waitpoint.
166pub struct WaitpointOpKeys<'a> {
167    pub ctx: &'a ExecKeyContext,
168    pub idx: &'a IndexKeys,
169}
170
171ff_function! {
172    pub ff_create_pending_waitpoint(args: CreatePendingWaitpointArgs) -> CreatePendingWaitpointResult {
173        keys(k: &WaitpointOpKeys<'_>) {
174            k.ctx.core(),                                              // 1
175            k.ctx.waitpoint(&args.waitpoint_id),                       // 2
176            k.idx.pending_waitpoint_expiry(),                          // 3
177        }
178        argv {
179            args.execution_id.to_string(),                             // 1
180            args.attempt_index.to_string(),                            // 2
181            args.waitpoint_id.to_string(),                             // 3
182            args.waitpoint_key.clone(),                                // 4
183            {
184                let now_ms = TimestampMs::now();
185                TimestampMs::from_millis(now_ms.0 + args.expires_in_ms as i64).to_string()
186            },                                                         // 5
187        }
188    }
189}
190
191impl FromFcallResult for CreatePendingWaitpointResult {
192    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
193        let r = FcallResult::parse(raw)?.into_success()?;
194        // ok(waitpoint_id, waitpoint_key, waitpoint_token)
195        let wid = WaitpointId::parse(&r.field_str(0))
196            .map_err(|e| ScriptError::Parse(format!("bad waitpoint_id: {e}")))?;
197        let wkey = r.field_str(1);
198        let token = WaitpointToken::new(r.field_str(2));
199        Ok(CreatePendingWaitpointResult::Created {
200            waitpoint_id: wid,
201            waitpoint_key: wkey,
202            waitpoint_token: token,
203        })
204    }
205}
206
207// ─── ff_expire_suspension ─────────────────────────────────────────────
208//
209// Lua KEYS (12): exec_core, suspension_current, waitpoint_hash, wp_condition,
210//                attempt_hash, stream_meta, suspension_timeout_zset,
211//                suspended_zset, terminal_zset, eligible_zset, delayed_zset,
212//                lease_history
213// Lua ARGV (1): execution_id
214
215/// Key context for expire_suspension — caller must pre-read current_waitpoint_id
216/// and current_attempt_index from exec_core so correct keys can be passed.
217pub struct ExpireSuspensionOpKeys<'a> {
218    pub ctx: &'a ExecKeyContext,
219    pub idx: &'a IndexKeys,
220    pub lane_id: &'a LaneId,
221    pub waitpoint_id: &'a WaitpointId,
222    pub attempt_index: AttemptIndex,
223}
224
225ff_function! {
226    pub ff_expire_suspension(args: ExpireSuspensionArgs) -> ExpireSuspensionResult {
227        keys(k: &ExpireSuspensionOpKeys<'_>) {
228            k.ctx.core(),                                              // 1
229            k.ctx.suspension_current(),                                // 2
230            k.ctx.waitpoint(k.waitpoint_id),                           // 3
231            k.ctx.waitpoint_condition(k.waitpoint_id),                 // 4
232            k.ctx.attempt_hash(k.attempt_index),                       // 5
233            k.ctx.stream_meta(k.attempt_index),                        // 6
234            k.idx.suspension_timeout(),                                // 7
235            k.idx.lane_suspended(k.lane_id),                           // 8
236            k.idx.lane_terminal(k.lane_id),                            // 9
237            k.idx.lane_eligible(k.lane_id),                            // 10
238            k.idx.lane_delayed(k.lane_id),                             // 11
239            k.ctx.lease_history(),                                     // 12
240        }
241        argv {
242            args.execution_id.to_string(),                             // 1
243        }
244    }
245}
246
247impl FromFcallResult for ExpireSuspensionResult {
248    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
249        let r = FcallResult::parse(raw)?.into_success()?;
250        // ok(behavior, public_state) or ok("not_found_cleaned") etc.
251        let sub = r.field_str(0);
252        match sub.as_str() {
253            "not_found_cleaned" | "not_suspended_cleaned" | "no_active_suspension_cleaned"
254            | "not_yet_due" => Ok(ExpireSuspensionResult::AlreadySatisfied {
255                reason: sub,
256            }),
257            "auto_resume" => Ok(ExpireSuspensionResult::Expired {
258                behavior_applied: "auto_resume".into(),
259            }),
260            "escalate" => Ok(ExpireSuspensionResult::Expired {
261                behavior_applied: "escalate".into(),
262            }),
263            _ => Ok(ExpireSuspensionResult::Expired {
264                behavior_applied: sub,
265            }),
266        }
267    }
268}
269
270// ─── ff_close_waitpoint ───────────────────────────────────────────────
271//
272// Lua KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset
273// Lua ARGV (2): waitpoint_id, reason
274
275ff_function! {
276    pub ff_close_waitpoint(args: CloseWaitpointArgs) -> CloseWaitpointResult {
277        keys(k: &WaitpointOpKeys<'_>) {
278            k.ctx.core(),                                              // 1
279            k.ctx.waitpoint(&args.waitpoint_id),                       // 2
280            k.idx.pending_waitpoint_expiry(),                          // 3
281        }
282        argv {
283            args.waitpoint_id.to_string(),                             // 1
284            args.reason.clone(),                                       // 2
285        }
286    }
287}
288
289impl FromFcallResult for CloseWaitpointResult {
290    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
291        let _r = FcallResult::parse(raw)?.into_success()?;
292        // ok() or ok("already_closed")
293        Ok(CloseWaitpointResult::Closed)
294    }
295}
296
297// ─── Helpers ──────────────────────────────────────────────────────────
298
299fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
300    match s {
301        "waiting" => Ok(PublicState::Waiting),
302        "delayed" => Ok(PublicState::Delayed),
303        "rate_limited" => Ok(PublicState::RateLimited),
304        "waiting_children" => Ok(PublicState::WaitingChildren),
305        "active" => Ok(PublicState::Active),
306        "suspended" => Ok(PublicState::Suspended),
307        "completed" => Ok(PublicState::Completed),
308        "failed" => Ok(PublicState::Failed),
309        "cancelled" => Ok(PublicState::Cancelled),
310        "expired" => Ok(PublicState::Expired),
311        "skipped" => Ok(PublicState::Skipped),
312        _ => Err(ScriptError::Parse(format!("unknown public_state: {s}"))),
313    }
314}