Skip to main content

ff_script/functions/
execution.rs

1//! Typed FCALL wrappers for execution lifecycle functions (lua/execution.lua).
2//!
3//! ## Partial-type pattern (RFC-011 §2.4)
4//!
5//! Post-RFC-011, `ExecutionId` no longer has `Default`, so parsers cannot
6//! construct result structs with a placeholder `execution_id` to be
7//! overwritten by the caller. Instead, each `ff_function!` wrapper whose
8//! result carries an `execution_id` returns a `*Partial` type that omits
9//! the field, with a `.complete(execution_id)` combinator the caller
10//! invokes after the FCALL returns (the caller always knows the
11//! `execution_id` — it supplied the id as ARGV).
12//!
13//! `complete` is a total match over the Partial variants, so future
14//! result variants that carry an `execution_id` force a compile error
15//! in `complete` until the new variant is wired through.
16
17use ff_core::contracts::*;
18use crate::error::ScriptError;
19use ff_core::keys::{ExecKeyContext, IndexKeys};
20use ff_core::state::{AttemptType, PublicState};
21use ff_core::types::*;
22
23use crate::result::{FcallResult, FromFcallResult};
24
25// ─── Partial types (RFC-011 §2.4) ──────────────────────────────────────
26
27/// Partial form of [`ClaimedExecution`] used by the parser path;
28/// caller-supplied `execution_id` is attached via [`ClaimExecutionResultPartial::complete`].
29#[derive(Clone, Debug, PartialEq, Eq)]
30pub struct ClaimedExecutionPartial {
31    pub lease_id: LeaseId,
32    pub lease_epoch: LeaseEpoch,
33    pub attempt_index: AttemptIndex,
34    pub attempt_id: AttemptId,
35    pub attempt_type: AttemptType,
36    pub lease_expires_at: TimestampMs,
37}
38
39/// Partial form of [`ClaimExecutionResult`].
40#[derive(Clone, Debug, PartialEq, Eq)]
41pub enum ClaimExecutionResultPartial {
42    Claimed(ClaimedExecutionPartial),
43}
44
45impl ClaimExecutionResultPartial {
46    /// Attach the caller-supplied `execution_id` and lift to the full
47    /// [`ClaimExecutionResult`]. Total match over Partial variants.
48    pub fn complete(self, execution_id: ExecutionId) -> ClaimExecutionResult {
49        match self {
50            Self::Claimed(p) => ClaimExecutionResult::Claimed(ClaimedExecution {
51                execution_id,
52                lease_id: p.lease_id,
53                lease_epoch: p.lease_epoch,
54                attempt_index: p.attempt_index,
55                attempt_id: p.attempt_id,
56                attempt_type: p.attempt_type,
57                lease_expires_at: p.lease_expires_at,
58            }),
59        }
60    }
61}
62
63/// Partial form of [`CompleteExecutionResult`] (omits `execution_id`).
64#[derive(Clone, Debug, PartialEq, Eq)]
65pub enum CompleteExecutionResultPartial {
66    Completed { public_state: PublicState },
67}
68
69impl CompleteExecutionResultPartial {
70    pub fn complete(self, execution_id: ExecutionId) -> CompleteExecutionResult {
71        match self {
72            Self::Completed { public_state } => CompleteExecutionResult::Completed {
73                execution_id,
74                public_state,
75            },
76        }
77    }
78}
79
80/// Partial form of [`CancelExecutionResult`] (omits `execution_id`).
81#[derive(Clone, Debug, PartialEq, Eq)]
82pub enum CancelExecutionResultPartial {
83    Cancelled { public_state: PublicState },
84}
85
86impl CancelExecutionResultPartial {
87    pub fn complete(self, execution_id: ExecutionId) -> CancelExecutionResult {
88        match self {
89            Self::Cancelled { public_state } => CancelExecutionResult::Cancelled {
90                execution_id,
91                public_state,
92            },
93        }
94    }
95}
96
97/// Partial form of [`DelayExecutionResult`] (omits `execution_id`).
98#[derive(Clone, Debug, PartialEq, Eq)]
99pub enum DelayExecutionResultPartial {
100    Delayed { public_state: PublicState },
101}
102
103impl DelayExecutionResultPartial {
104    pub fn complete(self, execution_id: ExecutionId) -> DelayExecutionResult {
105        match self {
106            Self::Delayed { public_state } => DelayExecutionResult::Delayed {
107                execution_id,
108                public_state,
109            },
110        }
111    }
112}
113
114/// Partial form of [`MoveToWaitingChildrenResult`] (omits `execution_id`).
115#[derive(Clone, Debug, PartialEq, Eq)]
116pub enum MoveToWaitingChildrenResultPartial {
117    Moved { public_state: PublicState },
118}
119
120impl MoveToWaitingChildrenResultPartial {
121    pub fn complete(self, execution_id: ExecutionId) -> MoveToWaitingChildrenResult {
122        match self {
123            Self::Moved { public_state } => MoveToWaitingChildrenResult::Moved {
124                execution_id,
125                public_state,
126            },
127        }
128    }
129}
130
131/// Partial form of [`ExpireExecutionResult`].
132///
133/// Multi-variant: `Expired` carries `execution_id` (lifted); `AlreadyTerminal`
134/// does not. `complete` attaches the id only on `Expired`.
135#[derive(Clone, Debug, PartialEq, Eq)]
136pub enum ExpireExecutionResultPartial {
137    Expired,
138    AlreadyTerminal,
139}
140
141impl ExpireExecutionResultPartial {
142    pub fn complete(self, execution_id: ExecutionId) -> ExpireExecutionResult {
143        match self {
144            Self::Expired => ExpireExecutionResult::Expired { execution_id },
145            Self::AlreadyTerminal => ExpireExecutionResult::AlreadyTerminal,
146        }
147    }
148}
149
150/// Bundles ExecKeyContext + IndexKeys + lane-scoped index resolution.
151/// Passed as the key context to all execution ff_function! invocations.
152pub struct ExecOpKeys<'a> {
153    pub ctx: &'a ExecKeyContext,
154    pub idx: &'a IndexKeys,
155    pub lane_id: &'a LaneId,
156    pub worker_instance_id: &'a WorkerInstanceId,
157}
158
159// ─── ff_create_execution ───────────────────────────────────────────────
160//
161// Lua KEYS (8): exec_core, payload, policy, tags,
162//               eligible_or_delayed_zset, idem_key,
163//               execution_deadline_zset, all_executions_set
164// Lua ARGV (13): execution_id, namespace, lane_id, execution_kind,
165//                priority, creator_identity, policy_json,
166//                input_payload, delay_until, dedup_ttl_ms,
167//                tags_json, execution_deadline_at, partition_id
168
169ff_function! {
170    pub ff_create_execution(args: CreateExecutionArgs) -> CreateExecutionResult {
171        keys(k: &ExecOpKeys<'_>) {
172            k.ctx.core(),
173            k.ctx.payload(),
174            k.ctx.policy(),
175            k.ctx.tags(),
176            // KEYS[5] = scheduling_zset: eligible OR delayed depending on delay_until.
177            // Lua ZADDs to this single key for both paths.
178            if args.delay_until.is_some() {
179                k.idx.lane_delayed(k.lane_id)
180            } else {
181                k.idx.lane_eligible(k.lane_id)
182            },
183            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
184                ff_core::keys::idempotency_key(k.ctx.hash_tag(), args.namespace.as_str(), ik)
185            }).unwrap_or_else(|| k.ctx.noop()),
186            k.idx.execution_deadline(),
187            k.idx.all_executions(),
188        }
189        argv {
190            args.execution_id.to_string(),
191            args.namespace.to_string(),
192            args.lane_id.to_string(),
193            args.execution_kind.clone(),
194            args.priority.to_string(),
195            args.creator_identity.clone(),
196            args.policy.as_ref().map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".into())).unwrap_or_else(|| "{}".into()),
197            String::from_utf8_lossy(&args.input_payload).into_owned(),
198            args.delay_until.map(|t| t.to_string()).unwrap_or_default(),
199            args.idempotency_key.as_ref().map(|_| "86400000".to_string()).unwrap_or_default(),
200            serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".into()),
201            String::new(),
202            args.partition_id.to_string(),
203        }
204    }
205}
206
207impl FromFcallResult for CreateExecutionResult {
208    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
209        let r = FcallResult::parse(raw)?;
210        // DUPLICATE status: {1, "DUPLICATE", execution_id}
211        if r.status == "DUPLICATE" {
212            let eid_str = r.field_str(0);
213            let eid = ExecutionId::parse(&eid_str)
214                .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
215            return Ok(CreateExecutionResult::Duplicate { execution_id: eid });
216        }
217        let r = r.into_success()?;
218        let eid_str = r.field_str(0);
219        let ps_str = r.field_str(1);
220        let eid = ExecutionId::parse(&eid_str)
221            .map_err(|e| ScriptError::Parse(format!("bad execution_id: {e}")))?;
222        let public_state = parse_public_state(&ps_str)?;
223        Ok(CreateExecutionResult::Created {
224            execution_id: eid,
225            public_state,
226        })
227    }
228}
229
230// ─── ff_claim_execution ────────────────────────────────────────────────
231//
232// Lua KEYS (14): exec_core, claim_grant, eligible_zset, lease_expiry_zset,
233//                worker_leases, attempt_hash, attempt_usage, attempt_policy,
234//                attempts_zset, lease_current, lease_history, active_index,
235//                attempt_timeout_zset, execution_deadline_zset
236// Lua ARGV (12): execution_id, worker_id, worker_instance_id, lane,
237//                capability_snapshot_hash, lease_id, lease_ttl_ms,
238//                renew_before_ms, attempt_id, attempt_policy_json,
239//                attempt_timeout_ms, execution_deadline_at
240
241ff_function! {
242    pub ff_claim_execution(args: ClaimExecutionArgs) -> ClaimExecutionResultPartial {
243        keys(k: &ExecOpKeys<'_>) {
244            k.ctx.core(),
245            k.ctx.claim_grant(),
246            k.idx.lane_eligible(k.lane_id),
247            k.idx.lease_expiry(),
248            k.idx.worker_leases(k.worker_instance_id),
249            k.ctx.attempt_hash(args.expected_attempt_index),
250            k.ctx.attempt_usage(args.expected_attempt_index),
251            k.ctx.attempt_policy(args.expected_attempt_index),
252            k.ctx.attempts(),
253            k.ctx.lease_current(),
254            k.ctx.lease_history(),
255            k.idx.lane_active(k.lane_id),
256            k.idx.attempt_timeout(),
257            k.idx.execution_deadline(),
258        }
259        argv {
260            args.execution_id.to_string(),
261            args.worker_id.to_string(),
262            args.worker_instance_id.to_string(),
263            args.lane_id.to_string(),
264            String::new(),
265            args.lease_id.to_string(),
266            args.lease_ttl_ms.to_string(),
267            (args.lease_ttl_ms * 2 / 3).to_string(),
268            args.attempt_id.to_string(),
269            args.attempt_policy_json.clone(),
270            args.attempt_timeout_ms.map(|t| t.to_string()).unwrap_or_default(),
271            args.execution_deadline_at.map(|t| t.to_string()).unwrap_or_default(),
272        }
273    }
274}
275
276impl FromFcallResult for ClaimExecutionResultPartial {
277    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
278        let r = FcallResult::parse(raw)?.into_success()?;
279        // ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
280        let lease_id = LeaseId::parse(&r.field_str(0))
281            .map_err(|e| ScriptError::Parse(format!("bad lease_id: {e}")))?;
282        let epoch = r.field_str(1).parse::<u64>()
283            .map_err(|e| ScriptError::Parse(format!("bad epoch: {e}")))?;
284        let expires_at = r.field_str(2).parse::<i64>()
285            .map_err(|e| ScriptError::Parse(format!("bad expires_at: {e}")))?;
286        let attempt_id = AttemptId::parse(&r.field_str(3))
287            .map_err(|e| ScriptError::Parse(format!("bad attempt_id: {e}")))?;
288        let attempt_index = r.field_str(4).parse::<u32>()
289            .map_err(|e| ScriptError::Parse(format!("bad attempt_index: {e}")))?;
290        let attempt_type = parse_attempt_type(&r.field_str(5))?;
291
292        Ok(Self::Claimed(ClaimedExecutionPartial {
293            lease_id,
294            lease_epoch: LeaseEpoch::new(epoch),
295            attempt_index: AttemptIndex::new(attempt_index),
296            attempt_id,
297            attempt_type,
298            lease_expires_at: TimestampMs::from_millis(expires_at),
299        }))
300    }
301}
302
303// ─── ff_complete_execution ─────────────────────────────────────────────
304//
305// Lua KEYS (12): exec_core, attempt_hash, lease_expiry_zset, worker_leases,
306//                terminal_zset, lease_current, lease_history, active_index,
307//                stream_meta, result_key, attempt_timeout_zset,
308//                execution_deadline_zset
309// Lua ARGV (5): execution_id, lease_id, lease_epoch, attempt_id, result_payload
310
311ff_function! {
312    pub ff_complete_execution(args: CompleteExecutionArgs) -> CompleteExecutionResultPartial {
313        keys(k: &ExecOpKeys<'_>) {
314            k.ctx.core(),
315            k.ctx.attempt_hash(args.attempt_index),
316            k.idx.lease_expiry(),
317            k.idx.worker_leases(k.worker_instance_id),
318            k.idx.lane_terminal(k.lane_id),
319            k.ctx.lease_current(),
320            k.ctx.lease_history(),
321            k.idx.lane_active(k.lane_id),
322            k.ctx.stream_meta(args.attempt_index),
323            k.ctx.result(),
324            k.idx.attempt_timeout(),
325            k.idx.execution_deadline(),
326        }
327        argv {
328            args.execution_id.to_string(),
329            args.lease_id.to_string(),
330            args.lease_epoch.to_string(),
331            args.attempt_id.to_string(),
332            args.result_payload.as_ref()
333                .map(|p| String::from_utf8_lossy(p).into_owned())
334                .unwrap_or_default(),
335        }
336    }
337}
338
339impl FromFcallResult for CompleteExecutionResultPartial {
340    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
341        let _r = FcallResult::parse(raw)?.into_success()?;
342        // ok("completed")
343        Ok(Self::Completed { public_state: PublicState::Completed })
344    }
345}
346
347// ─── ff_cancel_execution ───────────────────────────────────────────────
348//
349// Lua KEYS (21): exec_core, attempt_hash, stream_meta, lease_current,
350//                lease_history, lease_expiry_zset, worker_leases,
351//                suspension_current, waitpoint_hash, wp_condition,
352//                suspension_timeout_zset, terminal_zset,
353//                attempt_timeout_zset, execution_deadline_zset,
354//                eligible_zset, delayed_zset, blocked_deps_zset,
355//                blocked_budget_zset, blocked_quota_zset,
356//                blocked_route_zset, blocked_operator_zset
357// Lua ARGV (5): execution_id, reason, source, lease_id, lease_epoch
358
359// Cancel needs suspension/waitpoint keys that depend on runtime state.
360// KEYS[9] and [10] require the waitpoint_id from the suspension record,
361// which isn't known until we read suspension:current inside the Lua.
362// Phase 1 workaround: pass the suspension_current key as a same-slot
363// placeholder. The Lua uses EXISTS checks and will no-op when the key
364// type doesn't match HSET expectations (suspension_current is a HASH,
365// not a waitpoint hash, but EXISTS returns 0 if the suspension was
366// already closed/deleted). Phase 3 must pre-read the waitpoint_id and
367// pass the real keys.
368ff_function! {
369    pub ff_cancel_execution(args: CancelExecutionArgs) -> CancelExecutionResultPartial {
370        keys(k: &ExecOpKeys<'_>) {
371            k.ctx.core(),                                       // 1
372            k.ctx.attempt_hash(AttemptIndex::new(0)),           // 2 placeholder
373            k.ctx.stream_meta(AttemptIndex::new(0)),            // 3 placeholder
374            k.ctx.lease_current(),                              // 4
375            k.ctx.lease_history(),                               // 5
376            k.idx.lease_expiry(),                               // 6
377            k.idx.worker_leases(k.worker_instance_id),          // 7
378            k.ctx.suspension_current(),                         // 8
379            k.ctx.suspension_current(),                         // 9 placeholder — real: wp hash (Phase 3)
380            k.ctx.suspension_current(),                         // 10 placeholder — real: wp condition (Phase 3)
381            k.idx.suspension_timeout(),                         // 11
382            k.idx.lane_terminal(k.lane_id),                     // 12
383            k.idx.attempt_timeout(),                            // 13
384            k.idx.execution_deadline(),                         // 14
385            k.idx.lane_eligible(k.lane_id),                     // 15
386            k.idx.lane_delayed(k.lane_id),                      // 16
387            k.idx.lane_blocked_dependencies(k.lane_id),         // 17
388            k.idx.lane_blocked_budget(k.lane_id),               // 18
389            k.idx.lane_blocked_quota(k.lane_id),                // 19
390            k.idx.lane_blocked_route(k.lane_id),                // 20
391            k.idx.lane_blocked_operator(k.lane_id),             // 21
392        }
393        argv {
394            args.execution_id.to_string(),
395            args.reason.clone(),
396            args.source.to_string(),
397            args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
398            args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
399        }
400    }
401}
402
403impl FromFcallResult for CancelExecutionResultPartial {
404    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
405        let _r = FcallResult::parse(raw)?.into_success()?;
406        // ok("cancelled", cancelled_from_state)
407        Ok(Self::Cancelled { public_state: PublicState::Cancelled })
408    }
409}
410
411// ─── ff_delay_execution ────────────────────────────────────────────────
412//
413// Lua KEYS (9): exec_core, attempt_hash, lease_current, lease_history,
414//               lease_expiry_zset, worker_leases, active_index,
415//               delayed_zset, attempt_timeout_zset
416// Lua ARGV (5): execution_id, lease_id, lease_epoch, attempt_id, delay_until
417
418ff_function! {
419    pub ff_delay_execution(args: DelayExecutionArgs) -> DelayExecutionResultPartial {
420        keys(k: &ExecOpKeys<'_>) {
421            k.ctx.core(),
422            k.ctx.attempt_hash(args.attempt_index),
423            k.ctx.lease_current(),
424            k.ctx.lease_history(),
425            k.idx.lease_expiry(),
426            k.idx.worker_leases(k.worker_instance_id),
427            k.idx.lane_active(k.lane_id),
428            k.idx.lane_delayed(k.lane_id),
429            k.idx.attempt_timeout(),
430        }
431        argv {
432            args.execution_id.to_string(),
433            args.lease_id.to_string(),
434            args.lease_epoch.to_string(),
435            args.attempt_id.to_string(),
436            args.delay_until.to_string(),
437        }
438    }
439}
440
441impl FromFcallResult for DelayExecutionResultPartial {
442    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
443        let _r = FcallResult::parse(raw)?.into_success()?;
444        // ok(delay_until)
445        Ok(Self::Delayed { public_state: PublicState::Delayed })
446    }
447}
448
449// ─── ff_move_to_waiting_children ───────────────────────────────────────
450//
451// Lua KEYS (9): exec_core, attempt_hash, lease_current, lease_history,
452//               lease_expiry_zset, worker_leases, active_index,
453//               blocked_deps_zset, attempt_timeout_zset
454// Lua ARGV (4): execution_id, lease_id, lease_epoch, attempt_id
455
456ff_function! {
457    pub ff_move_to_waiting_children(args: MoveToWaitingChildrenArgs) -> MoveToWaitingChildrenResultPartial {
458        keys(k: &ExecOpKeys<'_>) {
459            k.ctx.core(),
460            k.ctx.attempt_hash(args.attempt_index),
461            k.ctx.lease_current(),
462            k.ctx.lease_history(),
463            k.idx.lease_expiry(),
464            k.idx.worker_leases(k.worker_instance_id),
465            k.idx.lane_active(k.lane_id),
466            k.idx.lane_blocked_dependencies(k.lane_id),
467            k.idx.attempt_timeout(),
468        }
469        argv {
470            args.execution_id.to_string(),
471            args.lease_id.to_string(),
472            args.lease_epoch.to_string(),
473            args.attempt_id.to_string(),
474        }
475    }
476}
477
478impl FromFcallResult for MoveToWaitingChildrenResultPartial {
479    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
480        let _r = FcallResult::parse(raw)?.into_success()?;
481        // ok()
482        Ok(Self::Moved { public_state: PublicState::WaitingChildren })
483    }
484}
485
486// ─── ff_fail_execution ─────────────────────────────────────────────────
487//
488// Lua KEYS (12): exec_core, attempt_hash, lease_expiry_zset, worker_leases,
489//                terminal_zset, delayed_zset, lease_current, lease_history,
490//                active_index, stream_meta, attempt_timeout_zset,
491//                execution_deadline_zset
492// Lua ARGV (7): execution_id, lease_id, lease_epoch, attempt_id,
493//               failure_reason, failure_category, retry_policy_json
494
495ff_function! {
496    pub ff_fail_execution(args: FailExecutionArgs) -> FailExecutionResult {
497        keys(k: &ExecOpKeys<'_>) {
498            k.ctx.core(),
499            k.ctx.attempt_hash(args.attempt_index),
500            k.idx.lease_expiry(),
501            k.idx.worker_leases(k.worker_instance_id),
502            k.idx.lane_terminal(k.lane_id),
503            k.idx.lane_delayed(k.lane_id),
504            k.ctx.lease_current(),
505            k.ctx.lease_history(),
506            k.idx.lane_active(k.lane_id),
507            k.ctx.stream_meta(args.attempt_index),
508            k.idx.attempt_timeout(),
509            k.idx.execution_deadline(),
510        }
511        argv {
512            args.execution_id.to_string(),
513            args.lease_id.to_string(),
514            args.lease_epoch.to_string(),
515            args.attempt_id.to_string(),
516            args.failure_reason.clone(),
517            args.failure_category.clone(),
518            args.retry_policy_json.clone(),
519        }
520    }
521}
522
523impl FromFcallResult for FailExecutionResult {
524    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
525        let r = FcallResult::parse(raw)?.into_success()?;
526        // ok("retry_scheduled", delay_until) or ok("terminal_failed")
527        let sub_status = r.field_str(0);
528        match sub_status.as_str() {
529            "retry_scheduled" => {
530                let delay_str = r.field_str(1);
531                let delay_ms: i64 = delay_str
532                    .parse()
533                    .map_err(|e| ScriptError::Parse(format!("bad delay_until: {e}")))?;
534                Ok(FailExecutionResult::RetryScheduled {
535                    delay_until: TimestampMs::from_millis(delay_ms),
536                    next_attempt_index: AttemptIndex::new(0), // computed by claim_execution
537                })
538            }
539            "terminal_failed" => Ok(FailExecutionResult::TerminalFailed),
540            _ => Err(ScriptError::Parse(format!(
541                "unexpected fail sub-status: {sub_status}"
542            ))),
543        }
544    }
545}
546
547// ─── ff_expire_execution ───────────────────────────────────────────────
548//
549// Lua KEYS (14): exec_core, attempt_hash, stream_meta, lease_current,
550//                lease_history, lease_expiry_zset, worker_leases,
551//                active_index, terminal_zset, attempt_timeout_zset,
552//                execution_deadline_zset, suspended_zset,
553//                suspension_timeout_zset, suspension_current
554// Lua ARGV (2): execution_id, expire_reason
555
556ff_function! {
557    pub ff_expire_execution(args: ExpireExecutionArgs) -> ExpireExecutionResultPartial {
558        keys(k: &ExecOpKeys<'_>) {
559            k.ctx.core(),
560            k.ctx.attempt_hash(AttemptIndex::new(0)),   // placeholder
561            k.ctx.stream_meta(AttemptIndex::new(0)),     // placeholder
562            k.ctx.lease_current(),
563            k.ctx.lease_history(),
564            k.idx.lease_expiry(),
565            k.idx.worker_leases(k.worker_instance_id),
566            k.idx.lane_active(k.lane_id),
567            k.idx.lane_terminal(k.lane_id),
568            k.idx.attempt_timeout(),
569            k.idx.execution_deadline(),
570            k.idx.lane_suspended(k.lane_id),
571            k.idx.suspension_timeout(),
572            k.ctx.suspension_current(),
573        }
574        argv {
575            args.execution_id.to_string(),
576            args.expire_reason.clone(),
577        }
578    }
579}
580
581impl FromFcallResult for ExpireExecutionResultPartial {
582    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
583        let r = FcallResult::parse(raw)?.into_success()?;
584        // ok("expired", from_phase) or ok("already_terminal") or ok("not_found_cleaned")
585        let sub = r.field_str(0);
586        match sub.as_str() {
587            "already_terminal" | "not_found_cleaned" => Ok(Self::AlreadyTerminal),
588            "expired" => Ok(Self::Expired),
589            _ => Ok(Self::Expired),
590        }
591    }
592}
593
594// ─── Helpers ───────────────────────────────────────────────────────────
595
596fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
597    match s {
598        "waiting" => Ok(PublicState::Waiting),
599        "delayed" => Ok(PublicState::Delayed),
600        "rate_limited" => Ok(PublicState::RateLimited),
601        "waiting_children" => Ok(PublicState::WaitingChildren),
602        "active" => Ok(PublicState::Active),
603        "suspended" => Ok(PublicState::Suspended),
604        "completed" => Ok(PublicState::Completed),
605        "failed" => Ok(PublicState::Failed),
606        "cancelled" => Ok(PublicState::Cancelled),
607        "expired" => Ok(PublicState::Expired),
608        "skipped" => Ok(PublicState::Skipped),
609        _ => Err(ScriptError::Parse(format!("unknown public_state: {s}"))),
610    }
611}
612
613fn parse_attempt_type(s: &str) -> Result<AttemptType, ScriptError> {
614    match s {
615        "initial" => Ok(AttemptType::Initial),
616        "retry" => Ok(AttemptType::Retry),
617        "reclaim" => Ok(AttemptType::Reclaim),
618        "replay" => Ok(AttemptType::Replay),
619        "fallback" => Ok(AttemptType::Fallback),
620        _ => Err(ScriptError::Parse(format!("unknown attempt_type: {s}"))),
621    }
622}
623
624// ─── Partial-type tests (RFC-011 §2.4 acceptance) ──────────────────────
625#[cfg(test)]
626mod partial_tests {
627    use super::*;
628    use ff_core::partition::PartitionConfig;
629
630    fn test_eid() -> ExecutionId {
631        ExecutionId::for_flow(&FlowId::new(), &PartitionConfig::default())
632    }
633
634    #[test]
635    fn claim_partial_complete_attaches_execution_id() {
636        let partial = ClaimExecutionResultPartial::Claimed(ClaimedExecutionPartial {
637            lease_id: LeaseId::new(),
638            lease_epoch: LeaseEpoch::new(1),
639            attempt_index: AttemptIndex::new(0),
640            attempt_id: AttemptId::new(),
641            attempt_type: AttemptType::Initial,
642            lease_expires_at: TimestampMs::from_millis(1000),
643        });
644        let eid = test_eid();
645        let full = partial.complete(eid.clone());
646        match full {
647            ClaimExecutionResult::Claimed(c) => assert_eq!(c.execution_id, eid),
648        }
649    }
650
651    #[test]
652    fn complete_partial_complete_attaches_execution_id() {
653        let partial = CompleteExecutionResultPartial::Completed {
654            public_state: PublicState::Completed,
655        };
656        let eid = test_eid();
657        let full = partial.complete(eid.clone());
658        match full {
659            CompleteExecutionResult::Completed { execution_id, .. } => assert_eq!(execution_id, eid),
660        }
661    }
662
663    #[test]
664    fn cancel_partial_complete_attaches_execution_id() {
665        let partial = CancelExecutionResultPartial::Cancelled {
666            public_state: PublicState::Cancelled,
667        };
668        let eid = test_eid();
669        let full = partial.complete(eid.clone());
670        match full {
671            CancelExecutionResult::Cancelled { execution_id, .. } => assert_eq!(execution_id, eid),
672        }
673    }
674
675    #[test]
676    fn delay_partial_complete_attaches_execution_id() {
677        let partial = DelayExecutionResultPartial::Delayed {
678            public_state: PublicState::Delayed,
679        };
680        let eid = test_eid();
681        let full = partial.complete(eid.clone());
682        match full {
683            DelayExecutionResult::Delayed { execution_id, .. } => assert_eq!(execution_id, eid),
684        }
685    }
686
687    #[test]
688    fn move_to_waiting_children_partial_complete_attaches_execution_id() {
689        let partial = MoveToWaitingChildrenResultPartial::Moved {
690            public_state: PublicState::WaitingChildren,
691        };
692        let eid = test_eid();
693        let full = partial.complete(eid.clone());
694        match full {
695            MoveToWaitingChildrenResult::Moved { execution_id, .. } => assert_eq!(execution_id, eid),
696        }
697    }
698
699    #[test]
700    fn expire_partial_expired_variant_attaches_execution_id() {
701        let partial = ExpireExecutionResultPartial::Expired;
702        let eid = test_eid();
703        let full = partial.complete(eid.clone());
704        match full {
705            ExpireExecutionResult::Expired { execution_id } => assert_eq!(execution_id, eid),
706            _ => panic!("expected Expired variant"),
707        }
708    }
709
710    #[test]
711    fn expire_partial_already_terminal_variant_ignores_execution_id() {
712        // Multi-variant exhaustiveness test: AlreadyTerminal has no
713        // execution_id field, so complete() passes it through without
714        // attaching. Verifies the variant-mirror pattern.
715        let partial = ExpireExecutionResultPartial::AlreadyTerminal;
716        let eid = test_eid();
717        let full = partial.complete(eid);
718        assert!(matches!(full, ExpireExecutionResult::AlreadyTerminal));
719    }
720}