Skip to main content

ff_script/functions/
execution.rs

1//! Typed FCALL wrappers for execution lifecycle functions (lua/execution.lua).
2//!
3//! ## Partial-type pattern (RFC-011 §2.4)
4//!
5//! Post-RFC-011, `ExecutionId` no longer has `Default`, so parsers cannot
6//! construct result structs with a placeholder `execution_id` to be
7//! overwritten by the caller. Instead, each `ff_function!` wrapper whose
8//! result carries an `execution_id` returns a `*Partial` type that omits
9//! the field, with a `.complete(execution_id)` combinator the caller
10//! invokes after the FCALL returns (the caller always knows the
11//! `execution_id` — it supplied the id as ARGV).
12//!
13//! `complete` is a total match over the Partial variants, so future
14//! result variants that carry an `execution_id` force a compile error
15//! in `complete` until the new variant is wired through.
16
17use crate::error::ScriptError;
18use ff_core::contracts::*;
19use ff_core::keys::{ExecKeyContext, IndexKeys};
20use ff_core::state::{AttemptType, PublicState};
21use ff_core::types::*;
22
23use crate::result::{FcallResult, FromFcallResult};
24
25// ─── Partial types (RFC-011 §2.4) ──────────────────────────────────────
26
27/// Partial form of [`ClaimedExecution`] used by the parser path;
28/// caller-supplied `execution_id` is attached via [`ClaimExecutionResultPartial::complete`].
29#[derive(Clone, Debug, PartialEq, Eq)]
30pub struct ClaimedExecutionPartial {
31    pub lease_id: LeaseId,
32    pub lease_epoch: LeaseEpoch,
33    pub attempt_index: AttemptIndex,
34    pub attempt_id: AttemptId,
35    pub attempt_type: AttemptType,
36    pub lease_expires_at: TimestampMs,
37}
38
39/// Partial form of [`ClaimExecutionResult`].
40#[derive(Clone, Debug, PartialEq, Eq)]
41pub enum ClaimExecutionResultPartial {
42    Claimed(ClaimedExecutionPartial),
43}
44
45impl ClaimExecutionResultPartial {
46    /// Attach the caller-supplied `execution_id` and lift to the full
47    /// [`ClaimExecutionResult`]. Total match over Partial variants.
48    pub fn complete(self, execution_id: ExecutionId) -> ClaimExecutionResult {
49        match self {
50            Self::Claimed(p) => ClaimExecutionResult::Claimed(ClaimedExecution {
51                execution_id,
52                lease_id: p.lease_id,
53                lease_epoch: p.lease_epoch,
54                attempt_index: p.attempt_index,
55                attempt_id: p.attempt_id,
56                attempt_type: p.attempt_type,
57                lease_expires_at: p.lease_expires_at,
58            }),
59        }
60    }
61}
62
63/// Partial form of [`CompleteExecutionResult`] (omits `execution_id`).
64#[derive(Clone, Debug, PartialEq, Eq)]
65pub enum CompleteExecutionResultPartial {
66    Completed { public_state: PublicState },
67}
68
69impl CompleteExecutionResultPartial {
70    pub fn complete(self, execution_id: ExecutionId) -> CompleteExecutionResult {
71        match self {
72            Self::Completed { public_state } => CompleteExecutionResult::Completed {
73                execution_id,
74                public_state,
75            },
76        }
77    }
78}
79
80/// Partial form of [`CancelExecutionResult`] (omits `execution_id`).
81#[derive(Clone, Debug, PartialEq, Eq)]
82pub enum CancelExecutionResultPartial {
83    Cancelled { public_state: PublicState },
84}
85
86impl CancelExecutionResultPartial {
87    pub fn complete(self, execution_id: ExecutionId) -> CancelExecutionResult {
88        match self {
89            Self::Cancelled { public_state } => CancelExecutionResult::Cancelled {
90                execution_id,
91                public_state,
92            },
93        }
94    }
95}
96
97/// Partial form of [`DelayExecutionResult`] (omits `execution_id`).
98#[derive(Clone, Debug, PartialEq, Eq)]
99pub enum DelayExecutionResultPartial {
100    Delayed { public_state: PublicState },
101}
102
103impl DelayExecutionResultPartial {
104    pub fn complete(self, execution_id: ExecutionId) -> DelayExecutionResult {
105        match self {
106            Self::Delayed { public_state } => DelayExecutionResult::Delayed {
107                execution_id,
108                public_state,
109            },
110        }
111    }
112}
113
114/// Partial form of [`MoveToWaitingChildrenResult`] (omits `execution_id`).
115#[derive(Clone, Debug, PartialEq, Eq)]
116pub enum MoveToWaitingChildrenResultPartial {
117    Moved { public_state: PublicState },
118}
119
120impl MoveToWaitingChildrenResultPartial {
121    pub fn complete(self, execution_id: ExecutionId) -> MoveToWaitingChildrenResult {
122        match self {
123            Self::Moved { public_state } => MoveToWaitingChildrenResult::Moved {
124                execution_id,
125                public_state,
126            },
127        }
128    }
129}
130
131/// Partial form of [`ExpireExecutionResult`].
132///
133/// Multi-variant: `Expired` carries `execution_id` (lifted); `AlreadyTerminal`
134/// does not. `complete` attaches the id only on `Expired`.
135#[derive(Clone, Debug, PartialEq, Eq)]
136pub enum ExpireExecutionResultPartial {
137    Expired,
138    AlreadyTerminal,
139}
140
141impl ExpireExecutionResultPartial {
142    pub fn complete(self, execution_id: ExecutionId) -> ExpireExecutionResult {
143        match self {
144            Self::Expired => ExpireExecutionResult::Expired { execution_id },
145            Self::AlreadyTerminal => ExpireExecutionResult::AlreadyTerminal,
146        }
147    }
148}
149
150/// Bundles ExecKeyContext + IndexKeys + lane-scoped index resolution.
151/// Passed as the key context to all execution ff_function! invocations.
152pub struct ExecOpKeys<'a> {
153    pub ctx: &'a ExecKeyContext,
154    pub idx: &'a IndexKeys,
155    pub lane_id: &'a LaneId,
156    pub worker_instance_id: &'a WorkerInstanceId,
157}
158
159// ─── ff_create_execution ───────────────────────────────────────────────
160//
161// Lua KEYS (8): exec_core, payload, policy, tags,
162//               eligible_or_delayed_zset, idem_key,
163//               execution_deadline_zset, all_executions_set
164// Lua ARGV (13): execution_id, namespace, lane_id, execution_kind,
165//                priority, creator_identity, policy_json,
166//                input_payload, delay_until, dedup_ttl_ms,
167//                tags_json, execution_deadline_at, partition_id
168
169ff_function! {
170    pub ff_create_execution(args: CreateExecutionArgs) -> CreateExecutionResult {
171        keys(k: &ExecOpKeys<'_>) {
172            k.ctx.core(),
173            k.ctx.payload(),
174            k.ctx.policy(),
175            k.ctx.tags(),
176            // KEYS[5] = scheduling_zset: eligible OR delayed depending on delay_until.
177            // Lua ZADDs to this single key for both paths.
178            if args.delay_until.is_some() {
179                k.idx.lane_delayed(k.lane_id)
180            } else {
181                k.idx.lane_eligible(k.lane_id)
182            },
183            args.idempotency_key.as_ref().filter(|ik| !ik.is_empty()).map(|ik| {
184                ff_core::keys::idempotency_key(k.ctx.hash_tag(), args.namespace.as_str(), ik)
185            }).unwrap_or_else(|| k.ctx.noop()),
186            k.idx.execution_deadline(),
187            k.idx.all_executions(),
188        }
189        argv {
190            args.execution_id.to_string(),
191            args.namespace.to_string(),
192            args.lane_id.to_string(),
193            args.execution_kind.clone(),
194            args.priority.to_string(),
195            args.creator_identity.clone(),
196            args.policy.as_ref().map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".into())).unwrap_or_else(|| "{}".into()),
197            String::from_utf8_lossy(&args.input_payload).into_owned(),
198            args.delay_until.map(|t| t.to_string()).unwrap_or_default(),
199            args.idempotency_key.as_ref().map(|_| "86400000".to_string()).unwrap_or_default(),
200            serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".into()),
201            args.execution_deadline_at.map(|t| t.to_string()).unwrap_or_default(),
202            args.partition_id.to_string(),
203        }
204    }
205}
206
207impl FromFcallResult for CreateExecutionResult {
208    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
209        let r = FcallResult::parse(raw)?;
210        // DUPLICATE status: {1, "DUPLICATE", execution_id}
211        if r.status == "DUPLICATE" {
212            let eid_str = r.field_str(0);
213            let eid = ExecutionId::parse(&eid_str)
214                .map_err(|e| ScriptError::Parse {
215                    fcall: "ff_create_execution".into(),
216                    execution_id: None,
217                    message: format!("bad execution_id: {e}"),
218                })?;
219            return Ok(CreateExecutionResult::Duplicate { execution_id: eid });
220        }
221        let r = r.into_success()?;
222        let eid_str = r.field_str(0);
223        let ps_str = r.field_str(1);
224        let eid = ExecutionId::parse(&eid_str)
225            .map_err(|e| ScriptError::Parse {
226                fcall: "ff_create_execution".into(),
227                execution_id: None,
228                message: format!("bad execution_id: {e}"),
229            })?;
230        let public_state = parse_public_state(&ps_str)?;
231        Ok(CreateExecutionResult::Created {
232            execution_id: eid,
233            public_state,
234        })
235    }
236}
237
238// ─── ff_claim_execution ────────────────────────────────────────────────
239//
240// Lua KEYS (14): exec_core, claim_grant, eligible_zset, lease_expiry_zset,
241//                worker_leases, attempt_hash, attempt_usage, attempt_policy,
242//                attempts_zset, lease_current, lease_history, active_index,
243//                attempt_timeout_zset, execution_deadline_zset
244// Lua ARGV (12): execution_id, worker_id, worker_instance_id, lane,
245//                capability_snapshot_hash, lease_id, lease_ttl_ms,
246//                renew_before_ms, attempt_id, attempt_policy_json,
247//                attempt_timeout_ms, execution_deadline_at
248
249ff_function! {
250    pub ff_claim_execution(args: ClaimExecutionArgs) -> ClaimExecutionResultPartial {
251        keys(k: &ExecOpKeys<'_>) {
252            k.ctx.core(),
253            k.ctx.claim_grant(),
254            k.idx.lane_eligible(k.lane_id),
255            k.idx.lease_expiry(),
256            k.idx.worker_leases(k.worker_instance_id),
257            k.ctx.attempt_hash(args.expected_attempt_index),
258            k.ctx.attempt_usage(args.expected_attempt_index),
259            k.ctx.attempt_policy(args.expected_attempt_index),
260            k.ctx.attempts(),
261            k.ctx.lease_current(),
262            k.ctx.lease_history(),
263            k.idx.lane_active(k.lane_id),
264            k.idx.attempt_timeout(),
265            k.idx.execution_deadline(),
266        }
267        argv {
268            args.execution_id.to_string(),
269            args.worker_id.to_string(),
270            args.worker_instance_id.to_string(),
271            args.lane_id.to_string(),
272            String::new(),
273            args.lease_id.to_string(),
274            args.lease_ttl_ms.to_string(),
275            (args.lease_ttl_ms * 2 / 3).to_string(),
276            args.attempt_id.to_string(),
277            args.attempt_policy_json.clone(),
278            args.attempt_timeout_ms.map(|t| t.to_string()).unwrap_or_default(),
279            args.execution_deadline_at.map(|t| t.to_string()).unwrap_or_default(),
280        }
281    }
282}
283
284impl FromFcallResult for ClaimExecutionResultPartial {
285    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
286        let r = FcallResult::parse(raw)?.into_success()?;
287        // ok(lease_id, epoch, expires_at, attempt_id, attempt_index, attempt_type)
288        let lease_id = LeaseId::parse(&r.field_str(0))
289            .map_err(|e| ScriptError::Parse {
290                fcall: "ff_claim_execution_result_partial".into(),
291                execution_id: None,
292                message: format!("bad lease_id: {e}"),
293            })?;
294        let epoch = r
295            .field_str(1)
296            .parse::<u64>()
297            .map_err(|e| ScriptError::Parse {
298                fcall: "ff_claim_execution_result_partial".into(),
299                execution_id: None,
300                message: format!("bad epoch: {e}"),
301            })?;
302        let expires_at = r
303            .field_str(2)
304            .parse::<i64>()
305            .map_err(|e| ScriptError::Parse {
306                fcall: "ff_claim_execution_result_partial".into(),
307                execution_id: None,
308                message: format!("bad expires_at: {e}"),
309            })?;
310        let attempt_id = AttemptId::parse(&r.field_str(3))
311            .map_err(|e| ScriptError::Parse {
312                fcall: "ff_claim_execution_result_partial".into(),
313                execution_id: None,
314                message: format!("bad attempt_id: {e}"),
315            })?;
316        let attempt_index = r
317            .field_str(4)
318            .parse::<u32>()
319            .map_err(|e| ScriptError::Parse {
320                fcall: "ff_claim_execution_result_partial".into(),
321                execution_id: None,
322                message: format!("bad attempt_index: {e}"),
323            })?;
324        let attempt_type = parse_attempt_type(&r.field_str(5))?;
325
326        Ok(Self::Claimed(ClaimedExecutionPartial {
327            lease_id,
328            lease_epoch: LeaseEpoch::new(epoch),
329            attempt_index: AttemptIndex::new(attempt_index),
330            attempt_id,
331            attempt_type,
332            lease_expires_at: TimestampMs::from_millis(expires_at),
333        }))
334    }
335}
336
337// ─── ff_complete_execution ─────────────────────────────────────────────
338//
339// Lua KEYS (12): exec_core, attempt_hash, lease_expiry_zset, worker_leases,
340//                terminal_zset, lease_current, lease_history, active_index,
341//                stream_meta, result_key, attempt_timeout_zset,
342//                execution_deadline_zset
343// Lua ARGV (6): execution_id, lease_id, lease_epoch, attempt_id,
344//               result_payload, source
345//
346// RFC #58.5: `fence` is `Option<LeaseFence>`. `None` emits empty strings
347// for the triple; the Lua then requires `source == "operator_override"`
348// or returns `fence_required`.
349
350ff_function! {
351    pub ff_complete_execution(args: CompleteExecutionArgs) -> CompleteExecutionResultPartial {
352        keys(k: &ExecOpKeys<'_>) {
353            k.ctx.core(),
354            k.ctx.attempt_hash(args.attempt_index),
355            k.idx.lease_expiry(),
356            k.idx.worker_leases(k.worker_instance_id),
357            k.idx.lane_terminal(k.lane_id),
358            k.ctx.lease_current(),
359            k.ctx.lease_history(),
360            k.idx.lane_active(k.lane_id),
361            k.ctx.stream_meta(args.attempt_index),
362            k.ctx.result(),
363            k.idx.attempt_timeout(),
364            k.idx.execution_deadline(),
365        }
366        argv {
367            args.execution_id.to_string(),
368            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),
369            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(),
370            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),
371            args.result_payload.as_ref()
372                .map(|p| String::from_utf8_lossy(p).into_owned())
373                .unwrap_or_default(),
374            args.source.to_string(),
375        }
376    }
377}
378
379impl FromFcallResult for CompleteExecutionResultPartial {
380    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
381        let _r = FcallResult::parse(raw)?.into_success()?;
382        // ok("completed")
383        Ok(Self::Completed {
384            public_state: PublicState::Completed,
385        })
386    }
387}
388
389// ─── ff_cancel_execution ───────────────────────────────────────────────
390//
391// Lua KEYS (21): exec_core, attempt_hash, stream_meta, lease_current,
392//                lease_history, lease_expiry_zset, worker_leases,
393//                suspension_current, waitpoint_hash, wp_condition,
394//                suspension_timeout_zset, terminal_zset,
395//                attempt_timeout_zset, execution_deadline_zset,
396//                eligible_zset, delayed_zset, blocked_deps_zset,
397//                blocked_budget_zset, blocked_quota_zset,
398//                blocked_route_zset, blocked_operator_zset
399// Lua ARGV (5): execution_id, reason, source, lease_id, lease_epoch
400
401// Cancel needs suspension/waitpoint keys that depend on runtime state.
402// KEYS[9] and [10] require the waitpoint_id from the suspension record,
403// which isn't known until we read suspension:current inside the Lua.
404// Phase 1 workaround: pass the suspension_current key as a same-slot
405// placeholder. The Lua uses EXISTS checks and will no-op when the key
406// type doesn't match HSET expectations (suspension_current is a HASH,
407// not a waitpoint hash, but EXISTS returns 0 if the suspension was
408// already closed/deleted). Phase 3 must pre-read the waitpoint_id and
409// pass the real keys.
410ff_function! {
411    pub ff_cancel_execution(args: CancelExecutionArgs) -> CancelExecutionResultPartial {
412        keys(k: &ExecOpKeys<'_>) {
413            k.ctx.core(),                                       // 1
414            k.ctx.attempt_hash(AttemptIndex::new(0)),           // 2 placeholder
415            k.ctx.stream_meta(AttemptIndex::new(0)),            // 3 placeholder
416            k.ctx.lease_current(),                              // 4
417            k.ctx.lease_history(),                               // 5
418            k.idx.lease_expiry(),                               // 6
419            k.idx.worker_leases(k.worker_instance_id),          // 7
420            k.ctx.suspension_current(),                         // 8
421            k.ctx.suspension_current(),                         // 9 placeholder — real: wp hash (Phase 3)
422            k.ctx.suspension_current(),                         // 10 placeholder — real: wp condition (Phase 3)
423            k.idx.suspension_timeout(),                         // 11
424            k.idx.lane_terminal(k.lane_id),                     // 12
425            k.idx.attempt_timeout(),                            // 13
426            k.idx.execution_deadline(),                         // 14
427            k.idx.lane_eligible(k.lane_id),                     // 15
428            k.idx.lane_delayed(k.lane_id),                      // 16
429            k.idx.lane_blocked_dependencies(k.lane_id),         // 17
430            k.idx.lane_blocked_budget(k.lane_id),               // 18
431            k.idx.lane_blocked_quota(k.lane_id),                // 19
432            k.idx.lane_blocked_route(k.lane_id),                // 20
433            k.idx.lane_blocked_operator(k.lane_id),             // 21
434        }
435        argv {
436            args.execution_id.to_string(),
437            args.reason.clone(),
438            args.source.to_string(),
439            args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
440            args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
441        }
442    }
443}
444
445impl FromFcallResult for CancelExecutionResultPartial {
446    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
447        let _r = FcallResult::parse(raw)?.into_success()?;
448        // ok("cancelled", cancelled_from_state)
449        Ok(Self::Cancelled {
450            public_state: PublicState::Cancelled,
451        })
452    }
453}
454
455// ─── ff_delay_execution ────────────────────────────────────────────────
456//
457// Lua KEYS (9): exec_core, attempt_hash, lease_current, lease_history,
458//               lease_expiry_zset, worker_leases, active_index,
459//               delayed_zset, attempt_timeout_zset
460// Lua ARGV (6): execution_id, lease_id, lease_epoch, attempt_id,
461//               delay_until, source
462//
463// RFC #58.5: `fence` is `Option<LeaseFence>`. See ff_complete_execution.
464
465ff_function! {
466    pub ff_delay_execution(args: DelayExecutionArgs) -> DelayExecutionResultPartial {
467        keys(k: &ExecOpKeys<'_>) {
468            k.ctx.core(),
469            k.ctx.attempt_hash(args.attempt_index),
470            k.ctx.lease_current(),
471            k.ctx.lease_history(),
472            k.idx.lease_expiry(),
473            k.idx.worker_leases(k.worker_instance_id),
474            k.idx.lane_active(k.lane_id),
475            k.idx.lane_delayed(k.lane_id),
476            k.idx.attempt_timeout(),
477        }
478        argv {
479            args.execution_id.to_string(),
480            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),
481            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(),
482            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),
483            args.delay_until.to_string(),
484            args.source.to_string(),
485        }
486    }
487}
488
489impl FromFcallResult for DelayExecutionResultPartial {
490    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
491        let _r = FcallResult::parse(raw)?.into_success()?;
492        // ok(delay_until)
493        Ok(Self::Delayed {
494            public_state: PublicState::Delayed,
495        })
496    }
497}
498
499// ─── ff_move_to_waiting_children ───────────────────────────────────────
500//
501// Lua KEYS (9): exec_core, attempt_hash, lease_current, lease_history,
502//               lease_expiry_zset, worker_leases, active_index,
503//               blocked_deps_zset, attempt_timeout_zset
504// Lua ARGV (5): execution_id, lease_id, lease_epoch, attempt_id, source
505//
506// RFC #58.5: `fence` is `Option<LeaseFence>`. See ff_complete_execution.
507
508ff_function! {
509    pub ff_move_to_waiting_children(args: MoveToWaitingChildrenArgs) -> MoveToWaitingChildrenResultPartial {
510        keys(k: &ExecOpKeys<'_>) {
511            k.ctx.core(),
512            k.ctx.attempt_hash(args.attempt_index),
513            k.ctx.lease_current(),
514            k.ctx.lease_history(),
515            k.idx.lease_expiry(),
516            k.idx.worker_leases(k.worker_instance_id),
517            k.idx.lane_active(k.lane_id),
518            k.idx.lane_blocked_dependencies(k.lane_id),
519            k.idx.attempt_timeout(),
520        }
521        argv {
522            args.execution_id.to_string(),
523            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),
524            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(),
525            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),
526            args.source.to_string(),
527        }
528    }
529}
530
531impl FromFcallResult for MoveToWaitingChildrenResultPartial {
532    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
533        let _r = FcallResult::parse(raw)?.into_success()?;
534        // ok()
535        Ok(Self::Moved {
536            public_state: PublicState::WaitingChildren,
537        })
538    }
539}
540
541// ─── ff_fail_execution ─────────────────────────────────────────────────
542//
543// Lua KEYS (12): exec_core, attempt_hash, lease_expiry_zset, worker_leases,
544//                terminal_zset, delayed_zset, lease_current, lease_history,
545//                active_index, stream_meta, attempt_timeout_zset,
546//                execution_deadline_zset
547// Lua ARGV (8): execution_id, lease_id, lease_epoch, attempt_id,
548//               failure_reason, failure_category, retry_policy_json, source
549//
550// RFC #58.5: `fence` is `Option<LeaseFence>`. See ff_complete_execution.
551
552ff_function! {
553    pub ff_fail_execution(args: FailExecutionArgs) -> FailExecutionResult {
554        keys(k: &ExecOpKeys<'_>) {
555            k.ctx.core(),
556            k.ctx.attempt_hash(args.attempt_index),
557            k.idx.lease_expiry(),
558            k.idx.worker_leases(k.worker_instance_id),
559            k.idx.lane_terminal(k.lane_id),
560            k.idx.lane_delayed(k.lane_id),
561            k.ctx.lease_current(),
562            k.ctx.lease_history(),
563            k.idx.lane_active(k.lane_id),
564            k.ctx.stream_meta(args.attempt_index),
565            k.idx.attempt_timeout(),
566            k.idx.execution_deadline(),
567        }
568        argv {
569            args.execution_id.to_string(),
570            args.fence.as_ref().map(|f| f.lease_id.to_string()).unwrap_or_default(),
571            args.fence.as_ref().map(|f| f.lease_epoch.to_string()).unwrap_or_default(),
572            args.fence.as_ref().map(|f| f.attempt_id.to_string()).unwrap_or_default(),
573            args.failure_reason.clone(),
574            args.failure_category.clone(),
575            args.retry_policy_json.clone(),
576            args.source.to_string(),
577        }
578    }
579}
580
581impl FromFcallResult for FailExecutionResult {
582    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
583        let r = FcallResult::parse(raw)?.into_success()?;
584        // ok("retry_scheduled", delay_until) or ok("terminal_failed")
585        let sub_status = r.field_str(0);
586        match sub_status.as_str() {
587            "retry_scheduled" => {
588                let delay_str = r.field_str(1);
589                let delay_ms: i64 = delay_str
590                    .parse()
591                    .map_err(|e| ScriptError::Parse {
592                        fcall: "ff_fail_execution".into(),
593                        execution_id: None,
594                        message: format!("bad delay_until: {e}"),
595                    })?;
596                Ok(FailExecutionResult::RetryScheduled {
597                    delay_until: TimestampMs::from_millis(delay_ms),
598                    next_attempt_index: AttemptIndex::new(0), // computed by claim_execution
599                })
600            }
601            "terminal_failed" => Ok(FailExecutionResult::TerminalFailed),
602            _ => Err(ScriptError::Parse {
603                fcall: "ff_fail_execution".into(),
604                execution_id: None,
605                message: format!(
606                "unexpected fail sub-status: {sub_status}"
607            ),
608            }),
609        }
610    }
611}
612
613// ─── ff_expire_execution ───────────────────────────────────────────────
614//
615// Lua KEYS (14): exec_core, attempt_hash, stream_meta, lease_current,
616//                lease_history, lease_expiry_zset, worker_leases,
617//                active_index, terminal_zset, attempt_timeout_zset,
618//                execution_deadline_zset, suspended_zset,
619//                suspension_timeout_zset, suspension_current
620// Lua ARGV (2): execution_id, expire_reason
621
622ff_function! {
623    pub ff_expire_execution(args: ExpireExecutionArgs) -> ExpireExecutionResultPartial {
624        keys(k: &ExecOpKeys<'_>) {
625            k.ctx.core(),
626            k.ctx.attempt_hash(AttemptIndex::new(0)),   // placeholder
627            k.ctx.stream_meta(AttemptIndex::new(0)),     // placeholder
628            k.ctx.lease_current(),
629            k.ctx.lease_history(),
630            k.idx.lease_expiry(),
631            k.idx.worker_leases(k.worker_instance_id),
632            k.idx.lane_active(k.lane_id),
633            k.idx.lane_terminal(k.lane_id),
634            k.idx.attempt_timeout(),
635            k.idx.execution_deadline(),
636            k.idx.lane_suspended(k.lane_id),
637            k.idx.suspension_timeout(),
638            k.ctx.suspension_current(),
639        }
640        argv {
641            args.execution_id.to_string(),
642            args.expire_reason.clone(),
643        }
644    }
645}
646
647impl FromFcallResult for ExpireExecutionResultPartial {
648    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
649        let r = FcallResult::parse(raw)?.into_success()?;
650        // ok("expired", from_phase) or ok("already_terminal") or ok("not_found_cleaned")
651        let sub = r.field_str(0);
652        match sub.as_str() {
653            "already_terminal" | "not_found_cleaned" => Ok(Self::AlreadyTerminal),
654            "expired" => Ok(Self::Expired),
655            _ => Ok(Self::Expired),
656        }
657    }
658}
659
660// ─── ff_set_execution_tags (issue #58.4) ────────────────────────────────
661//
662// Variadic-ARGV FCALL: k1, v1, k2, v2, ... Hand-rolled instead of the
663// `ff_function!` macro because the macro assumes a fixed ARGV vector;
664// tags carry a caller-supplied map.
665//
666// KEYS (2): exec_core, tags_key
667// ARGV (>=2, even): k1, v1, k2, v2, ...
668
669/// Call `ff_set_execution_tags`: write caller-supplied tag fields to
670/// the execution's separate tags key. Returns the number of pairs
671/// applied.
672///
673/// Tag keys must match `^[a-z][a-z0-9_]*\.` (the reserved caller
674/// namespace, e.g. `cairn.task_id`); a violating key makes the FCALL
675/// return `invalid_tag_key` carrying the offending key. Callers should
676/// use [`validate_tag_key`] for client-side fast-fail before reaching
677/// the FCALL. Empty or odd-length input returns `invalid_input`.
678pub async fn ff_set_execution_tags(
679    conn: &ferriskey::Client,
680    ctx: &ExecKeyContext,
681    args: &SetExecutionTagsArgs,
682) -> Result<SetExecutionTagsResult, ScriptError> {
683    let keys = [ctx.core(), ctx.tags()];
684    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
685
686    // Flatten the sorted map into alternating key/value ARGV. BTreeMap
687    // iteration is sorted, so ARGV is deterministic for identical
688    // inputs.
689    let mut argv: Vec<String> = Vec::with_capacity(args.tags.len() * 2);
690    for (k, v) in &args.tags {
691        argv.push(k.clone());
692        argv.push(v.clone());
693    }
694    let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
695
696    let raw = conn
697        .fcall::<ferriskey::Value>("ff_set_execution_tags", &key_refs, &argv_refs)
698        .await
699        .map_err(ScriptError::Valkey)?;
700    SetExecutionTagsResult::from_fcall_result(&raw)
701}
702
703impl FromFcallResult for SetExecutionTagsResult {
704    fn from_fcall_result(raw: &ferriskey::Value) -> Result<Self, ScriptError> {
705        let r = FcallResult::parse(raw)?.into_success()?;
706        let count: u32 = r
707            .field_str(0)
708            .parse()
709            .map_err(|e| ScriptError::Parse {
710                fcall: "ff_set_execution_tags".into(),
711                execution_id: None,
712                message: format!("bad tag count: {e}"),
713            })?;
714        Ok(SetExecutionTagsResult::Ok { count })
715    }
716}
717
718/// Client-side fast-fail validator for tag keys. Matches the Lua
719/// pattern `^[a-z][a-z0-9_]*%.[^.]` — requires:
720///
721///   * non-empty;
722///   * first char is lowercase ASCII letter;
723///   * subsequent chars up to the first `.` are lowercase alnum or `_`;
724///   * the first `.` is followed by at least one non-dot character
725///     (so `cairn.` and `cairn..x` are rejected — the `<field>` part
726///     of `<caller>.<field>` must be non-empty).
727///
728/// Characters in the suffix after the mandatory non-dot char are not
729/// further constrained: `app.sub.field` is legal, matching the Lua
730/// pattern. Returns `Err(ScriptError::InvalidTagKey(key))` on
731/// rejection so callers match the same variant they'd see from the
732/// server-side path.
733pub fn validate_tag_key(key: &str) -> Result<(), ScriptError> {
734    let mut chars = key.chars();
735    let first = match chars.next() {
736        Some(c) => c,
737        None => return Err(ScriptError::InvalidTagKey(key.to_owned())),
738    };
739    if !first.is_ascii_lowercase() {
740        return Err(ScriptError::InvalidTagKey(key.to_owned()));
741    }
742    let mut saw_dot = false;
743    for c in chars.by_ref() {
744        if c == '.' {
745            saw_dot = true;
746            break;
747        }
748        if !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') {
749            return Err(ScriptError::InvalidTagKey(key.to_owned()));
750        }
751    }
752    if !saw_dot {
753        return Err(ScriptError::InvalidTagKey(key.to_owned()));
754    }
755    // Require at least one non-dot character after the first dot.
756    match chars.next() {
757        Some(c) if c != '.' => Ok(()),
758        _ => Err(ScriptError::InvalidTagKey(key.to_owned())),
759    }
760}
761
762// ─── Helpers ───────────────────────────────────────────────────────────
763
764fn parse_public_state(s: &str) -> Result<PublicState, ScriptError> {
765    match s {
766        "waiting" => Ok(PublicState::Waiting),
767        "delayed" => Ok(PublicState::Delayed),
768        "rate_limited" => Ok(PublicState::RateLimited),
769        "waiting_children" => Ok(PublicState::WaitingChildren),
770        "active" => Ok(PublicState::Active),
771        "suspended" => Ok(PublicState::Suspended),
772        "completed" => Ok(PublicState::Completed),
773        "failed" => Ok(PublicState::Failed),
774        "cancelled" => Ok(PublicState::Cancelled),
775        "expired" => Ok(PublicState::Expired),
776        "skipped" => Ok(PublicState::Skipped),
777        _ => Err(ScriptError::Parse {
778            fcall: "parse_public_state".into(),
779            execution_id: None,
780            message: format!("unknown public_state: {s}"),
781        }),
782    }
783}
784
785fn parse_attempt_type(s: &str) -> Result<AttemptType, ScriptError> {
786    match s {
787        "initial" => Ok(AttemptType::Initial),
788        "retry" => Ok(AttemptType::Retry),
789        "reclaim" => Ok(AttemptType::Reclaim),
790        "replay" => Ok(AttemptType::Replay),
791        "fallback" => Ok(AttemptType::Fallback),
792        _ => Err(ScriptError::Parse {
793            fcall: "parse_attempt_type".into(),
794            execution_id: None,
795            message: format!("unknown attempt_type: {s}"),
796        }),
797    }
798}
799
800// ─── Partial-type tests (RFC-011 §2.4 acceptance) ──────────────────────
801#[cfg(test)]
802mod partial_tests {
803    use super::*;
804    use ff_core::partition::PartitionConfig;
805
806    fn test_eid() -> ExecutionId {
807        ExecutionId::for_flow(&FlowId::new(), &PartitionConfig::default())
808    }
809
810    #[test]
811    fn claim_partial_complete_attaches_execution_id() {
812        let partial = ClaimExecutionResultPartial::Claimed(ClaimedExecutionPartial {
813            lease_id: LeaseId::new(),
814            lease_epoch: LeaseEpoch::new(1),
815            attempt_index: AttemptIndex::new(0),
816            attempt_id: AttemptId::new(),
817            attempt_type: AttemptType::Initial,
818            lease_expires_at: TimestampMs::from_millis(1000),
819        });
820        let eid = test_eid();
821        let full = partial.complete(eid.clone());
822        match full {
823            ClaimExecutionResult::Claimed(c) => assert_eq!(c.execution_id, eid),
824        }
825    }
826
827    #[test]
828    fn complete_partial_complete_attaches_execution_id() {
829        let partial = CompleteExecutionResultPartial::Completed {
830            public_state: PublicState::Completed,
831        };
832        let eid = test_eid();
833        let full = partial.complete(eid.clone());
834        match full {
835            CompleteExecutionResult::Completed { execution_id, .. } => {
836                assert_eq!(execution_id, eid)
837            }
838        }
839    }
840
841    #[test]
842    fn cancel_partial_complete_attaches_execution_id() {
843        let partial = CancelExecutionResultPartial::Cancelled {
844            public_state: PublicState::Cancelled,
845        };
846        let eid = test_eid();
847        let full = partial.complete(eid.clone());
848        match full {
849            CancelExecutionResult::Cancelled { execution_id, .. } => assert_eq!(execution_id, eid),
850        }
851    }
852
853    #[test]
854    fn delay_partial_complete_attaches_execution_id() {
855        let partial = DelayExecutionResultPartial::Delayed {
856            public_state: PublicState::Delayed,
857        };
858        let eid = test_eid();
859        let full = partial.complete(eid.clone());
860        match full {
861            DelayExecutionResult::Delayed { execution_id, .. } => assert_eq!(execution_id, eid),
862        }
863    }
864
865    #[test]
866    fn move_to_waiting_children_partial_complete_attaches_execution_id() {
867        let partial = MoveToWaitingChildrenResultPartial::Moved {
868            public_state: PublicState::WaitingChildren,
869        };
870        let eid = test_eid();
871        let full = partial.complete(eid.clone());
872        match full {
873            MoveToWaitingChildrenResult::Moved { execution_id, .. } => {
874                assert_eq!(execution_id, eid)
875            }
876        }
877    }
878
879    #[test]
880    fn expire_partial_expired_variant_attaches_execution_id() {
881        let partial = ExpireExecutionResultPartial::Expired;
882        let eid = test_eid();
883        let full = partial.complete(eid.clone());
884        match full {
885            ExpireExecutionResult::Expired { execution_id } => assert_eq!(execution_id, eid),
886            _ => panic!("expected Expired variant"),
887        }
888    }
889
890    #[test]
891    fn expire_partial_already_terminal_variant_ignores_execution_id() {
892        // Multi-variant exhaustiveness test: AlreadyTerminal has no
893        // execution_id field, so complete() passes it through without
894        // attaching. Verifies the variant-mirror pattern.
895        let partial = ExpireExecutionResultPartial::AlreadyTerminal;
896        let eid = test_eid();
897        let full = partial.complete(eid);
898        assert!(matches!(full, ExpireExecutionResult::AlreadyTerminal));
899    }
900}
901
902// ─── Tag-key validation tests (issue #58.4) ────────────────────────────
903#[cfg(test)]
904mod tag_key_tests {
905    use super::*;
906
907    #[test]
908    fn validate_accepts_caller_namespaced_key() {
909        assert!(validate_tag_key("cairn.task_id").is_ok());
910        assert!(validate_tag_key("my_app.run_123").is_ok());
911        assert!(validate_tag_key("app2.x.y").is_ok());
912    }
913
914    #[test]
915    fn validate_rejects_key_without_dot() {
916        let err = validate_tag_key("no_dot_here").unwrap_err();
917        assert!(matches!(err, ScriptError::InvalidTagKey(k) if k == "no_dot_here"));
918    }
919
920    #[test]
921    fn validate_rejects_empty_key() {
922        let err = validate_tag_key("").unwrap_err();
923        assert!(matches!(err, ScriptError::InvalidTagKey(k) if k.is_empty()));
924    }
925
926    #[test]
927    fn validate_rejects_uppercase_first_char() {
928        let err = validate_tag_key("Cairn.task_id").unwrap_err();
929        assert!(matches!(err, ScriptError::InvalidTagKey(_)));
930    }
931
932    #[test]
933    fn validate_rejects_leading_digit() {
934        let err = validate_tag_key("1cairn.task_id").unwrap_err();
935        assert!(matches!(err, ScriptError::InvalidTagKey(_)));
936    }
937
938    #[test]
939    fn validate_rejects_dash_in_prefix() {
940        let err = validate_tag_key("my-app.run").unwrap_err();
941        assert!(matches!(err, ScriptError::InvalidTagKey(_)));
942    }
943
944    #[test]
945    fn validate_accepts_single_char_prefix() {
946        assert!(validate_tag_key("a.x").is_ok());
947    }
948
949    #[test]
950    fn validate_rejects_trailing_dot() {
951        // `cairn.` has the prefix + dot but no field segment — the
952        // `<field>` part of `<caller>.<field>` is required.
953        let err = validate_tag_key("cairn.").unwrap_err();
954        assert!(matches!(err, ScriptError::InvalidTagKey(_)));
955    }
956
957    #[test]
958    fn validate_rejects_double_dot_after_prefix() {
959        // `cairn..x` has an empty field segment immediately after the
960        // namespace dot.
961        let err = validate_tag_key("cairn..x").unwrap_err();
962        assert!(matches!(err, ScriptError::InvalidTagKey(_)));
963    }
964
965    #[test]
966    fn validate_accepts_dots_in_suffix() {
967        // After the mandatory non-dot char post-namespace, further
968        // dots are fine — `app.sub.field` is a legal nested tag key.
969        assert!(validate_tag_key("app.sub.field").is_ok());
970    }
971
972    #[test]
973    fn from_code_invalid_tag_key_roundtrip() {
974        let err = ScriptError::from_code_with_detail("invalid_tag_key", "BadKey").unwrap();
975        match err {
976            ScriptError::InvalidTagKey(k) => assert_eq!(k, "BadKey"),
977            other => panic!("expected InvalidTagKey, got {other:?}"),
978        }
979    }
980}