Skip to main content

ff_script/functions/
suspension.rs

1//! Typed FCALL wrappers for suspension and waitpoint functions
2//! (lua/suspension.lua).
3
4use crate::error::ScriptError;
5use ff_core::contracts::*;
6use ff_core::keys::{ExecKeyContext, IndexKeys};
7use ff_core::state::PublicState;
8use ff_core::types::*;
9
10use crate::result::{FcallResult, FromFcallResult};
11
12/// Key context for suspension operations.
13/// Needs exec keys + index keys + lane + worker_instance_id (for lease release).
14pub struct SuspendOpKeys<'a> {
15    pub ctx: &'a ExecKeyContext,
16    pub idx: &'a IndexKeys,
17    pub lane_id: &'a LaneId,
18    pub worker_instance_id: &'a WorkerInstanceId,
19}
20
21// ─── ff_suspend_execution ─────────────────────────────────────────────
22//
23// Lua KEYS (17): exec_core, attempt_record, lease_current, lease_history,
24//                lease_expiry_zset, worker_leases, suspension_current,
25//                waitpoint_hash, waitpoint_signals, suspension_timeout_zset,
26//                pending_wp_expiry_zset, active_index, suspended_zset,
27//                waitpoint_history, wp_condition, attempt_timeout_zset,
28//                hmac_secrets
29// Lua ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
30//                lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
31//                reason_code, requested_by, timeout_at, resume_condition_json,
32//                resume_policy_json, continuation_metadata_pointer,
33//                use_pending_waitpoint, timeout_behavior, lease_history_maxlen
34
ff_function! {
    /// Typed wrapper for the `ff_suspend_execution` FCALL.
    ///
    /// Supplies 17 KEYS built from [`SuspendOpKeys`] and 17 ARGV built from
    /// [`SuspendExecutionArgs`]. The trailing `// n` comments record each
    /// entry's 1-based position; the order must match what the Lua function
    /// expects.
    pub ff_suspend_execution(args: SuspendExecutionArgs) -> SuspendExecutionResult {
        keys(k: &SuspendOpKeys<'_>) {
            k.ctx.core(),                                              // 1
            k.ctx.attempt_hash(args.attempt_index),                    // 2
            k.ctx.lease_current(),                                     // 3
            k.ctx.lease_history(),                                     // 4
            k.idx.lease_expiry(),                                      // 5
            k.idx.worker_leases(k.worker_instance_id),                 // 6
            k.ctx.suspension_current(),                                // 7
            k.ctx.waitpoint(&args.waitpoint_id),                       // 8
            k.ctx.waitpoint_signals(&args.waitpoint_id),               // 9
            k.idx.suspension_timeout(),                                // 10
            k.idx.pending_waitpoint_expiry(),                          // 11
            k.idx.lane_active(k.lane_id),                              // 12
            k.idx.lane_suspended(k.lane_id),                           // 13
            k.ctx.waitpoints(),                                        // 14
            k.ctx.waitpoint_condition(&args.waitpoint_id),             // 15
            k.idx.attempt_timeout(),                                   // 16
            k.idx.waitpoint_hmac_secrets(),                            // 17
        }
        // RFC #58.5: `fence` is Option<LeaseFence>. Suspend hard-rejects
        // empty triples with `fence_required` — no operator override.
        // When `fence` is `None`, ARGV 3-5 are each sent as an empty string.
        argv {
            args.execution_id.to_string(),                             // 1
            args.attempt_index.to_string(),                            // 2
            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),  // 3
            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),    // 4
            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(), // 5
            args.suspension_id.to_string(),                            // 6
            args.waitpoint_id.to_string(),                             // 7
            args.waitpoint_key.clone(),                                // 8
            args.reason_code.clone(),                                  // 9
            args.requested_by.clone(),                                 // 10
            // A missing timeout is sent as an empty string.
            args.timeout_at.map(|t| t.to_string()).unwrap_or_default(), // 11
            args.resume_condition_json.clone(),                        // 12
            args.resume_policy_json.clone(),                           // 13
            // A missing continuation pointer is sent as an empty string.
            args.continuation_metadata_pointer.clone().unwrap_or_default(), // 14
            // Boolean flag encoded as "1" (true) or empty string (false).
            if args.use_pending_waitpoint { "1".into() } else { String::new() }, // 15
            args.timeout_behavior.clone(),                             // 16
            // Hard-coded here; not configurable through `SuspendExecutionArgs`.
            "1000".to_string(),                                        // 17 lease_history_maxlen
        }
    }
}
79
80impl FromFcallResult for SuspendExecutionResult {
81    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
82        let r = FcallResult::parse(raw)?;
83        // ALREADY_SATISFIED: {1, "ALREADY_SATISFIED", suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
84        if r.status == "ALREADY_SATISFIED" {
85            let sid = SuspensionId::parse(&r.field_str(0))
86                .map_err(|e| ScriptError::Parse {
87                    fcall: "ff_suspend_execution".into(),
88                    execution_id: None,
89                    message: format!("bad suspension_id: {e}"),
90                })?;
91            let wid = WaitpointId::parse(&r.field_str(1))
92                .map_err(|e| ScriptError::Parse {
93                    fcall: "ff_suspend_execution".into(),
94                    execution_id: None,
95                    message: format!("bad waitpoint_id: {e}"),
96                })?;
97            let wkey = r.field_str(2);
98            let token = WaitpointToken::new(r.field_str(3));
99            return Ok(SuspendExecutionResult::AlreadySatisfied {
100                suspension_id: sid,
101                waitpoint_id: wid,
102                waitpoint_key: wkey,
103                waitpoint_token: token,
104            });
105        }
106        let r = r.into_success()?;
107        // ok(suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)
108        let sid = SuspensionId::parse(&r.field_str(0))
109            .map_err(|e| ScriptError::Parse {
110                fcall: "ff_suspend_execution".into(),
111                execution_id: None,
112                message: format!("bad suspension_id: {e}"),
113            })?;
114        let wid = WaitpointId::parse(&r.field_str(1))
115            .map_err(|e| ScriptError::Parse {
116                fcall: "ff_suspend_execution".into(),
117                execution_id: None,
118                message: format!("bad waitpoint_id: {e}"),
119            })?;
120        let wkey = r.field_str(2);
121        let token = WaitpointToken::new(r.field_str(3));
122        Ok(SuspendExecutionResult::Suspended {
123            suspension_id: sid,
124            waitpoint_id: wid,
125            waitpoint_key: wkey,
126            waitpoint_token: token,
127        })
128    }
129}
130
131// ─── ff_resume_execution ──────────────────────────────────────────────
132//
133// Lua KEYS (8): exec_core, suspension_current, waitpoint_hash,
134//               waitpoint_signals, suspension_timeout_zset,
135//               eligible_zset, delayed_zset, suspended_zset
136// Lua ARGV (3): execution_id, trigger_type, resume_delay_ms
137
138/// Key context for resume — caller must pre-read current_waitpoint_id from
139/// exec_core so the correct waitpoint keys can be passed to the Lua.
140pub struct ResumeOpKeys<'a> {
141    pub ctx: &'a ExecKeyContext,
142    pub idx: &'a IndexKeys,
143    pub lane_id: &'a LaneId,
144    pub waitpoint_id: &'a WaitpointId,
145}
146
ff_function! {
    /// Typed wrapper for the `ff_resume_execution` FCALL.
    ///
    /// Supplies 8 KEYS built from [`ResumeOpKeys`] and 3 ARGV built from
    /// [`ResumeExecutionArgs`]. The trailing `// n` comments record each
    /// entry's 1-based position.
    pub ff_resume_execution(args: ResumeExecutionArgs) -> ResumeExecutionResult {
        keys(k: &ResumeOpKeys<'_>) {
            k.ctx.core(),                                              // 1
            k.ctx.suspension_current(),                                // 2
            // Keys 3-4 come from the caller-supplied waitpoint id, not from `args`.
            k.ctx.waitpoint(k.waitpoint_id),                           // 3
            k.ctx.waitpoint_signals(k.waitpoint_id),                   // 4
            k.idx.suspension_timeout(),                                // 5
            k.idx.lane_eligible(k.lane_id),                            // 6
            k.idx.lane_delayed(k.lane_id),                             // 7
            k.idx.lane_suspended(k.lane_id),                           // 8
        }
        argv {
            args.execution_id.to_string(),                             // 1
            args.trigger_type.clone(),                                 // 2
            args.resume_delay_ms.to_string(),                          // 3
        }
    }
}
166
167impl FromFcallResult for ResumeExecutionResult {
168    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
169        let r = FcallResult::parse(raw)?.into_success()?;
170        // ok(public_state)
171        let ps_str = r.field_str(0);
172        let public_state = parse_public_state(&ps_str)?;
173        Ok(ResumeExecutionResult::Resumed { public_state })
174    }
175}
176
177// ─── ff_create_pending_waitpoint ──────────────────────────────────────
178//
179// Lua KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset
180// Lua ARGV (5): execution_id, attempt_index, waitpoint_id, waitpoint_key,
181//               expires_at
182
183/// Minimal key context for create_pending_waitpoint.
184pub struct WaitpointOpKeys<'a> {
185    pub ctx: &'a ExecKeyContext,
186    pub idx: &'a IndexKeys,
187}
188
189ff_function! {
190    pub ff_create_pending_waitpoint(args: CreatePendingWaitpointArgs) -> CreatePendingWaitpointResult {
191        keys(k: &WaitpointOpKeys<'_>) {
192            k.ctx.core(),                                              // 1
193            k.ctx.waitpoint(&args.waitpoint_id),                       // 2
194            k.idx.pending_waitpoint_expiry(),                          // 3
195        }
196        argv {
197            args.execution_id.to_string(),                             // 1
198            args.attempt_index.to_string(),                            // 2
199            args.waitpoint_id.to_string(),                             // 3
200            args.waitpoint_key.clone(),                                // 4
201            {
202                let now_ms = TimestampMs::now();
203                TimestampMs::from_millis(now_ms.0 + args.expires_in_ms as i64).to_string()
204            },                                                         // 5
205        }
206    }
207}
208
209impl FromFcallResult for CreatePendingWaitpointResult {
210    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
211        let r = FcallResult::parse(raw)?.into_success()?;
212        // ok(waitpoint_id, waitpoint_key, waitpoint_token)
213        let wid = WaitpointId::parse(&r.field_str(0))
214            .map_err(|e| ScriptError::Parse {
215                fcall: "ff_create_pending_waitpoint".into(),
216                execution_id: None,
217                message: format!("bad waitpoint_id: {e}"),
218            })?;
219        let wkey = r.field_str(1);
220        let token = WaitpointToken::new(r.field_str(2));
221        Ok(CreatePendingWaitpointResult::Created {
222            waitpoint_id: wid,
223            waitpoint_key: wkey,
224            waitpoint_token: token,
225        })
226    }
227}
228
229// ─── ff_expire_suspension ─────────────────────────────────────────────
230//
231// Lua KEYS (12): exec_core, suspension_current, waitpoint_hash, wp_condition,
232//                attempt_hash, stream_meta, suspension_timeout_zset,
233//                suspended_zset, terminal_zset, eligible_zset, delayed_zset,
234//                lease_history
235// Lua ARGV (1): execution_id
236
237/// Key context for expire_suspension — caller must pre-read current_waitpoint_id
238/// and current_attempt_index from exec_core so correct keys can be passed.
239pub struct ExpireSuspensionOpKeys<'a> {
240    pub ctx: &'a ExecKeyContext,
241    pub idx: &'a IndexKeys,
242    pub lane_id: &'a LaneId,
243    pub waitpoint_id: &'a WaitpointId,
244    pub attempt_index: AttemptIndex,
245}
246
ff_function! {
    /// Typed wrapper for the `ff_expire_suspension` FCALL.
    ///
    /// Supplies 12 KEYS built from [`ExpireSuspensionOpKeys`] and a single
    /// ARGV (the execution id). The trailing `// n` comments record each
    /// entry's 1-based position.
    pub ff_expire_suspension(args: ExpireSuspensionArgs) -> ExpireSuspensionResult {
        keys(k: &ExpireSuspensionOpKeys<'_>) {
            k.ctx.core(),                                              // 1
            k.ctx.suspension_current(),                                // 2
            // Keys 3-4 use the caller-supplied waitpoint id.
            k.ctx.waitpoint(k.waitpoint_id),                           // 3
            k.ctx.waitpoint_condition(k.waitpoint_id),                 // 4
            // Keys 5-6 use the caller-supplied attempt index.
            k.ctx.attempt_hash(k.attempt_index),                       // 5
            k.ctx.stream_meta(k.attempt_index),                        // 6
            k.idx.suspension_timeout(),                                // 7
            k.idx.lane_suspended(k.lane_id),                           // 8
            k.idx.lane_terminal(k.lane_id),                            // 9
            k.idx.lane_eligible(k.lane_id),                            // 10
            k.idx.lane_delayed(k.lane_id),                             // 11
            k.ctx.lease_history(),                                     // 12
        }
        argv {
            args.execution_id.to_string(),                             // 1
        }
    }
}
268
269impl FromFcallResult for ExpireSuspensionResult {
270    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
271        let r = FcallResult::parse(raw)?.into_success()?;
272        // ok(behavior, public_state) or ok("not_found_cleaned") etc.
273        let sub = r.field_str(0);
274        match sub.as_str() {
275            "not_found_cleaned"
276            | "not_suspended_cleaned"
277            | "no_active_suspension_cleaned"
278            | "not_yet_due" => Ok(ExpireSuspensionResult::AlreadySatisfied { reason: sub }),
279            "auto_resume" => Ok(ExpireSuspensionResult::Expired {
280                behavior_applied: "auto_resume".into(),
281            }),
282            "escalate" => Ok(ExpireSuspensionResult::Expired {
283                behavior_applied: "escalate".into(),
284            }),
285            _ => Ok(ExpireSuspensionResult::Expired {
286                behavior_applied: sub,
287            }),
288        }
289    }
290}
291
292// ─── ff_close_waitpoint ───────────────────────────────────────────────
293//
294// Lua KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset
295// Lua ARGV (2): waitpoint_id, reason
296
ff_function! {
    /// Typed wrapper for the `ff_close_waitpoint` FCALL.
    ///
    /// Supplies 3 KEYS built from [`WaitpointOpKeys`] and 2 ARGV
    /// (waitpoint id and close reason) built from [`CloseWaitpointArgs`].
    pub ff_close_waitpoint(args: CloseWaitpointArgs) -> CloseWaitpointResult {
        keys(k: &WaitpointOpKeys<'_>) {
            k.ctx.core(),                                              // 1
            k.ctx.waitpoint(&args.waitpoint_id),                       // 2
            k.idx.pending_waitpoint_expiry(),                          // 3
        }
        argv {
            args.waitpoint_id.to_string(),                             // 1
            args.reason.clone(),                                       // 2
        }
    }
}
310
311impl FromFcallResult for CloseWaitpointResult {
312    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
313        let _r = FcallResult::parse(raw)?.into_success()?;
314        // ok() or ok("already_closed")
315        Ok(CloseWaitpointResult::Closed)
316    }
317}
318
319// ─── ff_rotate_waitpoint_hmac_secret ──────────────────────────────────
320//
321// Lua KEYS (1): hmac_secrets
322// Lua ARGV (3): new_kid, new_secret_hex, grace_ms
323
ff_function! {
    /// Rotate the waitpoint HMAC signing kid on a single partition.
    ///
    /// Public FCALL surface for direct-Valkey consumers (e.g. cairn-rs).
    /// ff-server's HTTP rotation endpoint delegates to this same FCALL.
    /// Callers fan out across every partition themselves — this wrapper
    /// touches exactly one.
    ///
    /// Returns [`RotateWaitpointHmacSecretOutcome::Noop`] on exact replay
    /// (same kid + secret). Same kid + DIFFERENT secret surfaces as
    /// [`ScriptError::RotationConflict`] — operator should pick a fresh kid.
    ///
    /// Wire layout: one key (the partition's HMAC secrets key) and three
    /// arguments (`new_kid`, `new_secret_hex`, `grace_ms`).
    pub ff_rotate_waitpoint_hmac_secret(args: RotateWaitpointHmacSecretArgs) -> RotateWaitpointHmacSecretOutcome {
        // The key context is the bare `IndexKeys`; no execution-scoped keys are involved.
        keys(k: &IndexKeys) {
            k.waitpoint_hmac_secrets(), // 1
        }
        argv {
            args.new_kid.clone(),        // 1
            args.new_secret_hex.clone(), // 2
            args.grace_ms.to_string(),   // 3
        }
    }
}
346
347impl FromFcallResult for RotateWaitpointHmacSecretOutcome {
348    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
349        let r = FcallResult::parse(raw)?.into_success()?;
350        // Lua shapes:
351        //   ok("rotated", previous_kid_or_empty, new_kid, gc_count)
352        //   ok("noop",    kid)
353        let variant = r.field_str(0);
354        match variant.as_str() {
355            "rotated" => {
356                let prev = r.field_str(1);
357                let new_kid = r.field_str(2);
358                let gc_count = r
359                    .field_str(3)
360                    .parse::<u32>()
361                    .map_err(|e| ScriptError::Parse {
362                        fcall: "ff_rotate_waitpoint_hmac_secret_outcome".into(),
363                        execution_id: None,
364                        message: format!("bad gc_count: {e}"),
365                    })?;
366                Ok(RotateWaitpointHmacSecretOutcome::Rotated {
367                    previous_kid: if prev.is_empty() { None } else { Some(prev) },
368                    new_kid,
369                    gc_count,
370                })
371            }
372            "noop" => Ok(RotateWaitpointHmacSecretOutcome::Noop {
373                kid: r.field_str(1),
374            }),
375            other => Err(ScriptError::Parse {
376                fcall: "ff_rotate_waitpoint_hmac_secret_outcome".into(),
377                execution_id: None,
378                message: format!(
379                "unexpected rotation outcome: {other}"
380            ),
381            }),
382        }
383    }
384}
385
386// ─── ff_list_waitpoint_hmac_kids ──────────────────────────────────────
387//
388// Lua KEYS (1): hmac_secrets
389// Lua ARGV (0): none
390
ff_function! {
    /// Read-back snapshot of the waitpoint HMAC keystore on one partition.
    /// Callers that need cluster-wide state fan out across partitions.
    ///
    /// Wire layout: one key (the partition's HMAC secrets key) and no
    /// arguments; `_args` is accepted but unused.
    pub ff_list_waitpoint_hmac_kids(_args: ListWaitpointHmacKidsArgs) -> WaitpointHmacKids {
        keys(k: &IndexKeys) {
            k.waitpoint_hmac_secrets(), // 1
        }
        // Intentionally empty: the call takes no ARGV.
        argv {}
    }
}
401
402impl FromFcallResult for WaitpointHmacKids {
403    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
404        let r = FcallResult::parse(raw)?.into_success()?;
405        // Lua shape: ok(current_kid_or_empty, n, kid1, exp1, kid2, exp2, ...)
406        let current = r.field_str(0);
407        let n = r
408            .field_str(1)
409            .parse::<usize>()
410            .map_err(|e| ScriptError::Parse {
411                fcall: "ff_waitpoint_hmac_kids".into(),
412                execution_id: None,
413                message: format!("bad verifying count: {e}"),
414            })?;
415        let mut verifying = Vec::with_capacity(n);
416        for i in 0..n {
417            let kid = r.field_str(2 + 2 * i);
418            let exp = r
419                .field_str(2 + 2 * i + 1)
420                .parse::<i64>()
421                .map_err(|e| ScriptError::Parse {
422                    fcall: "ff_waitpoint_hmac_kids".into(),
423                    execution_id: None,
424                    message: format!("bad expires_at_ms for kid {kid}: {e}"),
425                })?;
426            verifying.push(VerifyingKid {
427                kid,
428                expires_at_ms: exp,
429            });
430        }
431        Ok(WaitpointHmacKids {
432            current_kid: if current.is_empty() {
433                None
434            } else {
435                Some(current)
436            },
437            verifying,
438        })
439    }
440}
441
442// ─── Helpers ──────────────────────────────────────────────────────────
443
444fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
445    match s {
446        "waiting" => Ok(PublicState::Waiting),
447        "delayed" => Ok(PublicState::Delayed),
448        "rate_limited" => Ok(PublicState::RateLimited),
449        "waiting_children" => Ok(PublicState::WaitingChildren),
450        "active" => Ok(PublicState::Active),
451        "suspended" => Ok(PublicState::Suspended),
452        "completed" => Ok(PublicState::Completed),
453        "failed" => Ok(PublicState::Failed),
454        "cancelled" => Ok(PublicState::Cancelled),
455        "expired" => Ok(PublicState::Expired),
456        "skipped" => Ok(PublicState::Skipped),
457        _ => Err(ScriptError::Parse {
458            fcall: "parse_public_state".into(),
459            execution_id: None,
460            message: format!("unknown public_state: {s}"),
461        }),
462    }
463}