
// ff_sdk/task.rs

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, Value};
use ff_core::contracts::ReportUsageResult;
use ff_core::keys::{usage_dedup_key, BudgetKeyContext, ExecKeyContext, IndexKeys};
use ff_core::partition::{budget_partition, execution_partition, PartitionConfig};
use ff_core::types::*;
use ff_script::error::ScriptError;
use tokio::sync::{Notify, OwnedSemaphorePermit};
use tokio::task::JoinHandle;

use crate::SdkError;

// ── Phase 3: Suspend/Signal types ──

/// Timeout behavior when a suspension deadline is reached.
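///
/// A round-trip sketch (illustrative; `"auto_resume"` is accepted as a
/// parsing alias for the long form):
///
/// ```ignore
/// use std::str::FromStr;
///
/// let b = TimeoutBehavior::from_str("auto_resume").unwrap();
/// assert_eq!(b, TimeoutBehavior::AutoResume);
/// // Serialization always uses the long form.
/// assert_eq!(b.as_str(), "auto_resume_with_timeout_signal");
/// ```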
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutBehavior {
    Fail,
    Cancel,
    Expire,
    AutoResume,
    Escalate,
}

impl TimeoutBehavior {
    pub fn as_str(&self) -> &str {
        match self {
            Self::Fail => "fail",
            Self::Cancel => "cancel",
            Self::Expire => "expire",
            Self::AutoResume => "auto_resume_with_timeout_signal",
            Self::Escalate => "escalate",
        }
    }
}

impl std::fmt::Display for TimeoutBehavior {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl std::str::FromStr for TimeoutBehavior {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "fail" => Ok(Self::Fail),
            "cancel" => Ok(Self::Cancel),
            "expire" => Ok(Self::Expire),
            "auto_resume_with_timeout_signal" | "auto_resume" => Ok(Self::AutoResume),
            "escalate" => Ok(Self::Escalate),
            other => Err(format!("unknown timeout behavior: {other}")),
        }
    }
}

/// A condition matcher for the resume condition.
#[derive(Clone, Debug)]
pub struct ConditionMatcher {
    /// Signal name to match (empty = wildcard).
    pub signal_name: String,
}

/// Outcome of a `suspend()` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SuspendOutcome {
    /// Execution is now suspended, waitpoint active.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// HMAC token that signal deliverers must supply to this waitpoint.
        waitpoint_token: WaitpointToken,
    },
    /// Buffered signals already satisfied the condition — suspension skipped.
    /// The caller still holds the lease.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        waitpoint_token: WaitpointToken,
    },
}

/// A signal to deliver to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    pub signal_name: String,
    pub signal_category: String,
    pub payload: Option<Vec<u8>>,
    pub source_type: String,
    pub source_identity: String,
    pub idempotency_key: Option<String>,
    /// HMAC token issued when the waitpoint was created. Required for
    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
    pub waitpoint_token: WaitpointToken,
}

/// Outcome of `deliver_signal()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal triggered resume — execution is now runnable.
    TriggeredResume { signal_id: SignalId },
    /// Duplicate signal (idempotency key matched).
    Duplicate { existing_signal_id: String },
}

impl SignalOutcome {
    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
    ///
    /// Consuming packages that call `ff_deliver_signal` directly can use this
    /// to interpret the Lua return value without depending on SDK internals.
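    ///
    /// A minimal sketch (assumes the caller issued the FCALL itself and
    /// holds the raw `Value`):
    ///
    /// ```ignore
    /// match SignalOutcome::from_fcall_value(&raw)? {
    ///     SignalOutcome::TriggeredResume { signal_id } => {
    ///         // The suspended execution is runnable again.
    ///     }
    ///     SignalOutcome::Accepted { .. } => { /* buffered; condition not yet met */ }
    ///     SignalOutcome::Duplicate { .. } => { /* idempotent replay — no-op */ }
    /// }
    /// ```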
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}

/// A signal that triggered a resume, readable by a worker after re-claim.
///
/// Returned by [`ClaimedTask::resume_signals`] when a suspended execution
/// is resumed because one or more matched signals satisfied its waitpoint's
/// resume condition. The worker can then inspect `signal_name` to branch
/// behavior (approve / reject / etc.) and use `payload` for richer decision
/// data instead of inferring intent from stream frames.
///
/// Returned only for signals whose matcher slot in the waitpoint's resume
/// condition is marked satisfied. Pre-buffered-but-unmatched signals are
/// not included.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResumeSignal {
    pub signal_id: SignalId,
    pub signal_name: String,
    pub signal_category: String,
    pub source_type: String,
    pub source_identity: String,
    pub correlation_id: String,
    /// Valkey-server `now_ms` timestamp at which `ff_deliver_signal`
    /// accepted this signal. `0` if the stored `accepted_at` field is
    /// missing or non-numeric (a Lua-side defect — not expected at
    /// runtime).
    pub accepted_at: TimestampMs,
    /// Raw payload bytes, if the signal was delivered with one. `None`
    /// for signals delivered without a payload. Note: the SDK's current
    /// signal-delivery path (`FlowFabricWorker::deliver_signal`) writes
    /// payloads as UTF-8 (lossy) with `payload_encoding="json"`; callers
    /// that invoke `ff_deliver_signal` directly via FCALL with non-UTF-8
    /// bytes will receive those bytes verbatim here.
    pub payload: Option<Vec<u8>>,
}

/// Outcome of `append_frame()`.
#[derive(Clone, Debug)]
pub struct AppendFrameOutcome {
    /// Valkey Stream entry ID assigned to this frame.
    pub stream_id: String,
    /// Total frame count in the stream after this append.
    pub frame_count: u64,
}

/// Outcome of a `fail()` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FailOutcome {
    /// Retry was scheduled — execution is in delayed backoff.
    RetryScheduled { delay_until: TimestampMs },
    /// No retries left — execution is terminal failed.
    TerminalFailed,
}

/// A claimed execution with an active lease. The worker processes this task
/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
///
/// The lease is automatically renewed in the background at `lease_ttl / 3`
/// intervals. Renewal stops when the task is consumed or dropped.
///
/// `complete`, `fail`, and `cancel` consume `self` — this prevents
/// double-complete bugs at the type level.
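///
/// A minimal lifecycle sketch (assumes a `worker: FlowFabricWorker` whose
/// `claim_next` yields `Option<ClaimedTask>`, and a hypothetical `process`
/// function — illustrative, not the exact API surface):
///
/// ```ignore
/// if let Some(task) = worker.claim_next().await? {
///     match process(task.input_payload()).await {
///         Ok(result) => {
///             task.complete(Some(result)).await?;
///         }
///         Err(e) => {
///             // Consuming `self` makes a double terminal op unrepresentable.
///             task.fail(&e.to_string(), "worker_error").await?;
///         }
///     }
/// }
/// ```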
pub struct ClaimedTask {
    /// Shared Valkey client.
    client: Client,
    /// Partition config for key construction.
    partition_config: PartitionConfig,
    /// Execution identity.
    execution_id: ExecutionId,
    /// Current attempt.
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    /// Lease identity.
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    /// Lease timing.
    lease_ttl_ms: u64,
    /// Lane used at claim time.
    lane_id: LaneId,
    /// Worker instance that holds this lease (for index cleanup keys).
    worker_instance_id: WorkerInstanceId,
    /// Execution data.
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    /// Background renewal task handle.
    renewal_handle: JoinHandle<()>,
    /// Signal to stop renewal (used before consuming self).
    renewal_stop: Arc<Notify>,
    /// Consecutive lease renewal failures. Shared with the background renewal
    /// task. Reset to 0 on each successful renewal. Workers should check
    /// `is_lease_healthy()` before committing expensive side effects.
    renewal_failures: Arc<AtomicU32>,
    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
    /// response is received, just before `self` is consumed into `Drop`.
    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
    /// suppress the false-positive "dropped without terminal operation"
    /// warning: after `notify_one`, the renewal task has not yet been
    /// polled by the runtime, so `is_finished()` is still `false` on the
    /// happy path when self is being consumed.
    ///
    /// Note: the flag is set for any terminal-op path that reaches
    /// `stop_renewal()`, which includes Lua-level script errors (the
    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
    /// the caller already receives the `Err` via the op's return value,
    /// so an additional `Drop` warning would be noise. The warning is
    /// reserved for genuine drop-without-terminal-op cases (panic, early
    /// return, transport failure before stop_renewal ran).
    terminal_op_called: AtomicBool,
    /// Concurrency permit from the worker's semaphore. Held for the lifetime
    /// of the task; released on complete/fail/cancel/drop.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}

impl ClaimedTask {
    /// Construct a `ClaimedTask` from the results of a successful
    /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
    ///
    /// # Arguments
    ///
    /// * `client` — shared Valkey client used for subsequent lease
    ///   renewals, signal delivery, and the final
    ///   complete/fail/cancel FCALL.
    /// * `partition_config` — partition topology snapshot read at
    ///   `FlowFabricWorker::connect`. Used for key construction for
    ///   the lifetime of this task.
    /// * `execution_id` — the claimed execution's UUID.
    /// * `attempt_index` / `attempt_id` — current attempt identity.
    ///   `attempt_index` is 0 on a fresh claim, preserved on a
    ///   resumed claim.
    /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
    ///   is returned by the Lua FCALL and bumped on each resumed
    ///   claim.
    /// * `lease_ttl_ms` — lease TTL used to schedule the background
    ///   renewal task (renews at `lease_ttl_ms / 3`).
    /// * `lane_id` — the lane the task was claimed on. Used for
    ///   index key construction (`lane_active`, `lease_expiry`,
    ///   etc.).
    /// * `worker_instance_id` — this worker's identity. Used to
    ///   resolve `worker_leases` index entries during renewal and
    ///   completion.
    /// * `input_payload` / `execution_kind` / `tags` — pre-read
    ///   execution metadata, exposed via getters so the worker
    ///   doesn't round-trip to Valkey to inspect what it just
    ///   claimed.
    ///
    /// # Invariant: constructor is `pub(crate)` on purpose
    ///
    /// **External callers cannot construct a `ClaimedTask`** — only
    /// the in-crate claim entry points (`claim_next`,
    /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
    /// load-bearing: those entry points are the ONLY sites that
    /// acquire a permit from the worker's concurrency semaphore
    /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
    /// it through `ClaimedTask::set_concurrency_permit` before
    /// returning the task. Promoting `new` to `pub` would let
    /// consumers build tasks that bypass the concurrency contract
    /// the worker's `max_concurrent_tasks` config advertises — the
    /// returned task would run alongside other in-flight work
    /// without debiting the permit bank, and the
    /// complete/fail/cancel/drop path would have nothing to
    /// release.
    ///
    /// If an external callsite genuinely needs to rehydrate a
    /// task from saved FCALL results, the right answer is a new
    /// `FlowFabricWorker` entry point that wraps `new` and acquires
    /// a permit — not promoting this constructor.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        client: Client,
        partition_config: PartitionConfig,
        execution_id: ExecutionId,
        attempt_index: AttemptIndex,
        attempt_id: AttemptId,
        lease_id: LeaseId,
        lease_epoch: LeaseEpoch,
        lease_ttl_ms: u64,
        lane_id: LaneId,
        worker_instance_id: WorkerInstanceId,
        input_payload: Vec<u8>,
        execution_kind: String,
        tags: HashMap<String, String>,
    ) -> Self {
        let renewal_stop = Arc::new(Notify::new());
        let renewal_failures = Arc::new(AtomicU32::new(0));

        let renewal_handle = spawn_renewal_task(
            client.clone(),
            partition_config,
            execution_id.clone(),
            attempt_index,
            attempt_id.clone(),
            lease_id.clone(),
            lease_epoch,
            lease_ttl_ms,
            renewal_stop.clone(),
            renewal_failures.clone(),
        );

        Self {
            client,
            partition_config,
            execution_id,
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            lease_ttl_ms,
            lane_id,
            worker_instance_id,
            input_payload,
            execution_kind,
            tags,
            renewal_handle,
            renewal_stop,
            renewal_failures,
            terminal_op_called: AtomicBool::new(false),
            _concurrency_permit: None,
        }
    }

    /// Attach a concurrency permit from the worker's semaphore.
    /// The permit is held for the task's lifetime and released on drop.
    #[allow(dead_code)]
    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
        self._concurrency_permit = Some(permit);
    }

    // ── Accessors ──

    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }

    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }

    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }

    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }

    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }

    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }

    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }

    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }

    /// Check if the lease is likely still valid based on renewal success.
    ///
    /// Returns `false` if 3 or more consecutive renewal attempts have failed.
    /// Workers should check this before committing expensive or irreversible
    /// side effects. A `false` return means Valkey may have already expired
    /// the lease and another worker could be processing this execution.
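    ///
    /// A sketch of the intended guard (`commit_side_effect` is hypothetical):
    ///
    /// ```ignore
    /// if !task.is_lease_healthy() {
    ///     // The lease may already be expired in Valkey — skip the
    ///     // irreversible work and let the reclaim path take over.
    ///     return Ok(());
    /// }
    /// commit_side_effect().await?;
    /// ```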
    pub fn is_lease_healthy(&self) -> bool {
        self.renewal_failures.load(Ordering::Relaxed) < 3
    }

    /// Number of consecutive lease renewal failures since the last success.
    ///
    /// Returns 0 when renewals are working normally. Useful for observability
    /// and custom health policies beyond the default threshold of 3.
    pub fn consecutive_renewal_failures(&self) -> u32 {
        self.renewal_failures.load(Ordering::Relaxed)
    }

    // ── Terminal operations (consume self) ──

    /// Delay the execution until `delay_until`.
    ///
    /// Releases the lease. The execution moves to the `delayed` state.
    /// Consumes self — the task cannot be used after delay.
    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        // KEYS (9): exec_core, attempt_hash, lease_current, lease_history,
        //           lease_expiry_zset, worker_leases, active_index,
        //           delayed_zset, attempt_timeout_zset
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            idx.lane_active(&self.lane_id),
            idx.lane_delayed(&self.lane_id),
            idx.attempt_timeout(),
        ];

        // ARGV (5): execution_id, lease_id, lease_epoch, attempt_id, delay_until
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            self.attempt_id.to_string(),
            delay_until.to_string(),
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_delay_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        self.stop_renewal();
        parse_success_result(&raw, "ff_delay_execution")
    }

    /// Move the execution to the `waiting_children` state.
    ///
    /// Releases the lease. The execution waits for child dependencies to complete.
    /// Consumes self.
    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        // KEYS (9): exec_core, attempt_hash, lease_current, lease_history,
        //           lease_expiry_zset, worker_leases, active_index,
        //           blocked_deps_zset, attempt_timeout_zset
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            idx.lane_active(&self.lane_id),
            idx.lane_blocked_dependencies(&self.lane_id),
            idx.attempt_timeout(),
        ];

        // ARGV (4): execution_id, lease_id, lease_epoch, attempt_id
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            self.attempt_id.to_string(),
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_move_to_waiting_children", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        self.stop_renewal();
        parse_success_result(&raw, "ff_move_to_waiting_children")
    }

    /// Complete the execution successfully.
    ///
    /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
    /// Renewal continues during the FCALL to prevent lease expiry under
    /// network latency — the Lua fences on lease_id+epoch atomically.
    /// Consumes self — the task cannot be used after completion.
    ///
    /// # Connection errors
    ///
    /// If the Valkey connection drops during this call, the returned error
    /// does **not** guarantee the operation failed — the Lua function may
    /// have committed before the response was lost. Callers should treat
    /// connection errors on `complete()` as "possibly succeeded" and verify
    /// the execution state via `get_execution_state()` before retrying.
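    ///
    /// A sketch of that verify-before-retry pattern (`get_execution_state`
    /// is assumed to be reachable from the worker side; the exact signature
    /// and `is_terminal` helper are illustrative):
    ///
    /// ```ignore
    /// let execution_id = task.execution_id().clone();
    /// if let Err(err) = task.complete(Some(result)).await {
    ///     // The FCALL may have committed before the response was lost.
    ///     let state = worker.get_execution_state(&execution_id).await?;
    ///     if state.is_terminal() {
    ///         // Treat as success; do not re-run the work.
    ///     } else {
    ///         return Err(err.into());
    ///     }
    /// }
    /// ```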
    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        // KEYS (12): must match lua/execution.lua ff_complete_execution positional order
        // exec_core, attempt_hash, lease_expiry_zset, worker_leases,
        // terminal_zset, lease_current, lease_history, active_index,
        // stream_meta, result_key, attempt_timeout_zset, execution_deadline_zset
        let keys: Vec<String> = vec![
            ctx.core(),                                        // 1  exec_core
            ctx.attempt_hash(self.attempt_index),              // 2  attempt_hash
            idx.lease_expiry(),                                // 3  lease_expiry_zset
            idx.worker_leases(&self.worker_instance_id),       // 4  worker_leases
            idx.lane_terminal(&self.lane_id),                  // 5  terminal_zset
            ctx.lease_current(),                               // 6  lease_current
            ctx.lease_history(),                               // 7  lease_history
            idx.lane_active(&self.lane_id),                    // 8  active_index
            ctx.stream_meta(self.attempt_index),               // 9  stream_meta
            ctx.result(),                                      // 10 result_key
            idx.attempt_timeout(),                             // 11 attempt_timeout_zset
            idx.execution_deadline(),                          // 12 execution_deadline_zset
        ];

        let result_bytes = result_payload.unwrap_or_default();
        let result_str = String::from_utf8_lossy(&result_bytes);

        // ARGV (5): must match lua/execution.lua ff_complete_execution positional order
        // execution_id, lease_id, lease_epoch, attempt_id, result_payload
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            self.attempt_id.to_string(),
            result_str.into_owned(),
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_complete_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        self.stop_renewal();
        parse_success_result(&raw, "ff_complete_execution")
    }

    /// Fail the execution with a reason and error category.
    ///
    /// If the execution policy allows retries, the engine schedules a retry
    /// (delayed backoff). Otherwise, the execution becomes terminal failed.
    /// Returns [`FailOutcome`] so the caller knows what happened.
    ///
    /// # Connection errors
    ///
    /// If the Valkey connection drops during this call, the returned error
    /// does **not** guarantee the operation failed — the Lua function may
    /// have committed before the response was lost. Callers should treat
    /// connection errors on `fail()` as "possibly succeeded" and verify
    /// the execution state via `get_execution_state()` before retrying.
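    ///
    /// A minimal sketch of branching on the outcome (reason and category
    /// strings are illustrative):
    ///
    /// ```ignore
    /// match task.fail("upstream timeout", "transient").await? {
    ///     FailOutcome::RetryScheduled { delay_until } => {
    ///         tracing::info!(delay_until = %delay_until, "retry scheduled");
    ///     }
    ///     FailOutcome::TerminalFailed => {
    ///         tracing::warn!("execution terminally failed");
    ///     }
    /// }
    /// ```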
    pub async fn fail(
        self,
        reason: &str,
        error_category: &str,
    ) -> Result<FailOutcome, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        // KEYS (12): exec_core, attempt_hash, lease_expiry, worker_leases,
        //            terminal_zset, delayed_zset, lease_current, lease_history,
        //            active_index, stream_meta, attempt_timeout, execution_deadline
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.attempt_hash(self.attempt_index),
            idx.lease_expiry(),
            idx.worker_leases(&self.worker_instance_id),
            idx.lane_terminal(&self.lane_id),
            idx.lane_delayed(&self.lane_id),
            ctx.lease_current(),
            ctx.lease_history(),
            idx.lane_active(&self.lane_id),
            ctx.stream_meta(self.attempt_index),
            idx.attempt_timeout(),
            idx.execution_deadline(),
        ];

        // ARGV (7): eid, lease_id, lease_epoch, attempt_id,
        //           failure_reason, failure_category, retry_policy_json
        let retry_policy_json = self.read_retry_policy_json(&ctx).await?;

        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            self.attempt_id.to_string(),
            reason.to_owned(),
            error_category.to_owned(),
            retry_policy_json,
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_fail_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        self.stop_renewal();
        parse_fail_result(&raw)
    }

    /// Cancel the execution.
    ///
    /// Calls `ff_cancel_execution` via FCALL, then stops lease renewal.
    /// Consumes self.
    ///
    /// # Connection errors
    ///
    /// If the Valkey connection drops during this call, the returned error
    /// does **not** guarantee the operation failed — the Lua function may
    /// have committed before the response was lost. Callers should treat
    /// connection errors on `cancel()` as "possibly succeeded" and verify
    /// the execution state via `get_execution_state()` before retrying.
    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
        self.cancel_inner(reason).await
    }

    /// Internal cancel implementation (shared by cancel and fail-fallback).
    async fn cancel_inner(self, reason: &str) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        // ff_cancel_execution needs 21 KEYS. The Lua constructs active_index
        // and suspended_key dynamically (C2 fix). KEYS[9]/[10] (waitpoint_hash,
        // wp_condition) need the real waitpoint_id — read from exec_core.
        // If there is no current_waitpoint_id (not suspended), use a placeholder.
        let wp_id_str: Option<String> = self
            .client
            .hget(&ctx.core(), "current_waitpoint_id")
            .await
            .map_err(|e| SdkError::ValkeyContext { source: e, context: "read current_waitpoint_id".into() })?;
        let wp_id = match wp_id_str.as_deref().filter(|s| !s.is_empty()) {
            Some(s) => match WaitpointId::parse(s) {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(
                        execution_id = %self.execution_id,
                        raw = %s,
                        error = %e,
                        "corrupt waitpoint_id in exec_core, using placeholder"
                    );
                    WaitpointId::new()
                }
            },
            None => WaitpointId::default(),
        };
        let keys: Vec<String> = vec![
            ctx.core(),                                        // 1
            ctx.attempt_hash(self.attempt_index),              // 2
            ctx.stream_meta(self.attempt_index),               // 3
            ctx.lease_current(),                               // 4
            ctx.lease_history(),                               // 5
            idx.lease_expiry(),                                // 6
            idx.worker_leases(&self.worker_instance_id),       // 7
            ctx.suspension_current(),                          // 8
            ctx.waitpoint(&wp_id),                             // 9
            ctx.waitpoint_condition(&wp_id),                   // 10
            idx.suspension_timeout(),                          // 11
            idx.lane_terminal(&self.lane_id),                  // 12
            idx.attempt_timeout(),                             // 13
            idx.execution_deadline(),                          // 14
            idx.lane_eligible(&self.lane_id),                  // 15
            idx.lane_delayed(&self.lane_id),                   // 16
            idx.lane_blocked_dependencies(&self.lane_id),      // 17
            idx.lane_blocked_budget(&self.lane_id),            // 18
            idx.lane_blocked_quota(&self.lane_id),             // 19
            idx.lane_blocked_route(&self.lane_id),             // 20
            idx.lane_blocked_operator(&self.lane_id),          // 21
        ];

        // ARGV (5): must match lua/execution.lua ff_cancel_execution positional order
        // execution_id, reason, source, lease_id, lease_epoch
        let args: Vec<String> = vec![
            self.execution_id.to_string(),     // 1  execution_id
            reason.to_owned(),                 // 2  reason
            "worker".to_owned(),               // 3  source
            self.lease_id.to_string(),         // 4  lease_id
            self.lease_epoch.to_string(),      // 5  lease_epoch
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_cancel_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        self.stop_renewal();
        parse_success_result(&raw, "ff_cancel_execution")
    }

    // ── Non-terminal operations ──

    /// Manually renew the lease. Also called by the background renewal task.
    pub async fn renew_lease(&self) -> Result<(), SdkError> {
        renew_lease_inner(
            &self.client,
            &self.partition_config,
            &self.execution_id,
            self.attempt_index,
            &self.attempt_id,
            &self.lease_id,
            self.lease_epoch,
            self.lease_ttl_ms,
        )
        .await
    }

    /// Update progress (`pct` 0–100 and a progress message).
    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);

        // KEYS (1): exec_core
        let keys: Vec<String> = vec![ctx.core()];

        // ARGV (5): execution_id, lease_id, lease_epoch,
        //           progress_pct, progress_message
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.lease_id.to_string(),
            self.lease_epoch.to_string(),
            pct.to_string(),
            message.to_owned(),
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_update_progress", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        parse_success_result(&raw, "ff_update_progress")
    }

    /// Report usage against a budget and check limits.
    ///
    /// Non-consuming — the worker can report usage multiple times.
    /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
    /// `dedup_key` prevents double-counting on retries (auto-prefixed with the budget hash tag).
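    ///
    /// A usage sketch (dimension names and the dedup key are illustrative):
    ///
    /// ```ignore
    /// let result = task
    ///     .report_usage(
    ///         &budget_id,
    ///         &[("tokens", 1500), ("requests", 1)],
    ///         Some("attempt-42:llm-call-7"), // dedup on retry
    ///     )
    ///     .await?;
    /// ```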
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        dimensions: &[(&str, u64)],
        dedup_key: Option<&str>,
    ) -> Result<ReportUsageResult, SdkError> {
        let partition = budget_partition(budget_id, &self.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        // KEYS (3): budget_usage, budget_limits, budget_def
        let keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];

        // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
        let now = TimestampMs::now();
        let dim_count = dimensions.len();
        let mut argv: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
        argv.push(dim_count.to_string());
        for (dim, _) in dimensions {
            argv.push((*dim).to_string());
        }
        for (_, delta) in dimensions {
            argv.push(delta.to_string());
        }
        argv.push(now.to_string());
        let dedup_key_val = dedup_key
            .filter(|k| !k.is_empty())
            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
            .unwrap_or_default();
        argv.push(dedup_key_val);

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_report_usage_and_check", &key_refs, &argv_refs)
            .await
            .map_err(SdkError::Valkey)?;

        parse_report_usage_result(&raw)
    }

    /// Create a pending waitpoint for future signal delivery.
    ///
    /// Non-consuming — the worker keeps the lease. Signals delivered to the
    /// waitpoint are buffered. When the worker later calls `suspend()` with
    /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
    /// resume condition.
    ///
    /// Returns both the waitpoint_id AND the HMAC token required by external
    /// callers to buffer signals against this pending waitpoint
    /// (RFC-004 §Waitpoint Security).
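    ///
    /// A sketch (the waitpoint key and TTL are illustrative):
    ///
    /// ```ignore
    /// let (waitpoint_id, token) = task
    ///     .create_pending_waitpoint("approval:order-123", 15 * 60 * 1000)
    ///     .await?;
    /// // Hand (waitpoint_id, token) to the external approver; it can buffer
    /// // a signal even before this worker suspends.
    /// ```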
    pub async fn create_pending_waitpoint(
        &self,
        waitpoint_key: &str,
        expires_in_ms: u64,
    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        let waitpoint_id = WaitpointId::new();
        let expires_at = TimestampMs::from_millis(TimestampMs::now().0 + expires_in_ms as i64);

        // KEYS (4): exec_core, waitpoint_hash, pending_wp_expiry_zset, hmac_secrets
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.waitpoint(&waitpoint_id),
            idx.pending_waitpoint_expiry(),
            idx.waitpoint_hmac_secrets(),
        ];

        // ARGV (5): execution_id, attempt_index, waitpoint_id, waitpoint_key, expires_at
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.attempt_index.to_string(),
            waitpoint_id.to_string(),
            waitpoint_key.to_owned(),
            expires_at.to_string(),
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_create_pending_waitpoint", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Response: {1, "OK", waitpoint_id, waitpoint_key, waitpoint_token}.
        // Extract the token (the waitpoint_id is the one we generated locally).
        let token = extract_pending_waitpoint_token(&raw)?;
        Ok((waitpoint_id, token))
    }

    // ── Phase 4: Streaming ──

    /// Append a frame to the current attempt's output stream.
    ///
    /// Non-consuming — the worker can append many frames during execution.
    /// The stream is created lazily on the first append.
874
875    /// Append a frame to the current attempt's output stream.
876    ///
877    /// Non-consuming — the worker can append many frames during execution.
878    /// The stream is created lazily on the first append.
879    pub async fn append_frame(
880        &self,
881        frame_type: &str,
882        payload: &[u8],
883        metadata: Option<&str>,
884    ) -> Result<AppendFrameOutcome, SdkError> {
885        let partition = execution_partition(&self.execution_id, &self.partition_config);
886        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
887
888        let now = TimestampMs::now();
889
890        // KEYS (3): exec_core, stream_key, stream_meta
891        let keys: Vec<String> = vec![
892            ctx.core(),
893            ctx.stream(self.attempt_index),
894            ctx.stream_meta(self.attempt_index),
895        ];
896
897        let payload_str = String::from_utf8_lossy(payload);
898
899        // ARGV (13): execution_id, attempt_index, lease_id, lease_epoch,
900        //            frame_type, ts, payload, encoding, correlation_id,
901        //            source, retention_maxlen, attempt_id, max_payload_bytes
902        let args: Vec<String> = vec![
903            self.execution_id.to_string(),     // 1
904            self.attempt_index.to_string(),    // 2
905            self.lease_id.to_string(),         // 3
906            self.lease_epoch.to_string(),      // 4
907            frame_type.to_owned(),             // 5
908            now.to_string(),                   // 6 ts
909            payload_str.into_owned(),          // 7
910            "utf8".to_owned(),                 // 8 encoding
911            metadata.unwrap_or("").to_owned(), // 9 correlation_id
912            "worker".to_owned(),               // 10 source
913            "10000".to_owned(),                // 11 retention_maxlen
914            self.attempt_id.to_string(),       // 12
915            "65536".to_owned(),                // 13 max_payload_bytes
916        ];
917
918        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
919        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
920
921        let raw: Value = self
922            .client
923            .fcall("ff_append_frame", &key_refs, &arg_refs)
924            .await
925            .map_err(SdkError::Valkey)?;
926
927        parse_append_frame_result(&raw)
928    }
929
930    // ── Phase 3: Suspend ──
931
932    /// Suspend the execution, releasing the lease and creating a waitpoint.
933    ///
934    /// The execution transitions to `suspended` and the worker loses ownership.
935    /// An external signal matching the condition will resume the execution.
936    ///
937    /// If `condition_matchers` is empty, a wildcard matcher is created that
938    /// matches ANY signal name. To require an explicit operator resume with
939    /// no signal match, pass a sentinel name that no real signal will use.
940    ///
941    /// If buffered signals on a pending waitpoint already satisfy the condition,
942    /// returns `AlreadySatisfied` and the lease is NOT released.
943    ///
944    /// Consumes self — the task cannot be used after suspension.
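    ///
    /// A sketch of suspending for human approval (matcher name, reason code,
    /// and timeout are illustrative):
    ///
    /// ```ignore
    /// let outcome = task
    ///     .suspend(
    ///         "await_approval",
    ///         &[ConditionMatcher { signal_name: "approved".into() }],
    ///         Some(24 * 60 * 60 * 1000), // 24h
    ///         TimeoutBehavior::Expire,
    ///     )
    ///     .await?;
    /// match outcome {
    ///     SuspendOutcome::Suspended { waitpoint_token, .. } => {
    ///         // Hand the token to whoever will deliver the signal.
    ///     }
    ///     SuspendOutcome::AlreadySatisfied { .. } => {
    ///         // Buffered signals already satisfied the condition.
    ///     }
    /// }
    /// ```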
    pub async fn suspend(
        self,
        reason_code: &str,
        condition_matchers: &[ConditionMatcher],
        timeout_ms: Option<u64>,
        timeout_behavior: TimeoutBehavior,
    ) -> Result<SuspendOutcome, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        let suspension_id = SuspensionId::new();
        let waitpoint_id = WaitpointId::new();
        // For now, use a simple opaque key (real implementation would use HMAC token)
        let waitpoint_key = format!("wpk:{}", waitpoint_id);

        let timeout_at = timeout_ms.map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));

        // Build resume condition JSON
        let required_signal_names: Vec<&str> = condition_matchers
            .iter()
            .map(|m| m.signal_name.as_str())
            .collect();
        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
        let resume_condition_json = serde_json::json!({
            "condition_type": "signal_set",
            "required_signal_names": required_signal_names,
            "signal_match_mode": match_mode,
            "minimum_signal_count": 1,
            "timeout_behavior": timeout_behavior.as_str(),
            "allow_operator_override": true,
        }).to_string();

        let resume_policy_json = serde_json::json!({
            "resume_target": "runnable",
            "close_waitpoint_on_resume": true,
            "consume_matched_signals": true,
            "retain_signal_buffer_until_closed": true,
        }).to_string();

        // KEYS (17): exec_core, attempt_record, lease_current, lease_history,
        //            lease_expiry, worker_leases, suspension_current, waitpoint_hash,
        //            waitpoint_signals, suspension_timeout, pending_wp_expiry,
        //            active_index, suspended_index, waitpoint_history, wp_condition,
        //            attempt_timeout, hmac_secrets
        let keys: Vec<String> = vec![
            ctx.core(),                                          // 1
            ctx.attempt_hash(self.attempt_index),                // 2
            ctx.lease_current(),                                 // 3
            ctx.lease_history(),                                 // 4
            idx.lease_expiry(),                                  // 5
            idx.worker_leases(&self.worker_instance_id),         // 6
            ctx.suspension_current(),                            // 7
            ctx.waitpoint(&waitpoint_id),                        // 8
            ctx.waitpoint_signals(&waitpoint_id),                // 9
            idx.suspension_timeout(),                            // 10
            idx.pending_waitpoint_expiry(),                      // 11
            idx.lane_active(&self.lane_id),                      // 12
            idx.lane_suspended(&self.lane_id),                   // 13
            ctx.waitpoints(),                                    // 14
            ctx.waitpoint_condition(&waitpoint_id),              // 15
            idx.attempt_timeout(),                               // 16
            idx.waitpoint_hmac_secrets(),                        // 17
        ];

        // ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
        //            lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
        //            reason_code, requested_by, timeout_at, resume_condition_json,
        //            resume_policy_json, continuation_metadata_pointer,
        //            use_pending_waitpoint, timeout_behavior, lease_history_maxlen
        let args: Vec<String> = vec![
            self.execution_id.to_string(),                          // 1
            self.attempt_index.to_string(),                         // 2
            self.attempt_id.to_string(),                            // 3
            self.lease_id.to_string(),                              // 4
            self.lease_epoch.to_string(),                           // 5
            suspension_id.to_string(),                              // 6
            waitpoint_id.to_string(),                               // 7
            waitpoint_key.clone(),                                  // 8
            reason_code.to_owned(),                                 // 9
            "worker".to_owned(),                                    // 10
            timeout_at.map_or(String::new(), |t| t.to_string()),    // 11
            resume_condition_json,                                  // 12
            resume_policy_json,                                     // 13
            String::new(),                                          // 14 continuation_metadata_ptr
            String::new(),                                          // 15 use_pending_waitpoint
            timeout_behavior.as_str().to_owned(),                   // 16
            "1000".to_owned(),                                      // 17 lease_history_maxlen
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        self.stop_renewal();
        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
    }

    /// Read the signals that satisfied the waitpoint and triggered this
    /// resume.
    ///
    /// Non-consuming. Intended to be called immediately after re-claim via
    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
    /// subsequent `suspend()` (which replaces `suspension:current`).
    ///
    /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
    ///
    /// - No prior suspension on this execution.
    /// - The prior suspension belonged to an earlier attempt (e.g. the
    ///   attempt was cancelled/failed and a retry is now claiming).
    /// - The prior suspension was closed by timeout / cancel / operator
    ///   override rather than by a matched signal.
    ///
    /// Reads `suspension:current` once, filters by `attempt_index` to
    /// guard against stale prior-attempt records, then fetches the matched
    /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
    /// fields and reads each signal's metadata + payload directly.
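    ///
    /// A sketch of branching on the resume signal after re-claim (signal
    /// names are illustrative):
    ///
    /// ```ignore
    /// for sig in task.resume_signals().await? {
    ///     match sig.signal_name.as_str() {
    ///         "approved" => { /* continue the workflow */ }
    ///         "rejected" => { /* inspect sig.payload for a reason */ }
    ///         other => tracing::warn!(%other, "unexpected resume signal"),
    ///     }
    /// }
    /// ```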
1067    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
1068        let partition = execution_partition(&self.execution_id, &self.partition_config);
1069        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
1070
1071        let susp: HashMap<String, String> = self
1072            .client
1073            .hgetall(&ctx.suspension_current())
1074            .await
1075            .map_err(|e| SdkError::ValkeyContext {
1076                source: e,
1077                context: "HGETALL suspension_current".into(),
1078            })?;
1079
1080        let Some(waitpoint_id) =
1081            resume_waitpoint_id_from_suspension(&susp, self.attempt_index)?
1082        else {
1083            return Ok(Vec::new());
1084        };
1085
1086        // Bounded read of waitpoint_condition: HGET total_matchers first,
1087        // then HMGET exactly the fields we need per matcher slot. Avoids an
1088        // unbounded HGETALL (matcher:N:* fields grow with condition size).
1089        let wp_cond_key = ctx.waitpoint_condition(&waitpoint_id);
1090        let total_str: Option<String> = self
1091            .client
1092            .hget(&wp_cond_key, "total_matchers")
1093            .await
1094            .map_err(|e| SdkError::ValkeyContext {
1095                source: e,
1096                context: "HGET total_matchers".into(),
1097            })?;
1098        let total: usize = total_str
1099            .as_deref()
1100            .and_then(|s| s.parse().ok())
1101            .unwrap_or(0);
1102
1103        let mut signal_ids: Vec<SignalId> = Vec::new();
1104        for i in 0..total {
1105            let fields: Vec<Option<String>> = self
1106                .client
1107                .cmd("HMGET")
1108                .arg(&wp_cond_key)
1109                .arg(format!("matcher:{i}:satisfied"))
1110                .arg(format!("matcher:{i}:signal_id"))
1111                .execute()
1112                .await
1113                .map_err(|e| SdkError::ValkeyContext {
1114                    source: e,
1115                    context: "HMGET matcher slot".into(),
1116                })?;
1117            let satisfied = fields.first().and_then(|o| o.as_deref());
1118            if satisfied != Some("1") {
1119                continue;
1120            }
1121            let Some(raw) = fields.get(1).and_then(|o| o.as_deref()).filter(|s| !s.is_empty())
1122            else {
1123                continue;
1124            };
1125            match SignalId::parse(raw) {
1126                Ok(sid) => signal_ids.push(sid),
1127                Err(e) => {
1128                    tracing::warn!(
1129                        execution_id = %self.execution_id,
1130                        waitpoint_id = %waitpoint_id,
1131                        raw = %raw,
1132                        error = %e,
1133                        "resume_signals: matcher signal_id failed to parse, skipping"
1134                    );
1135                }
1136            }
1137        }
1138
1139        let mut out: Vec<ResumeSignal> = Vec::with_capacity(signal_ids.len());
1140        for signal_id in signal_ids {
1141            let sig: HashMap<String, String> = self
1142                .client
1143                .hgetall(&ctx.signal(&signal_id))
1144                .await
1145                .map_err(|e| SdkError::ValkeyContext {
1146                    source: e,
1147                    context: "HGETALL signal_hash".into(),
1148                })?;
1149            if sig.is_empty() {
1150                // Signal hash was GC'd (not currently a code path, but
1151                // defensive against future cleanup scanners). Skip.
1152                continue;
1153            }
1154
1155            // Read payload as raw Value so non-UTF-8 bytes survive intact.
1156            // Previously this was `get::<Option<String>>` + `into_bytes`,
1157            // which would panic/error on any non-UTF-8 payload (reachable
1158            // via direct-FCALL callers that bypass the SDK's lossy
1159            // UTF-8 encode path in deliver_signal).
1160            let payload_raw: Option<Value> = self
1161                .client
1162                .cmd("GET")
1163                .arg(ctx.signal_payload(&signal_id))
1164                .execute()
1165                .await
1166                .map_err(|e| SdkError::ValkeyContext {
1167                    source: e,
1168                    context: "GET signal_payload".into(),
1169                })?;
1170            let payload: Option<Vec<u8>> = match payload_raw {
1171                Some(Value::BulkString(b)) => Some(b.to_vec()),
1172                Some(Value::SimpleString(s)) => Some(s.into_bytes()),
1173                _ => None,
1174            };
1175
1176            let accepted_at = sig
1177                .get("accepted_at")
1178                .and_then(|s| s.parse::<i64>().ok())
1179                .map(TimestampMs::from_millis)
1180                .unwrap_or_else(|| TimestampMs::from_millis(0));
1181
1182            out.push(ResumeSignal {
1183                signal_id,
1184                signal_name: sig.get("signal_name").cloned().unwrap_or_default(),
1185                signal_category: sig.get("signal_category").cloned().unwrap_or_default(),
1186                source_type: sig.get("source_type").cloned().unwrap_or_default(),
1187                source_identity: sig.get("source_identity").cloned().unwrap_or_default(),
1188                correlation_id: sig.get("correlation_id").cloned().unwrap_or_default(),
1189                accepted_at,
1190                payload,
1191            });
1192        }
1193
1194        Ok(out)
1195    }
1196
1197    /// Signal the renewal task to stop. Called by every terminal op
1198    /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
1199    /// `move_to_waiting_children`) after the FCALL returns. Also marks
1200    /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
1201    /// consumption from a genuine drop-without-terminal-op.
1202    fn stop_renewal(&self) {
1203        self.terminal_op_called.store(true, Ordering::Release);
1204        self.renewal_stop.notify_one();
1205    }
1206
1207    /// Read the retry policy JSON from the execution's policy key.
1208    async fn read_retry_policy_json(&self, ctx: &ExecKeyContext) -> Result<String, SdkError> {
1209        let policy_str: Option<String> = self
1210            .client
1211            .get(&ctx.policy())
1212            .await
1213            .map_err(|e| SdkError::ValkeyContext { source: e, context: "read retry policy".into() })?;
1214
1215        match policy_str {
1216            Some(json) => {
1217                match serde_json::from_str::<serde_json::Value>(&json) {
1218                    Ok(policy) => {
1219                        if let Some(retry) = policy.get("retry_policy") {
1220                            return Ok(serde_json::to_string(retry).unwrap_or_default());
1221                        }
1222                        Ok(String::new())
1223                    }
1224                    Err(e) => {
1225                        tracing::warn!(
1226                            execution_id = %self.execution_id,
1227                            error = %e,
1228                            "malformed retry policy JSON, treating as no policy"
1229                        );
1230                        Ok(String::new())
1231                    }
1232                }
1233            }
1234            None => Ok(String::new()),
1235        }
1236    }
1237}

impl Drop for ClaimedTask {
    fn drop(&mut self) {
        // Abort the background renewal task on drop.
        // This is a safety net — complete/fail/cancel already stop renewal
        // via notify before consuming self. But if the task is dropped
        // without being consumed (e.g., panic), abort prevents leaked renewals.
        //
        // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
        // on the happy path, `stop_renewal()` fires `notify_one` synchronously
        // and then `self` is consumed, entering `Drop` immediately. The renewal
        // task has not yet been polled by the runtime, so `is_finished()` is
        // still `false` here — which previously fired the warning on every
        // complete/fail/cancel/suspend call. `terminal_op_called` is the
        // authoritative signal that a terminal-op path ran to the point of
        // stopping renewal; it does not by itself certify the Lua side
        // succeeded (see the field doc). The caller surfaces any error via
        // the op's return value, so a `Drop` warning is unneeded there.
        if !self.terminal_op_called.load(Ordering::Acquire) {
            tracing::warn!(
                execution_id = %self.execution_id,
                "ClaimedTask dropped without terminal operation — lease will expire"
            );
        }
        self.renewal_handle.abort();
    }
}

// ── Lease renewal ──

/// Perform a single lease renewal via FCALL ff_renew_lease.
///
/// The span is named `renew_lease` with target `ff_sdk::task` so bench
/// harnesses can attach a `tracing_subscriber::Layer` and measure
/// renewal count + per-call duration via `on_enter` / `on_exit`
/// without polluting the hot path with additional instrumentation.
/// See `benches/harness/src/bin/long_running.rs` for the consumer.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
    name = "renew_lease",
    skip_all,
    fields(execution_id = %execution_id)
)]
async fn renew_lease_inner(
    client: &Client,
    partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    attempt_id: &AttemptId,
    lease_id: &LeaseId,
    lease_epoch: LeaseEpoch,
    lease_ttl_ms: u64,
) -> Result<(), SdkError> {
    let partition = execution_partition(execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, execution_id);
    let idx = IndexKeys::new(&partition);

    // KEYS: exec_core, lease_current, lease_history, lease_expiry_zset
    let keys: Vec<String> = vec![
        ctx.core(),
        ctx.lease_current(),
        ctx.lease_history(),
        idx.lease_expiry(),
    ];

    // ARGV: execution_id, attempt_index, attempt_id, lease_id, lease_epoch,
    //       lease_ttl_ms, lease_history_grace_ms
    let lease_history_grace_ms = 5000_u64; // 5s grace for cleanup
    let args: Vec<String> = vec![
        execution_id.to_string(),
        attempt_index.to_string(),
        attempt_id.to_string(),
        lease_id.to_string(),
        lease_epoch.to_string(),
        lease_ttl_ms.to_string(),
        lease_history_grace_ms.to_string(),
    ];

    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

    let raw: Value = client
        .fcall("ff_renew_lease", &key_refs, &arg_refs)
        .await
        .map_err(SdkError::Valkey)?;

    parse_success_result(&raw, "ff_renew_lease")
}

/// Spawn a background tokio task that renews the lease at `ttl / 3` intervals.
///
/// Stops when:
/// - `stop_signal` is notified (complete/fail/cancel called)
/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
/// - The task handle is aborted (ClaimedTask dropped)
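///
/// # Example (sketch)
///
/// Minimal wiring, assuming a connected `Client` and an already-claimed
/// lease; `client`, `cfg`, and the id values are placeholders:
///
/// ```ignore
/// let stop = Arc::new(Notify::new());
/// let failures = Arc::new(AtomicU32::new(0));
/// let handle = spawn_renewal_task(
///     client.clone(), cfg.clone(), execution_id.clone(), attempt_index,
///     attempt_id.clone(), lease_id.clone(), lease_epoch,
///     30_000, stop.clone(), failures.clone(),
/// );
/// // ... do work under the lease ...
/// stop.notify_one(); // what stop_renewal() does on a terminal op
/// handle.await.ok();
/// ```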
#[allow(clippy::too_many_arguments, dead_code)]
fn spawn_renewal_task(
    client: Client,
    partition_config: PartitionConfig,
    execution_id: ExecutionId,
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Guard against a zero period: `tokio::time::interval` panics on
    // Duration::ZERO, which `lease_ttl_ms / 3` yields for any ttl under 3ms.
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Skip the first immediate tick — the lease was just acquired.
        tick.tick().await;

        loop {
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_lease_inner(
                        &client,
                        &partition_config,
                        &execution_id,
                        attempt_index,
                        &attempt_id,
                        &lease_id,
                        lease_epoch,
                        lease_ttl_ms,
                    )
                    .await
                    {
                        Ok(()) => {
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        Err(SdkError::Script(ref e)) if is_terminal_renewal_error(e) => {
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}

/// Check if a script error means renewal should stop permanently.
#[allow(dead_code)]
fn is_terminal_renewal_error(err: &ScriptError) -> bool {
    matches!(
        err,
        ScriptError::StaleLease
            | ScriptError::LeaseExpired
            | ScriptError::LeaseRevoked
            | ScriptError::ExecutionNotActive
            | ScriptError::ExecutionNotFound
    )
}

// ── FCALL result parsing ──

/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
/// function into a typed [`ReportUsageResult`].
///
/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
///                  `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
/// Status code `!= 1` is parsed as a [`ScriptError`] via
/// [`ScriptError::from_code_with_detail`].
///
/// Exposed as `pub` so downstream SDKs that speak the same wire format
/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
/// call this directly instead of re-implementing the parse. Keeping one
/// parser paired with the producer (the Lua function registered at
/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
/// against silent format drift between producer and consumer.
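///
/// # Example (sketch)
///
/// Decoding a soft-breach reply; the array literal mirrors the wire
/// format documented above (SimpleString stands in for the BulkString
/// the transport actually returns; the parser treats both the same):
///
/// ```ignore
/// let raw = Value::Array(vec![
///     Ok(Value::Int(1)),
///     Ok(Value::SimpleString("SOFT_BREACH".into())),
///     Ok(Value::SimpleString("tokens".into())),
///     Ok(Value::SimpleString("150".into())),
///     Ok(Value::SimpleString("100".into())),
/// ]);
/// match parse_report_usage_result(&raw)? {
///     ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
///         assert_eq!((dimension.as_str(), current_usage, soft_limit), ("tokens", 150, 100));
///     }
///     other => unreachable!("{other:?}"),
/// }
/// ```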
pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_report_usage_and_check: expected Array".into(),
            )));
        }
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_report_usage_and_check: expected Int status code".into(),
            )));
        }
    };
    if status_code != 1 {
        let error_code = usage_field_str(arr, 1);
        let detail = usage_field_str(arr, 2);
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_report_usage_and_check: {error_code}"))
            }),
        ));
    }
    let sub_status = usage_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        "SOFT_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
        }
        "HARD_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
            Ok(ReportUsageResult::HardBreach {
                dimension: dim,
                current_usage: current,
                hard_limit: limit,
            })
        }
        _ => Err(SdkError::Script(ScriptError::Parse(format!(
            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
        )))),
    }
}

fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
    match arr.get(index) {
        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
        Some(Ok(Value::SimpleString(s))) => s.clone(),
        Some(Ok(Value::Int(n))) => n.to_string(),
        _ => String::new(),
    }
}

/// Parse a required numeric usage field (u64) from the wire array at
/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
/// holds a non-string/non-int value, or contains a string that does
/// not parse as u64.
///
/// Rationale: the Lua producer (`lua/budget.lua:99`,
/// `ff_report_usage_and_check`) always emits
/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
/// non-numeric value here means the Lua and Rust sides drifted;
/// silently coercing to `0` would surface drift as "zero-usage breach"
/// — arithmetically correct but semantically nonsense. Fail loudly
/// instead so drift shows up as a parse error at the first call site.
fn parse_usage_u64(
    arr: &[Result<Value, ferriskey::Error>],
    index: usize,
    sub_status: &str,
    field_name: &str,
) -> Result<u64, SdkError> {
    match arr.get(index) {
        Some(Ok(Value::Int(n))) => {
            u64::try_from(*n).map_err(|_| {
                SdkError::Script(ScriptError::Parse(format!(
                    "ff_report_usage_and_check {sub_status}: {field_name} \
                     (index {index}) negative int {n} cannot be u64"
                )))
            })
        }
        Some(Ok(Value::BulkString(b))) => {
            let s = String::from_utf8_lossy(b);
            s.parse::<u64>().map_err(|_| {
                SdkError::Script(ScriptError::Parse(format!(
                    "ff_report_usage_and_check {sub_status}: {field_name} \
                     (index {index}) not a u64 string: {s:?}"
                )))
            })
        }
        Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
            SdkError::Script(ScriptError::Parse(format!(
                "ff_report_usage_and_check {sub_status}: {field_name} \
                 (index {index}) not a u64 string: {s:?}"
            )))
        }),
        Some(_) => Err(SdkError::Script(ScriptError::Parse(format!(
            "ff_report_usage_and_check {sub_status}: {field_name} \
             (index {index}) wrong wire type (expected Int or String)"
        )))),
        None => Err(SdkError::Script(ScriptError::Parse(format!(
            "ff_report_usage_and_check {sub_status}: {field_name} \
             (index {index}) missing from response"
        )))),
    }
}

/// Extract the waitpoint_token (field index 4 in the 0-indexed response array)
/// from an `ff_create_pending_waitpoint` reply. Runs `parse_success_result`
/// first so error cases produce the usual typed error, not a parse failure.
fn extract_pending_waitpoint_token(raw: &Value) -> Result<WaitpointToken, SdkError> {
    parse_success_result(raw, "ff_create_pending_waitpoint")?;
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => unreachable!("parse_success_result would have rejected non-array"),
    };
    // Layout: [0]=1, [1]="OK", [2]=waitpoint_id, [3]=waitpoint_key, [4]=waitpoint_token
    let token_str = arr
        .get(4)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .ok_or_else(|| {
            SdkError::Script(ScriptError::Parse(
                "ff_create_pending_waitpoint: missing waitpoint_token in response".into(),
            ))
        })?;
    Ok(WaitpointToken::new(token_str))
}

/// Pure helper: decide whether `suspension:current` represents a
/// signal-driven resume for the currently-claimed attempt, and extract
/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
/// (no record, stale prior-attempt, non-resumed close). Returns an error
/// only for a present-but-malformed waitpoint_id, which indicates a Lua
/// bug rather than a missing-data case.
fn resume_waitpoint_id_from_suspension(
    susp: &HashMap<String, String>,
    claimed_attempt: AttemptIndex,
) -> Result<Option<WaitpointId>, SdkError> {
    if susp.is_empty() {
        return Ok(None);
    }
    let susp_att: u32 = susp
        .get("attempt_index")
        .and_then(|s| s.parse().ok())
        .unwrap_or(u32::MAX);
    if susp_att != claimed_attempt.0 {
        return Ok(None);
    }
    let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
    if close_reason != "resumed" {
        return Ok(None);
    }
    let wp_id_str = susp
        .get("waitpoint_id")
        .map(String::as_str)
        .unwrap_or_default();
    if wp_id_str.is_empty() {
        return Ok(None);
    }
    let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
        SdkError::Script(ScriptError::Parse(format!(
            "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
        )))
    })?;
    Ok(Some(waitpoint_id))
}

/// Parse a standard `{1, "OK", ...}` / `{0, "error", ...}` FCALL result.
pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(format!(
                "{function_name}: expected Array, got non-array"
            ))));
        }
    };

    if arr.is_empty() {
        return Err(SdkError::Script(ScriptError::Parse(format!(
            "{function_name}: empty result array"
        ))));
    }

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(format!(
                "{function_name}: expected Int at index 0"
            ))));
        }
    };

    if status_code == 1 {
        Ok(())
    } else {
        // Extract error code from index 1 and optional detail from index 2
        // (e.g. `capability_mismatch` ships missing tokens there). Variants
        // that carry a String payload pick the detail up via
        // `from_code_with_detail`; other variants ignore it.
        let field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = field_str(2);

        let script_err = ScriptError::from_code_with_detail(&error_code, &detail)
            .unwrap_or_else(|| {
                ScriptError::Parse(format!("{function_name}: unknown error: {error_code}"))
            });

        Err(SdkError::Script(script_err))
    }
}

/// Parse ff_suspend_execution result:
///   ok(suspension_id, waitpoint_id, waitpoint_key)
///   ok_already_satisfied(suspension_id, waitpoint_id, waitpoint_key)
fn parse_suspend_result(
    raw: &Value,
    suspension_id: SuspensionId,
    waitpoint_id: WaitpointId,
    waitpoint_key: String,
) -> Result<SuspendOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_suspend_execution: expected Array".into(),
            )));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_suspend_execution: bad status code".into(),
            )));
        }
    };

    if status_code != 1 {
        let err_field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = err_field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = err_field_str(2);
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_suspend_execution: {error_code}"))
            }),
        ));
    }

    // Check sub-status at index 1
    let sub_status = arr
        .get(1)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    // Lua returns: {1, status, suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
    // The suspension_id/waitpoint_id/waitpoint_key values the worker passed in are
    // authoritative (Lua echoes them); waitpoint_token however is MINTED by Lua and
    // must be read from the response.
    let waitpoint_token = arr
        .get(5)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .map(WaitpointToken::new)
        .ok_or_else(|| {
            SdkError::Script(ScriptError::Parse(
                "ff_suspend_execution: missing waitpoint_token in response".into(),
            ))
        })?;

    if sub_status == "ALREADY_SATISFIED" {
        Ok(SuspendOutcome::AlreadySatisfied {
            suspension_id,
            waitpoint_id,
            waitpoint_key,
            waitpoint_token,
        })
    } else {
        Ok(SuspendOutcome::Suspended {
            suspension_id,
            waitpoint_id,
            waitpoint_key,
            waitpoint_token,
        })
    }
}

/// Parse ff_deliver_signal result:
///   ok(signal_id, effect)
///   ok_duplicate(existing_signal_id)
pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_deliver_signal: expected Array".into(),
            )));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_deliver_signal: bad status code".into(),
            )));
        }
    };

    if status_code != 1 {
        let err_field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = err_field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = err_field_str(2);
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_deliver_signal: {error_code}"))
            }),
        ));
    }

    let sub_status = arr
        .get(1)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    if sub_status == "DUPLICATE" {
        let existing_id = arr
            .get(2)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_default();
        return Ok(SignalOutcome::Duplicate {
            existing_signal_id: existing_id,
        });
    }

    // Parse: {1, "OK", signal_id, effect}
    let signal_id_str = arr
        .get(2)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    let effect = arr
        .get(3)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
        SdkError::Script(ScriptError::Parse(format!(
            "ff_deliver_signal: invalid signal_id from Lua: {e}"
        )))
    })?;

    if effect == "resume_condition_satisfied" {
        Ok(SignalOutcome::TriggeredResume { signal_id })
    } else {
        Ok(SignalOutcome::Accepted { signal_id, effect })
    }
}

/// Parse ff_append_frame result: ok(entry_id, frame_count)
fn parse_append_frame_result(raw: &Value) -> Result<AppendFrameOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_append_frame: expected Array".into(),
            )));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_append_frame: bad status code".into(),
            )));
        }
    };

    if status_code != 1 {
        let err_field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = err_field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = err_field_str(2);
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_append_frame: {error_code}"))
            }),
        ));
    }

    // ok(entry_id, frame_count)  → arr[2] = entry_id, arr[3] = frame_count
    let stream_id = arr
        .get(2)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    let frame_count = arr
        .get(3)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => String::from_utf8_lossy(b).parse::<u64>().ok(),
            Ok(Value::SimpleString(s)) => s.parse::<u64>().ok(),
            // A negative count would be a Lua bug; treat it as absent
            // rather than wrapping to a huge value via `as u64`.
            Ok(Value::Int(n)) => u64::try_from(*n).ok(),
            _ => None,
        })
        .unwrap_or(0);

    Ok(AppendFrameOutcome {
        stream_id,
        frame_count,
    })
}

/// Parse ff_fail_execution result:
///   ok("retry_scheduled", delay_until)
///   ok("terminal_failed")
fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_fail_execution: expected Array".into(),
            )));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::Script(ScriptError::Parse(
                "ff_fail_execution: bad status code".into(),
            )));
        }
    };

    if status_code != 1 {
        let err_field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = err_field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = err_field_str(2);
        return Err(SdkError::Script(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse(format!("ff_fail_execution: {error_code}"))
            }),
        ));
    }

    // Parse sub-status from field[2] (index 2 = first field after status+OK)
    let sub_status = arr
        .get(2)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    match sub_status.as_str() {
        "retry_scheduled" => {
            // Lua returns: ok("retry_scheduled", tostring(delay_until))
            // arr[3] = delay_until
            let delay_str = arr
                .get(3)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default();
            let delay_until = delay_str.parse::<i64>().unwrap_or(0);

            Ok(FailOutcome::RetryScheduled {
                delay_until: TimestampMs::from_millis(delay_until),
            })
        }
        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
        _ => Err(SdkError::Script(ScriptError::Parse(format!(
            "ff_fail_execution: unexpected sub-status: {sub_status}"
        )))),
    }
}

// ── Stream read / tail (consumer API, RFC-006 #2) ──

/// Maximum tail block duration accepted by [`tail_stream`]. Mirrors the REST
/// endpoint ceiling so SDK callers can't wedge a connection longer than the
/// server would accept.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;

/// Maximum frames per read/tail call. Mirrors
/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
/// callers don't need to import ff-core just to read the bound.
pub use ff_core::contracts::STREAM_READ_HARD_CAP;

/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
/// signal so polling consumers can exit cleanly.
///
/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
pub use ff_core::contracts::StreamFrames;

fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
    if count_limit == 0 {
        return Err(SdkError::Config("count_limit must be >= 1".to_owned()));
    }
    if count_limit > STREAM_READ_HARD_CAP {
        return Err(SdkError::Config(format!(
            "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
        )));
    }
    Ok(())
}

/// Read frames from a completed or in-flight attempt's stream.
///
/// `from_id` / `to_id` accept XRANGE special markers (`"-"`, `"+"`) or
/// entry IDs. `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
/// `0` returns [`SdkError::Config`].
///
/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
/// consumers know when the producer has finalized the stream. A
/// never-written attempt and an in-progress stream are indistinguishable
/// here — both present as `frames=[]`, `closed_at=None`.
///
/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
/// client but are not the lease-holding worker — no lease check is
/// performed.
///
/// # Head-of-line note
///
/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
/// calling this on a `client` that is also serving FCALLs stalls those
/// FCALLs behind the reply. The REST server isolates reads on its
/// `tail_client`; direct SDK callers should either use a dedicated
/// client OR paginate through smaller `count_limit` slices.
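///
/// # Example (sketch): paginating in bounded slices
///
/// Walking a stream page by page instead of one max-size read. The
/// `next_after` helper for deriving an exclusive-start id from the last
/// seen entry is hypothetical; derive the next `from_id` however your
/// entry-id representation supports it:
///
/// ```ignore
/// let mut from = "-".to_owned();
/// loop {
///     let page = read_stream(&client, &cfg, &eid, idx, &from, "+", 512).await?;
///     for frame in &page.frames { /* consume */ }
///     if page.frames.len() < 512 { break; } // short page: range exhausted
///     from = next_after(page.frames.last().unwrap()); // hypothetical helper
/// }
/// ```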
pub async fn read_stream(
    client: &Client,
    partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from_id: &str,
    to_id: &str,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
    validate_stream_read_count(count_limit)?;

    let partition = execution_partition(execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, execution_id);
    let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };

    let args = ReadFramesArgs {
        execution_id: execution_id.clone(),
        attempt_index,
        from_id: from_id.to_owned(),
        to_id: to_id.to_owned(),
        count_limit,
    };

    let ReadFramesResult::Frames(f) =
        ff_script::functions::stream::ff_read_attempt_stream(client, &keys, &args)
            .await
            .map_err(SdkError::Script)?;
    Ok(f)
}

/// Tail a live attempt's stream.
///
/// `last_id` is exclusive — XREAD returns entries with id strictly greater
/// than `last_id`. Pass `"0-0"` to start from the beginning.
///
/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
/// SDK and REST ceilings aligned.
///
/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
/// polling consumers should loop until `result.is_closed()` is true, then
/// drain and exit. Timeout with no new frames presents as
/// `frames=[], closed_at=None`.
///
/// # Head-of-line warning — use a dedicated client
///
/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
/// the read side until a frame arrives or the block elapses. If the
/// `client` you pass here is ALSO used for claims, completes, fails,
/// appends, or any other FCALL, a 30-second tail will stall all those
/// calls for up to 30 seconds.
///
/// **Strongly recommended**: build a separate `ferriskey::Client` for
/// tail callers — mirrors the `Server::tail_client` split that the REST
/// server uses internally (see `crates/ff-server/src/server.rs` and
/// RFC-006 Impl Notes §"Dedicated stream-op connection").
///
/// # Tail parallelism caveat (same mux)
///
/// Even a dedicated tail client is still one multiplexed TCP connection.
/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
/// ferriskey's per-call `request_timeout` starts at future-poll — so
/// two concurrent tails against the same client can time out spuriously:
/// the second call's BLOCK budget elapses while it waits for the first
/// BLOCK to return. The REST server handles this internally with a
/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
/// each call its full `block_ms` budget at the server.
///
/// **Direct SDK callers that need concurrent tails**: either
///   (1) build ONE `ferriskey::Client` per concurrent tail call (a small
///       pool of clients, rotated by the caller), OR
///   (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
///       only one BLOCK is in flight per client at a time.
/// If you need the REST-side backpressure (429 on contention) and the
/// built-in serializer, go through the
/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
/// than calling this directly.
///
/// This SDK does not enforce either pattern — the mutex belongs at the
/// application layer, and the connection pool belongs at the SDK
/// caller's DI layer; neither has a structured place inside this
/// helper.
///
/// # Timeout handling
///
/// Blocking calls do not hit ferriskey's default `request_timeout` (5s in
/// the default configuration). For `XREAD`/`XREADGROUP` with a `BLOCK`
/// argument, ferriskey's `get_request_timeout` returns
/// `BlockingCommand(block_ms + 500ms)`, overriding the client's default
/// per-call. A tail with `block_ms = 30_000` gets a 30_500ms effective
/// transport timeout even if the client was built with a shorter
/// `request_timeout`. No custom client configuration is required for
/// timeout reasons — only for head-of-line isolation above.
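///
/// # Example (sketch): serialized polling tail
///
/// Combines the polling loop with pattern (2) above (one in-flight BLOCK
/// per client). `tail_mutex` is an application-owned
/// `tokio::sync::Mutex<()>`; the other bindings are placeholders:
///
/// ```ignore
/// let mut last_id = "0-0".to_owned();
/// loop {
///     let page = {
///         let _guard = tail_mutex.lock().await; // one BLOCK in flight per client
///         tail_stream(&client, &cfg, &eid, idx, &last_id, 5_000, 256).await?
///     };
///     for frame in &page.frames { /* consume; advance last_id from the entry id */ }
///     if page.is_closed() { break; } // producer finalized the stream
/// }
/// ```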
pub async fn tail_stream(
    client: &Client,
    partition_config: &PartitionConfig,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    last_id: &str,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    if block_ms > MAX_TAIL_BLOCK_MS {
        return Err(SdkError::Config(format!(
            "block_ms exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"
        )));
    }
    validate_stream_read_count(count_limit)?;

    let partition = execution_partition(execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, execution_id);
    let stream_key = ctx.stream(attempt_index);
    let stream_meta_key = ctx.stream_meta(attempt_index);

    ff_script::stream_tail::xread_block(
        client,
        &stream_key,
        &stream_meta_key,
        last_id,
        block_ms,
        count_limit,
    )
    .await
    .map_err(SdkError::Script)
}

#[cfg(test)]
mod parse_report_usage_result_tests {
    use super::*;

    /// `Value::SimpleString` from a `&str`. `usage_field_str` handles
    /// BulkString and SimpleString uniformly (`Value::BulkString(b)` →
    /// `String::from_utf8_lossy`, `Value::SimpleString(s)` → clone), so
    /// SimpleString avoids a dev-dependency on `bytes` just for test
    /// construction.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }

    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }

    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }

    #[test]
    fn ok_status() {
        let raw = arr(vec![int(1), s("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }

    #[test]
    fn soft_breach_status() {
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }

    #[test]
    fn hard_breach_status() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    /// Negative case: non-Array input. Guards against a future Lua refactor
    /// that accidentally returns a bare string/int — the parser must fail
    /// loudly rather than silently succeed or panic.
    #[test]
    fn non_array_input_is_parse_error() {
        let raw = Value::SimpleString("OK".to_owned());
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    /// Negative case: Array whose first element isn't an Int status code.
    /// The Lua function's first return slot is always `status_code` (1 on
    /// success, an error code otherwise); a non-Int there is a wire-format
    /// break that must surface as a parse error.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = arr(vec![s("not_an_int"), s("OK")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
    /// field. Guards against the silent-coercion defect cross-review
    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
    /// which would have surfaced Lua-side wire-format drift as a
    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
    /// but semantically wrong (a "breach with zero usage" is nonsense
    /// and masks the real error).
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"), // current_usage — must fail, not coerce to 0
            s("100"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    /// Negative case: HARD_BREACH with the limit slot missing
    /// entirely. Same defence as the non-numeric test above: a
    /// truncated response must fail loudly rather than coerce to 0.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
            // no index 4 — hard_limit missing
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}

#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(k, v)| ((*k).to_owned(), (*v).to_owned())).collect()
    }

    #[test]
    fn empty_suspension_returns_none() {
        let susp = m(&[]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        // Claimed attempt is 1; suspension belongs to 0 → stale.
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = m(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp.to_string()),
            ]);
            let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
        assert_eq!(out, Some(wp));
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        assert!(
            format!("{err}").contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: an empty waitpoint_id field (shouldn't happen on
        // resumed records, but guard against partial writes) is None, not an error.
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was
    // removed when the production path switched from unbounded
    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
    // feedback on unbounded condition-hash reply size). That loop
    // is exercised by the integration tests in `ff-test`.
}