// ff_sdk/task.rs
1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use ferriskey::{Client, Value};
7use ff_core::backend::{Handle, HandleKind};
8use ff_core::contracts::ReportUsageResult;
9use ff_core::engine_backend::EngineBackend;
10use ff_script::error::ScriptError;
11use ff_core::keys::{usage_dedup_key, BudgetKeyContext, ExecKeyContext, IndexKeys};
12use ff_core::partition::{budget_partition, execution_partition, PartitionConfig};
13use ff_core::types::*;
14use tokio::sync::{Notify, OwnedSemaphorePermit};
15use tokio::task::JoinHandle;
16
17use crate::SdkError;
18
19// ── Phase 3: Suspend/Signal types ──
20
/// Timeout behavior when a suspension deadline is reached.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutBehavior {
    Fail,
    Cancel,
    Expire,
    AutoResume,
    Escalate,
}

impl TimeoutBehavior {
    /// Canonical wire string for this behavior (the value threaded to Lua
    /// and stored engine-side).
    pub fn as_str(&self) -> &str {
        match self {
            Self::Fail => "fail",
            Self::Cancel => "cancel",
            Self::Expire => "expire",
            Self::AutoResume => "auto_resume_with_timeout_signal",
            Self::Escalate => "escalate",
        }
    }
}

impl std::fmt::Display for TimeoutBehavior {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

impl std::str::FromStr for TimeoutBehavior {
    type Err = String;

    /// Parse the canonical wire strings; `"auto_resume"` is also accepted
    /// as a short alias for `AutoResume`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s == "auto_resume" {
            return Ok(Self::AutoResume);
        }
        const ALL: [TimeoutBehavior; 5] = [
            TimeoutBehavior::Fail,
            TimeoutBehavior::Cancel,
            TimeoutBehavior::Expire,
            TimeoutBehavior::AutoResume,
            TimeoutBehavior::Escalate,
        ];
        // Every other accepted input is exactly a variant's `as_str` form.
        ALL.iter()
            .find(|b| b.as_str() == s)
            .cloned()
            .ok_or_else(|| format!("unknown timeout behavior: {s}"))
    }
}
63
/// A condition matcher for the resume condition.
///
/// NOTE(review): condition evaluation happens engine-side (Lua) — only the
/// matcher shape is defined here; confirm matching is exact-name before
/// relying on it.
#[derive(Clone, Debug)]
pub struct ConditionMatcher {
    /// Signal name to match (empty = wildcard).
    pub signal_name: String,
}
70
/// Outcome of a `suspend()` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SuspendOutcome {
    /// Execution is now suspended, waitpoint active.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        /// Caller-supplied key naming the waitpoint.
        waitpoint_key: String,
        /// HMAC token that signal deliverers must supply to this waitpoint.
        waitpoint_token: WaitpointToken,
    },
    /// Buffered signals already satisfied the condition — suspension skipped.
    /// The caller still holds the lease.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// NOTE(review): presumably still required for late deliveries to
        /// the same waitpoint — confirm engine behavior.
        waitpoint_token: WaitpointToken,
    },
}
91
/// A signal to deliver to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    /// Name matched against the suspension's resume condition
    /// (see [`ConditionMatcher`]).
    pub signal_name: String,
    /// Category label carried alongside the name.
    /// NOTE(review): allowed values are engine-defined — confirm against the Lua side.
    pub signal_category: String,
    /// Optional opaque payload bytes delivered with the signal.
    pub payload: Option<Vec<u8>>,
    /// Free-form descriptor of the signal's origin type.
    /// NOTE(review): expected values not visible here — confirm with callers.
    pub source_type: String,
    /// Identity of the sender within `source_type`.
    pub source_identity: String,
    /// Optional dedup key; a repeated key surfaces as [`SignalOutcome::Duplicate`].
    pub idempotency_key: Option<String>,
    /// HMAC token issued when the waitpoint was created. Required for
    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
    pub waitpoint_token: WaitpointToken,
}
105
/// Outcome of `deliver_signal()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
    /// NOTE(review): `effect` strings are produced engine-side — not visible here.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal triggered resume — execution is now runnable.
    TriggeredResume { signal_id: SignalId },
    /// Duplicate signal (idempotency key matched).
    /// NOTE(review): carries a raw `String` rather than `SignalId` —
    /// presumably the FCALL's verbatim value; confirm before tightening.
    Duplicate { existing_signal_id: String },
}
116
impl SignalOutcome {
    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
    ///
    /// Consuming packages that call `ff_deliver_signal` directly can use this
    /// to interpret the Lua return value without depending on SDK internals.
    ///
    /// # Errors
    ///
    /// Returns whatever `SdkError` the shared `parse_signal_result` helper
    /// produces when `raw` cannot be interpreted.
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}
126
127/// A signal that triggered a resume, readable by a worker after re-claim.
128///
129/// **RFC-012 Stage 0:** the canonical definition has moved to
130/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
131/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
132/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
133/// scheduled for removal in 0.5.0.
134pub use ff_core::backend::ResumeSignal;
135
/// Outcome of `append_frame()`.
#[derive(Clone, Debug)]
pub struct AppendFrameOutcome {
    /// Valkey Stream entry ID assigned to this frame.
    pub stream_id: String,
    /// Total frame count in the stream after this append (the stream is
    /// keyed per attempt — see `append_frame`).
    pub frame_count: u64,
}
144
145/// Outcome of a `fail()` call.
146///
147/// **RFC-012 Stage 1a:** canonical definition moved to
148/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
149/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
150/// re-export in `lib.rs`) through the 0.4.x window. Existing
151/// consumers construct + match exhaustively without change.
152pub use ff_core::backend::FailOutcome;
153
/// A claimed execution with an active lease. The worker processes this task
/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
///
/// The lease is automatically renewed in the background at `lease_ttl / 3`
/// intervals. Renewal stops when the task is consumed or dropped.
///
/// `complete`, `fail`, and `cancel` consume `self` — this prevents
/// double-complete bugs at the type level.
pub struct ClaimedTask {
    /// Shared Valkey client.
    client: Client,
    /// `EngineBackend` the trait-migrated ops forward through.
    ///
    /// **RFC-012 Stage 1b.** Today this is always a `ValkeyBackend`
    /// wrapping `client`. 8 of 12 ops now route through
    /// `backend.*()`; the remaining 4 (`create_pending_waitpoint`,
    /// `append_frame`, `suspend`, `report_usage`) still use
    /// `client.fcall(...)` directly — see issue #117 for the
    /// trait-shape alignment that unblocks them.
    ///
    /// Stage 1c will migrate the FlowFabricWorker hot paths (claim,
    /// deliver_signal) through the same trait surface; Stage 1d
    /// refactors this struct to carry a single `Handle` rather than
    /// synthesising one per op via `synth_handle`.
    backend: Arc<dyn EngineBackend>,
    /// Partition config for key construction.
    partition_config: PartitionConfig,
    /// Execution identity.
    execution_id: ExecutionId,
    /// Current attempt.
    attempt_index: AttemptIndex,
    /// Unique identifier of the current attempt.
    attempt_id: AttemptId,
    /// Lease identity.
    lease_id: LeaseId,
    /// Fencing epoch — bumped by the engine on each resumed claim (see `new`).
    lease_epoch: LeaseEpoch,
    /// Lease timing.
    lease_ttl_ms: u64,
    /// Lane used at claim time.
    lane_id: LaneId,
    /// Worker instance that holds this lease (for index cleanup keys).
    worker_instance_id: WorkerInstanceId,
    /// Execution data.
    input_payload: Vec<u8>,
    /// Execution kind string, pre-read at claim time.
    execution_kind: String,
    /// Tags snapshot, pre-read at claim time.
    tags: HashMap<String, String>,
    /// Background renewal task handle.
    renewal_handle: JoinHandle<()>,
    /// Signal to stop renewal (used before consuming self).
    renewal_stop: Arc<Notify>,
    /// Consecutive lease renewal failures. Shared with the background renewal
    /// task. Reset to 0 on each successful renewal. Workers should check
    /// `is_lease_healthy()` before committing expensive side effects.
    renewal_failures: Arc<AtomicU32>,
    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
    /// response is received, just before `self` is consumed into `Drop`.
    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
    /// suppress the false-positive "dropped without terminal operation"
    /// warning: after `notify_one`, the renewal task has not yet been
    /// polled by the runtime, so `is_finished()` is still `false` on the
    /// happy path when self is being consumed.
    ///
    /// Note: the flag is set for any terminal-op path that reaches
    /// `stop_renewal()`, which includes Lua-level script errors (the
    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
    /// the caller already receives the `Err` via the op's return value,
    /// so an additional `Drop` warning would be noise. The warning is
    /// reserved for genuine drop-without-terminal-op cases (panic, early
    /// return, transport failure before stop_renewal ran).
    terminal_op_called: AtomicBool,
    /// Concurrency permit from the worker's semaphore. Held for the lifetime
    /// of the task; released on complete/fail/cancel/drop.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
227
228impl ClaimedTask {
229    /// Construct a `ClaimedTask` from the results of a successful
230    /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
231    ///
232    /// # Arguments
233    ///
234    /// * `client` — shared Valkey client used for subsequent lease
235    ///   renewals, signal delivery, and the final
236    ///   complete/fail/cancel FCALL.
237    /// * `partition_config` — partition topology snapshot read at
238    ///   `FlowFabricWorker::connect`. Used for key construction on
239    ///   the lifetime of this task.
240    /// * `execution_id` — the claimed execution's UUID.
241    /// * `attempt_index` / `attempt_id` — current attempt identity.
242    ///   `attempt_index` is 0 on a fresh claim, preserved on a
243    ///   resumed claim.
244    /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
245    ///   is returned by the Lua FCALL and bumped on each resumed
246    ///   claim.
247    /// * `lease_ttl_ms` — lease TTL used to schedule the background
248    ///   renewal task (renews at `lease_ttl_ms / 3`).
249    /// * `lane_id` — the lane the task was claimed on. Used for
250    ///   index key construction (`lane_active`, `lease_expiry`,
251    ///   etc.).
252    /// * `worker_instance_id` — this worker's identity. Used to
253    ///   resolve `worker_leases` index entries during renewal and
254    ///   completion.
255    /// * `input_payload` / `execution_kind` / `tags` — pre-read
256    ///   execution metadata, exposed via getters so the worker
257    ///   doesn't round-trip to Valkey to inspect what it just
258    ///   claimed.
259    ///
260    /// # Invariant: constructor is `pub(crate)` on purpose
261    ///
262    /// **External callers cannot construct a `ClaimedTask`** — only
263    /// the in-crate claim entry points (`claim_next`,
264    /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
265    /// load-bearing: those entry points are the ONLY sites that
266    /// acquire a permit from the worker's concurrency semaphore
267    /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
268    /// it through `ClaimedTask::set_concurrency_permit` before
269    /// returning the task. Promoting `new` to `pub` would let
270    /// consumers build tasks that bypass the concurrency contract
271    /// the worker's `max_concurrent_tasks` config advertises — the
272    /// returned task would run alongside other in-flight work
273    /// without debiting the permit bank, and the
274    /// complete/fail/cancel/drop path would have nothing to
275    /// release.
276    ///
277    /// If an external callsite genuinely needs to rehydrate a
278    /// task from saved FCALL results, the right answer is a new
279    /// `FlowFabricWorker` entry point that wraps `new` + acquires a
280    /// permit — not promoting this constructor.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        client: Client,
        backend: Arc<dyn EngineBackend>,
        partition_config: PartitionConfig,
        execution_id: ExecutionId,
        attempt_index: AttemptIndex,
        attempt_id: AttemptId,
        lease_id: LeaseId,
        lease_epoch: LeaseEpoch,
        lease_ttl_ms: u64,
        lane_id: LaneId,
        worker_instance_id: WorkerInstanceId,
        input_payload: Vec<u8>,
        execution_kind: String,
        tags: HashMap<String, String>,
    ) -> Self {
        let renewal_stop = Arc::new(Notify::new());
        let renewal_failures = Arc::new(AtomicU32::new(0));

        // Stage 1b: the renewal task forwards through `backend.renew(&handle)`.
        // Build the handle once at construction time and hand both Arc clones
        // into the spawned task.
        // (Same field order as `synth_handle` — keep the two sites in sync.)
        let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
            execution_id.clone(),
            attempt_index,
            attempt_id.clone(),
            lease_id.clone(),
            lease_epoch,
            lease_ttl_ms,
            lane_id.clone(),
            worker_instance_id.clone(),
            HandleKind::Fresh,
        );
        // Renewal starts immediately — the task is live from construction on.
        let renewal_handle = spawn_renewal_task(
            backend.clone(),
            renewal_handle_cookie,
            execution_id.clone(),
            lease_ttl_ms,
            renewal_stop.clone(),
            renewal_failures.clone(),
        );

        Self {
            client,
            backend,
            partition_config,
            execution_id,
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            lease_ttl_ms,
            lane_id,
            worker_instance_id,
            input_payload,
            execution_kind,
            tags,
            renewal_handle,
            renewal_stop,
            renewal_failures,
            terminal_op_called: AtomicBool::new(false),
            // Attached later by the claim entry points via `set_concurrency_permit`.
            _concurrency_permit: None,
        }
    }
346
    /// Attach a concurrency permit from the worker's semaphore.
    /// The permit is held for the task's lifetime and released on drop.
    // NOTE(review): allow(dead_code) suggests not all builds/paths call this —
    // confirm against the claim entry points before removing.
    #[allow(dead_code)]
    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
        self._concurrency_permit = Some(permit);
    }
353
    // ── Accessors ──

    /// The claimed execution's UUID.
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    /// Zero-based attempt counter (0 on a fresh claim, preserved on resume).
    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }

    /// Unique identifier of the current attempt.
    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }

    /// Identifier of the lease this task holds.
    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }

    /// Fencing epoch of the lease (bumped on each resumed claim).
    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }

    /// Input payload bytes, pre-read at claim time.
    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }

    /// Execution kind string, pre-read at claim time.
    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }

    /// Tags snapshot, pre-read at claim time.
    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }

    /// Lane the task was claimed on.
    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }
391
392    /// Synthesise a Valkey-backend `Handle` encoding this task's
393    /// attempt-cookie fields. Private to the SDK — the trait
394    /// forwarders call this per op to produce the `&Handle` argument
395    /// the `EngineBackend` methods take.
396    ///
397    /// **RFC-012 Stage 1b option A.** Refactoring `ClaimedTask` to
398    /// carry one cached `Handle` instead of synthesising per op is
399    /// deferred to Stage 1d (issue #89 follow-up). The per-op cost is
400    /// a single allocation of a small byte buffer — negligible
401    /// compared to the FCALL round-trip that follows.
402    ///
403    /// Kind is always `HandleKind::Fresh` because today the 9
404    /// Stage-1b ops all treat the backend tag + opaque bytes as
405    /// authoritative; they do not dispatch on kind. Stage 1d routes
406    /// resumed-claim callers through a `Resumed` synth.
    fn synth_handle(&self) -> Handle {
        // Field order mirrors the `encode_handle` call in `new` — keep the
        // two sites in sync until Stage 1d caches a single Handle.
        ff_backend_valkey::ValkeyBackend::encode_handle(
            self.execution_id.clone(),
            self.attempt_index,
            self.attempt_id.clone(),
            self.lease_id.clone(),
            self.lease_epoch,
            self.lease_ttl_ms,
            self.lane_id.clone(),
            self.worker_instance_id.clone(),
            HandleKind::Fresh,
        )
    }
420
421    /// Check if the lease is likely still valid based on renewal success.
422    ///
423    /// Returns `false` if 3 or more consecutive renewal attempts have failed.
424    /// Workers should check this before committing expensive or irreversible
425    /// side effects. A `false` return means Valkey may have already expired
426    /// the lease and another worker could be processing this execution.
427    pub fn is_lease_healthy(&self) -> bool {
428        self.renewal_failures.load(Ordering::Relaxed) < 3
429    }
430
    /// Number of consecutive lease renewal failures since the last success.
    ///
    /// Returns 0 when renewals are working normally. Useful for observability
    /// and custom health policies beyond the default threshold of 3.
    pub fn consecutive_renewal_failures(&self) -> u32 {
        // Relaxed load: the counter is an advisory health signal shared with
        // the renewal task — no ordering with other memory is relied upon.
        self.renewal_failures.load(Ordering::Relaxed)
    }
438
439    // ── Terminal operations (consume self) ──
440
441    /// Delay the execution until `delay_until`.
442    ///
443    /// Releases the lease. The execution moves to `delayed` state.
444    /// Consumes self — the task cannot be used after delay.
445    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
446        // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
447        // the pre-migration `stop_renewal` contract: called only when
448        // the FCALL round-tripped (Ok or a typed Lua/engine error);
449        // raw transport errors bubble up without stopping renewal so
450        // the `Drop` warning still fires if the caller later drops
451        // `self` without retrying.
452        let handle = self.synth_handle();
453        let out = self.backend.delay(&handle, delay_until).await;
454        if fcall_landed(&out) {
455            self.stop_renewal();
456        }
457        out.map_err(SdkError::from)
458    }
459
460    /// Move execution to waiting_children state.
461    ///
462    /// Releases the lease. The execution waits for child dependencies to complete.
463    /// Consumes self.
464    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
465        // RFC-012 Stage 1b: forwards through `backend.wait_children`.
466        // See `delay_execution` for the stop_renewal contract.
467        let handle = self.synth_handle();
468        let out = self.backend.wait_children(&handle).await;
469        if fcall_landed(&out) {
470            self.stop_renewal();
471        }
472        out.map_err(SdkError::from)
473    }
474
    /// Complete the execution successfully.
    ///
    /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
    /// Renewal continues during the FCALL to prevent lease expiry under
    /// network latency — the Lua fences on lease_id+epoch atomically.
    /// Consumes self — the task cannot be used after completion.
    ///
    /// # Connection errors and replay
    ///
    /// If the Valkey connection drops after the Lua commit but before the
    /// client reads the response, retrying `complete()` with the same
    /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
    /// reconciles the "already terminal with matching outcome" response
    /// into `Ok(())`. A retry after a different terminal op has raced in
    /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
    /// populated `terminal_outcome` so the caller can see what actually
    /// happened.
    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.complete`.
        // The FCALL body + the `ExecutionNotActive` replay reconciliation
        // (match same epoch + attempt_id + outcome=="success" → Ok)
        // moved to `ff_backend_valkey::complete_impl`.
        let handle = self.synth_handle();
        let out = self.backend.complete(&handle, result_payload).await;
        // Renewal stops only for round-tripped responses (Ok or typed error);
        // on raw transport failure it keeps running so `Drop` can warn if the
        // caller bails without retrying.
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }
504
    /// Fail the execution with a reason and error category.
    ///
    /// If the execution policy allows retries, the engine schedules a retry
    /// (delayed backoff). Otherwise, the execution becomes terminal failed.
    /// Returns [`FailOutcome`] so the caller knows what happened.
    ///
    /// # Connection errors and replay
    ///
    /// `fail()` is replay-safe under the same conditions as `complete()`:
    /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
    /// reconciled into the outcome the server actually committed
    /// (`TerminalFailed` if no retries left; `RetryScheduled` with
    /// `delay_until = 0` if a retry was scheduled — the exact delay is
    /// not recovered on the replay path).
    pub async fn fail(
        self,
        reason: &str,
        error_category: &str,
    ) -> Result<FailOutcome, SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.fail`.
        // The FCALL body + retry-policy read + the two-shape replay
        // reconciliation (terminal/failed → TerminalFailed, runnable
        // → RetryScheduled{0}) moved to
        // `ff_backend_valkey::fail_impl`.
        //
        // **Preserving the caller's raw category string.** The
        // pre-Stage-1b code passed `error_category` straight to the
        // Lua `failure_category` ARGV. Real callers use arbitrary
        // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
        // `"inference_error"`, …) that do not correspond to the 5
        // named `FailureClass` variants. A lossy enum conversion
        // would rewrite every non-canonical category to
        // `Transient` — a real behavior change for
        // ff-readiness-tests + the examples crates.
        //
        // Instead: pack the raw category bytes into
        // `FailureReason.detail`; `fail_impl` reads them back and
        // threads them to Lua verbatim. The `classification` enum
        // is derived from the string for the few consumers who
        // match on the trait return today (none in-workspace).
        // Stage 1d (or issue #117) widens `FailureClass` with a
        // `Custom(String)` arm; this stash carrier retires then.
        let handle = self.synth_handle();
        let failure_reason = ff_core::backend::FailureReason::with_detail(
            reason.to_owned(),
            error_category.as_bytes().to_vec(),
        );
        // Lossy by design — the authoritative category travels in `detail`
        // above; this enum is advisory only (see block comment).
        let classification = error_category_to_class(error_category);
        let out = self.backend.fail(&handle, failure_reason, classification).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }
559
    /// Cancel the execution.
    ///
    /// Calls `ff_cancel_execution` via FCALL, then stops lease renewal —
    /// the same ordering as `complete()`: renewal keeps the lease alive
    /// while the FCALL is in flight. Consumes self.
    ///
    /// # Connection errors and replay
    ///
    /// `cancel()` is replay-safe under the same conditions as `complete()`:
    /// a retry by the same caller matching `lease_epoch` + `attempt_id`
    /// returns `Ok(())` if the server's stored `terminal_outcome` is
    /// `cancelled`. A retry that finds a different outcome (because a
    /// concurrent `complete()` or `fail()` won the race) surfaces as
    /// `ExecutionNotActive` with the populated `terminal_outcome` so the
    /// caller can see that the cancel intent was NOT honored.
    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.cancel`.
        // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
        // and the `ExecutionNotActive` → outcome=="cancelled" replay
        // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
        let handle = self.synth_handle();
        let out = self.backend.cancel(&handle, reason).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }
586
587    // ── Non-terminal operations ──
588
589    /// Manually renew the lease.
590    ///
591    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
592    /// trait. The background renewal task calls
593    /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
594    /// this method is the public entry point for workers that want
595    /// to force a renew out-of-band.
596    pub async fn renew_lease(&self) -> Result<(), SdkError> {
597        let handle = self.synth_handle();
598        self.backend
599            .renew(&handle)
600            .await
601            .map(|_renewal| ())
602            .map_err(SdkError::from)
603    }
604
605    /// Update progress (pct 0-100 and optional message).
606    ///
607    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
608    /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
609    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
610        let handle = self.synth_handle();
611        self.backend
612            .progress(&handle, Some(pct), Some(message.to_owned()))
613            .await
614            .map_err(SdkError::from)
615    }
616
617    /// Report usage against a budget and check limits.
618    ///
619    /// Non-consuming — the worker can report usage multiple times.
620    /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
621    /// `dedup_key` prevents double-counting on retries (auto-prefixed with budget hash tag).
622    // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b Tranche 3.
623    // `AdmissionDecision` (trait return) cannot losslessly encode
624    // `ReportUsageResult::{SoftBreach, HardBreach, AlreadyApplied}`
625    // — `SoftBreach` is a warning (not a rejection) and has no home
626    // in `Admitted / Throttled / Rejected`, and `HardBreach`'s
627    // structured dim/current/limit fields collapse to `Rejected { reason: String }`.
628    // Tracked in #117.
629    pub async fn report_usage(
630        &self,
631        budget_id: &BudgetId,
632        dimensions: &[(&str, u64)],
633        dedup_key: Option<&str>,
634    ) -> Result<ReportUsageResult, SdkError> {
635        let partition = budget_partition(budget_id, &self.partition_config);
636        let bctx = BudgetKeyContext::new(&partition, budget_id);
637
638        // KEYS (3): budget_usage, budget_limits, budget_def
639        let keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
640
641        // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
642        let now = TimestampMs::now();
643        let dim_count = dimensions.len();
644        let mut argv: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
645        argv.push(dim_count.to_string());
646        for (dim, _) in dimensions {
647            argv.push((*dim).to_string());
648        }
649        for (_, delta) in dimensions {
650            argv.push(delta.to_string());
651        }
652        argv.push(now.to_string());
653        let dedup_key_val = dedup_key
654            .filter(|k| !k.is_empty())
655            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
656            .unwrap_or_default();
657        argv.push(dedup_key_val);
658
659        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
660        let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
661
662        let raw: Value = self
663            .client
664            .fcall("ff_report_usage_and_check", &key_refs, &argv_refs)
665            .await
666            .map_err(SdkError::Valkey)?;
667
668        parse_report_usage_result(&raw)
669    }
670
    /// Create a pending waitpoint for future signal delivery.
    ///
    /// Non-consuming — the worker keeps the lease. Signals delivered to the
    /// waitpoint are buffered. When the worker later calls `suspend()` with
    /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
    /// resume condition.
    ///
    /// Returns both the waitpoint_id AND the HMAC token required by external
    /// callers to buffer signals against this pending waitpoint
    /// (RFC-004 §Waitpoint Security).
    // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. The
    // `EngineBackend` trait has no `create_pending_waitpoint` slot —
    // RFC-012 §3.1 inventory does not list this op today. Needs
    // either a new trait method or a reshape of `suspend` that covers
    // the lease-retaining flow. Tracked in #117.
    pub async fn create_pending_waitpoint(
        &self,
        waitpoint_key: &str,
        expires_in_ms: u64,
    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        // The waitpoint id is generated client-side; the FCALL persists it.
        let waitpoint_id = WaitpointId::new();
        // NOTE(review): `expires_in_ms as i64` wraps for values above
        // i64::MAX milliseconds — unreachable in practice, but `try_into`
        // would be stricter.
        let expires_at = TimestampMs::from_millis(TimestampMs::now().0 + expires_in_ms as i64);

        // KEYS (4): exec_core, waitpoint_hash, pending_wp_expiry_zset, hmac_secrets
        let keys: Vec<String> = vec![
            ctx.core(),
            ctx.waitpoint(&waitpoint_id),
            idx.pending_waitpoint_expiry(),
            idx.waitpoint_hmac_secrets(),
        ];

        // ARGV (5): execution_id, attempt_index, waitpoint_id, waitpoint_key, expires_at
        let args: Vec<String> = vec![
            self.execution_id.to_string(),
            self.attempt_index.to_string(),
            waitpoint_id.to_string(),
            waitpoint_key.to_owned(),
            expires_at.to_string(),
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_create_pending_waitpoint", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Response: {1, "OK", waitpoint_id, waitpoint_key, waitpoint_token}.
        // Extract the token (the waitpoint_id is the one we generated locally).
        let token = extract_pending_waitpoint_token(&raw)?;
        Ok((waitpoint_id, token))
    }
729
730    // ── Phase 4: Streaming ──
731
732    /// Append a frame to the current attempt's output stream.
733    ///
734    /// Non-consuming — the worker can append many frames during execution.
735    /// The stream is created lazily on the first append.
736    // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. Trait
737    // method `append_frame` returns `()`; this SDK method returns
738    // `AppendFrameOutcome { stream_id, frame_count }` — the return-type
739    // delta is a hard ABI break on the Stage-1a trait. Tracked in #117.
740    pub async fn append_frame(
741        &self,
742        frame_type: &str,
743        payload: &[u8],
744        metadata: Option<&str>,
745    ) -> Result<AppendFrameOutcome, SdkError> {
746        let partition = execution_partition(&self.execution_id, &self.partition_config);
747        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
748
749        let now = TimestampMs::now();
750
751        // KEYS (3): exec_core, stream_key, stream_meta
752        let keys: Vec<String> = vec![
753            ctx.core(),
754            ctx.stream(self.attempt_index),
755            ctx.stream_meta(self.attempt_index),
756        ];
757
758        let payload_str = String::from_utf8_lossy(payload);
759
760        // ARGV (13): execution_id, attempt_index, lease_id, lease_epoch,
761        //            frame_type, ts, payload, encoding, correlation_id,
762        //            source, retention_maxlen, attempt_id, max_payload_bytes
763        let args: Vec<String> = vec![
764            self.execution_id.to_string(),     // 1
765            self.attempt_index.to_string(),    // 2
766            self.lease_id.to_string(),         // 3
767            self.lease_epoch.to_string(),      // 4
768            frame_type.to_owned(),             // 5
769            now.to_string(),                   // 6 ts
770            payload_str.into_owned(),          // 7
771            "utf8".to_owned(),                 // 8 encoding
772            metadata.unwrap_or("").to_owned(), // 9 correlation_id
773            "worker".to_owned(),               // 10 source
774            "10000".to_owned(),                // 11 retention_maxlen
775            self.attempt_id.to_string(),       // 12
776            "65536".to_owned(),                // 13 max_payload_bytes
777        ];
778
779        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
780        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
781
782        let raw: Value = self
783            .client
784            .fcall("ff_append_frame", &key_refs, &arg_refs)
785            .await
786            .map_err(SdkError::Valkey)?;
787
788        parse_append_frame_result(&raw)
789    }
790
791    // ── Phase 3: Suspend ──
792
    /// Suspend the execution, releasing the lease and creating a waitpoint.
    ///
    /// The execution transitions to `suspended` and the worker loses ownership.
    /// An external signal matching the condition will resume the execution.
    ///
    /// If `condition_matchers` is empty, a wildcard matcher is created that
    /// matches ANY signal name. To require an explicit operator resume with
    /// no signal match, pass a sentinel name that no real signal will use.
    ///
    /// If buffered signals on a pending waitpoint already satisfy the condition,
    /// returns `AlreadySatisfied` and the lease is NOT released.
    ///
    /// Consumes self — the task cannot be used after suspension.
    // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. Trait
    // method `suspend` returns `Handle` (a resumed-kind attempt
    // cookie); this SDK method returns `SuspendOutcome { suspension_id,
    // waitpoint_id, waitpoint_key, waitpoint_token }` — semantically
    // distinct return shapes. Also: SDK takes `&[ConditionMatcher]` +
    // `TimeoutBehavior` with no `WaitpointSpec` mapping. Tracked in #117.
    pub async fn suspend(
        self,
        reason_code: &str,
        condition_matchers: &[ConditionMatcher],
        timeout_ms: Option<u64>,
        timeout_behavior: TimeoutBehavior,
    ) -> Result<SuspendOutcome, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        let suspension_id = SuspensionId::new();
        let waitpoint_id = WaitpointId::new();
        // For now, use a simple opaque key (real implementation would use HMAC token)
        let waitpoint_key = format!("wpk:{}", waitpoint_id);

        // Absolute deadline; `None` stays `None` and is encoded as the
        // empty string in ARGV slot 11 (meaning "no timeout").
        // NOTE(review): `ms as i64` wraps for ms > i64::MAX — unreachable
        // for sane timeouts, but worth a guard if timeouts ever come from
        // untrusted config.
        let timeout_at = timeout_ms.map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));

        // Build resume condition JSON.
        // With zero matchers, `required_signal_names` is an empty array —
        // the wildcard semantics documented above are applied server-side
        // by the Lua function (see `ff_suspend_execution`).
        let required_signal_names: Vec<&str> = condition_matchers
            .iter()
            .map(|m| m.signal_name.as_str())
            .collect();
        // 0 or 1 names → "any" (any one matching signal satisfies);
        // 2+ names → "all" (every named signal must arrive). Interpreted
        // by the Lua side.
        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
        let resume_condition_json = serde_json::json!({
            "condition_type": "signal_set",
            "required_signal_names": required_signal_names,
            "signal_match_mode": match_mode,
            "minimum_signal_count": 1,
            "timeout_behavior": timeout_behavior.as_str(),
            "allow_operator_override": true,
        }).to_string();

        let resume_policy_json = serde_json::json!({
            "resume_target": "runnable",
            "close_waitpoint_on_resume": true,
            "consume_matched_signals": true,
            "retain_signal_buffer_until_closed": true,
        }).to_string();

        // KEYS (17): exec_core, attempt_record, lease_current, lease_history,
        //            lease_expiry, worker_leases, suspension_current, waitpoint_hash,
        //            waitpoint_signals, suspension_timeout, pending_wp_expiry,
        //            active_index, suspended_index, waitpoint_history, wp_condition,
        //            attempt_timeout, hmac_secrets
        let keys: Vec<String> = vec![
            ctx.core(),                                          // 1
            ctx.attempt_hash(self.attempt_index),                // 2
            ctx.lease_current(),                                 // 3
            ctx.lease_history(),                                 // 4
            idx.lease_expiry(),                                  // 5
            idx.worker_leases(&self.worker_instance_id),         // 6
            ctx.suspension_current(),                            // 7
            ctx.waitpoint(&waitpoint_id),                        // 8
            ctx.waitpoint_signals(&waitpoint_id),                // 9
            idx.suspension_timeout(),                            // 10
            idx.pending_waitpoint_expiry(),                      // 11
            idx.lane_active(&self.lane_id),                      // 12
            idx.lane_suspended(&self.lane_id),                   // 13
            ctx.waitpoints(),                                    // 14
            ctx.waitpoint_condition(&waitpoint_id),              // 15
            idx.attempt_timeout(),                               // 16
            idx.waitpoint_hmac_secrets(),                        // 17
        ];

        // ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
        //            lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
        //            reason_code, requested_by, timeout_at, resume_condition_json,
        //            resume_policy_json, continuation_metadata_pointer,
        //            use_pending_waitpoint, timeout_behavior, lease_history_maxlen
        let args: Vec<String> = vec![
            self.execution_id.to_string(),                          // 1
            self.attempt_index.to_string(),                         // 2
            self.attempt_id.to_string(),                            // 3
            self.lease_id.to_string(),                              // 4
            self.lease_epoch.to_string(),                           // 5
            suspension_id.to_string(),                              // 6
            waitpoint_id.to_string(),                               // 7
            waitpoint_key.clone(),                                  // 8
            reason_code.to_owned(),                                 // 9
            "worker".to_owned(),                                    // 10 requested_by
            timeout_at.map_or(String::new(), |t| t.to_string()),   // 11 "" = no timeout
            resume_condition_json,                                  // 12
            resume_policy_json,                                     // 13
            String::new(),                                          // 14 continuation_metadata_ptr (unused here)
            String::new(),                                          // 15 use_pending_waitpoint (unused here)
            timeout_behavior.as_str().to_owned(),                   // 16
            "1000".to_owned(),                                      // 17 lease_history_maxlen
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // A transport-level failure short-circuits via `?` above WITHOUT
        // stopping renewal, so the `Drop` warning still fires for a lease
        // that will genuinely leak (see `fcall_landed` below).
        self.stop_renewal();
        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
    }
914
915    /// Read the signals that satisfied the waitpoint and triggered this
916    /// resume.
917    ///
918    /// Non-consuming. Intended to be called immediately after re-claim via
919    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
920    /// subsequent `suspend()` (which replaces `suspension:current`).
921    ///
922    /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
923    ///
924    /// - No prior suspension on this execution.
925    /// - The prior suspension belonged to an earlier attempt (e.g. the
926    ///   attempt was cancelled/failed and a retry is now claiming).
927    /// - The prior suspension was closed by timeout / cancel / operator
928    ///   override rather than by a matched signal.
929    ///
930    /// Reads `suspension:current` once, filters by `attempt_index` to
931    /// guard against stale prior-attempt records, then fetches the matched
932    /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
933    /// fields and reads each signal's metadata + payload directly.
934    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
935        // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
936        // Pre-migration body (HGETALL suspension_current + HMGET
937        // matchers + pipelined HGETALL signal_hash / GET
938        // signal_payload) lives in
939        // `ff_backend_valkey::observe_signals_impl`.
940        let handle = self.synth_handle();
941        self.backend
942            .observe_signals(&handle)
943            .await
944            .map_err(SdkError::from)
945    }
946
    /// Signal the renewal task to stop. Called by every terminal op
    /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
    /// `move_to_waiting_children`) after the FCALL returns. Also marks
    /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
    /// consumption from a genuine drop-without-terminal-op.
    ///
    /// Memory ordering: the `Release` store pairs with the `Acquire`
    /// load in `Drop for ClaimedTask`; the store is issued before the
    /// notify so the flag is set by the time the task is consumed.
    fn stop_renewal(&self) {
        self.terminal_op_called.store(true, Ordering::Release);
        self.renewal_stop.notify_one();
    }
956
957}
958
959/// True iff the backend's FCALL result represents a round-trip that
960/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
961/// contention, conflict, state, bug) all count as "landed" — the
962/// server either committed or rejected with a typed response. Only
963/// raw `Transport` errors (connection drops, request timeouts, parse
964/// failures) count as "did not land", which matches the pre-Stage-1b
965/// SDK's `fcall(...).await.map_err(SdkError::Valkey)?` short-circuit
966/// — those errors returned before `stop_renewal()` ran, preserving
967/// the `Drop` warning for genuine "lease will leak" cases.
968///
969/// Stage 1b terminal-op forwarders use this predicate to decide
970/// whether to call `stop_renewal()`: yes for landed responses, no
971/// for transport errors so the caller's retry path still sees a
972/// running renewal task.
973fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
974    match r {
975        Ok(_) => true,
976        Err(crate::EngineError::Transport { .. }) => false,
977        Err(_) => true,
978    }
979}
980
981/// Map the SDK's free-form `error_category: &str` to the typed
982/// `FailureClass` the trait's `fail` method takes. Unknown categories
983/// fall through to `Transient` — the Lua side already tolerated
984/// arbitrary strings, so the worst a category drift does under the
985/// Stage 1b forwarder is reclassify a novel category as transient.
986/// Stage 1d (or issue #117) widens `FailureClass` with a
987/// `Custom(String)` arm for exact round-trip.
988fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
989    use ff_core::backend::FailureClass;
990    match s {
991        "transient" => FailureClass::Transient,
992        "permanent" => FailureClass::Permanent,
993        "infra_crash" => FailureClass::InfraCrash,
994        "timeout" => FailureClass::Timeout,
995        "cancelled" => FailureClass::Cancelled,
996        _ => FailureClass::Transient,
997    }
998}
999
1000impl Drop for ClaimedTask {
1001    fn drop(&mut self) {
1002        // Abort the background renewal task on drop.
1003        // This is a safety net — complete/fail/cancel already stop renewal
1004        // via notify before consuming self. But if the task is dropped
1005        // without being consumed (e.g., panic), abort prevents leaked renewals.
1006        //
1007        // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
1008        // on the happy path, `stop_renewal()` fires `notify_one` synchronously
1009        // and then self is consumed into Drop immediately. The renewal task
1010        // has not yet been polled by the runtime, so `is_finished()` is still
1011        // `false` here — which previously fired the warning on every
1012        // complete/fail/cancel/suspend call. `terminal_op_called` is the
1013        // authoritative signal that a terminal-op path ran to the point of
1014        // stopping renewal; it does not by itself certify the Lua side
1015        // succeeded (see the field doc). The caller surfaces any error via
1016        // the op's return value, so a `Drop` warning is unneeded there.
1017        if !self.terminal_op_called.load(Ordering::Acquire) {
1018            tracing::warn!(
1019                execution_id = %self.execution_id,
1020                "ClaimedTask dropped without terminal operation — lease will expire"
1021            );
1022        }
1023        self.renewal_handle.abort();
1024    }
1025}
1026
1027// ── Lease renewal ──
1028
1029/// Per-tick renewal: single `backend.renew(&handle)` call wrapped in
1030/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
1031/// hooks still see one span per renewal (restores PR #119 Cursor
1032/// Bugbot finding — the top-level `spawn_renewal_task` fires once at
1033/// construction time, not per-tick).
1034///
1035/// See `benches/harness/src/bin/long_running.rs` for the bench
1036/// consumer that depends on this span naming.
1037#[tracing::instrument(
1038    name = "renew_lease",
1039    skip_all,
1040    fields(execution_id = %execution_id)
1041)]
1042async fn renew_once(
1043    backend: &dyn EngineBackend,
1044    handle: &Handle,
1045    execution_id: &ExecutionId,
1046) -> Result<(), crate::EngineError> {
1047    backend.renew(handle).await.map(|_| ())
1048}
1049
/// Spawn a background tokio task that renews the lease at `ttl / 3`
/// intervals.
///
/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
/// + the encoded `Handle` instead. Per-tick tracing lives on
/// [`renew_once`]; this function itself is sync + one-shot.
///
/// Stops when:
/// - `stop_signal` is notified (complete/fail/cancel called)
/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
/// - The task handle is aborted (ClaimedTask dropped)
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
    // panics if a caller (or a misconfigured test) passes a
    // lease_ttl_ms < 3. The SDK config validator already enforces
    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
    // is a cheap belt-and-suspenders (Copilot review finding on
    // PR #119).
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        // Skip (rather than burst) ticks missed while a slow renewal or a
        // stalled runtime held the task — renewing late once is enough.
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Skip the first immediate tick — the lease was just acquired.
        tick.tick().await;

        loop {
            // No `biased;` — tokio polls arms in random order, so a stop
            // racing a tick may let one final renewal through. Harmless:
            // `notify_one` stores a permit, so the next loop iteration
            // observes it and exits.
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        // Success resets the consecutive-failure streak.
                        // Relaxed: the counter is a standalone health
                        // metric — NOTE(review): assumes readers do not
                        // pair it with other shared state; confirm at the
                        // read site.
                        Ok(_renewal) => {
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        // Terminal (stale/expired/revoked lease, inactive
                        // or missing execution): record the failure and
                        // stop renewing permanently.
                        Err(e) if is_terminal_renewal_error(&e) => {
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        // Anything else (e.g. transient transport issues):
                        // bump the streak and retry on the next tick.
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}
1128
1129/// Check if an engine error means renewal should stop permanently.
1130#[allow(dead_code)]
1131fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
1132    use crate::{ContentionKind, EngineError, StateKind};
1133    matches!(
1134        err,
1135        EngineError::State(
1136            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
1137        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
1138            | EngineError::NotFound { entity: "execution" }
1139    )
1140}
1141
1142// ── FCALL result parsing ──
1143
1144/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
1145/// function into a typed [`ReportUsageResult`].
1146///
1147/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
1148///                  `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
1149/// Status code `!= 1` is parsed as a [`ScriptError`] via
1150/// [`ScriptError::from_code_with_detail`].
1151///
1152/// Exposed as `pub` so downstream SDKs that speak the same wire format
1153/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
1154/// call this directly instead of re-implementing the parse. Keeping one
1155/// parser paired with the producer (the Lua function registered at
1156/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
1157/// against silent format drift between producer and consumer.
1158pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1159    let arr = match raw {
1160        Value::Array(arr) => arr,
1161        _ => {
1162            return Err(SdkError::from(ScriptError::Parse {
1163                fcall: "parse_report_usage_result".into(),
1164                execution_id: None,
1165                message: "ff_report_usage_and_check: expected Array".into(),
1166            }));
1167        }
1168    };
1169    let status_code = match arr.first() {
1170        Some(Ok(Value::Int(n))) => *n,
1171        _ => {
1172            return Err(SdkError::from(ScriptError::Parse {
1173                fcall: "parse_report_usage_result".into(),
1174                execution_id: None,
1175                message: "ff_report_usage_and_check: expected Int status code".into(),
1176            }));
1177        }
1178    };
1179    if status_code != 1 {
1180        let error_code = usage_field_str(arr, 1);
1181        let detail = usage_field_str(arr, 2);
1182        return Err(SdkError::from(
1183            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1184                ScriptError::Parse {
1185                    fcall: "parse_report_usage_result".into(),
1186                    execution_id: None,
1187                    message: format!("ff_report_usage_and_check: {error_code}"),
1188                }
1189            }),
1190        ));
1191    }
1192    let sub_status = usage_field_str(arr, 1);
1193    match sub_status.as_str() {
1194        "OK" => Ok(ReportUsageResult::Ok),
1195        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1196        "SOFT_BREACH" => {
1197            let dim = usage_field_str(arr, 2);
1198            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1199            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1200            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1201        }
1202        "HARD_BREACH" => {
1203            let dim = usage_field_str(arr, 2);
1204            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1205            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1206            Ok(ReportUsageResult::HardBreach {
1207                dimension: dim,
1208                current_usage: current,
1209                hard_limit: limit,
1210            })
1211        }
1212        _ => Err(SdkError::from(ScriptError::Parse {
1213            fcall: "parse_report_usage_result".into(),
1214            execution_id: None,
1215            message: format!(
1216            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1217        ),
1218        })),
1219    }
1220}
1221
1222fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1223    match arr.get(index) {
1224        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1225        Some(Ok(Value::SimpleString(s))) => s.clone(),
1226        Some(Ok(Value::Int(n))) => n.to_string(),
1227        _ => String::new(),
1228    }
1229}
1230
1231/// Parse a required numeric usage field (u64) from the wire array at
1232/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
1233/// holds a non-string/non-int value, or contains a string that does
1234/// not parse as u64.
1235///
1236/// Rationale: the Lua producer (`lua/budget.lua:99`,
1237/// `ff_report_usage_and_check`) always emits
1238/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1239/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1240/// non-numeric value here means the Lua and Rust sides drifted;
1241/// silently coercing to `0` would surface drift as "zero-usage breach"
1242/// — arithmetically correct but semantically nonsense. Fail loudly
1243/// instead so drift shows up as a parse error at the first call site.
1244fn parse_usage_u64(
1245    arr: &[Result<Value, ferriskey::Error>],
1246    index: usize,
1247    sub_status: &str,
1248    field_name: &str,
1249) -> Result<u64, SdkError> {
1250    match arr.get(index) {
1251        Some(Ok(Value::Int(n))) => {
1252            u64::try_from(*n).map_err(|_| {
1253                SdkError::from(ScriptError::Parse {
1254                    fcall: "parse_usage_u64".into(),
1255                    execution_id: None,
1256                    message: format!(
1257                    "ff_report_usage_and_check {sub_status}: {field_name} \
1258                     (index {index}) negative int {n} cannot be u64"
1259                ),
1260                })
1261            })
1262        }
1263        Some(Ok(Value::BulkString(b))) => {
1264            let s = String::from_utf8_lossy(b);
1265            s.parse::<u64>().map_err(|_| {
1266                SdkError::from(ScriptError::Parse {
1267                    fcall: "parse_usage_u64".into(),
1268                    execution_id: None,
1269                    message: format!(
1270                    "ff_report_usage_and_check {sub_status}: {field_name} \
1271                     (index {index}) not a u64 string: {s:?}"
1272                ),
1273                })
1274            })
1275        }
1276        Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1277            SdkError::from(ScriptError::Parse {
1278                fcall: "parse_usage_u64".into(),
1279                execution_id: None,
1280                message: format!(
1281                "ff_report_usage_and_check {sub_status}: {field_name} \
1282                 (index {index}) not a u64 string: {s:?}"
1283            ),
1284            })
1285        }),
1286        Some(_) => Err(SdkError::from(ScriptError::Parse {
1287            fcall: "parse_usage_u64".into(),
1288            execution_id: None,
1289            message: format!(
1290            "ff_report_usage_and_check {sub_status}: {field_name} \
1291             (index {index}) wrong wire type (expected Int or String)"
1292        ),
1293        })),
1294        None => Err(SdkError::from(ScriptError::Parse {
1295            fcall: "parse_usage_u64".into(),
1296            execution_id: None,
1297            message: format!(
1298            "ff_report_usage_and_check {sub_status}: {field_name} \
1299             (index {index}) missing from response"
1300        ),
1301        })),
1302    }
1303}
1304
1305/// Parse a standard {1, "OK", ...} / {0, "error", ...} FCALL result.
1306/// Extract the waitpoint_token (field index 4 in the 0-indexed response array)
1307/// from an `ff_create_pending_waitpoint` reply. Runs `parse_success_result`
1308/// first so error cases produce the usual typed error, not a parse failure.
1309fn extract_pending_waitpoint_token(raw: &Value) -> Result<WaitpointToken, SdkError> {
1310    parse_success_result(raw, "ff_create_pending_waitpoint")?;
1311    let arr = match raw {
1312        Value::Array(arr) => arr,
1313        _ => unreachable!("parse_success_result would have rejected non-array"),
1314    };
1315    // Layout: [0]=1, [1]="OK", [2]=waitpoint_id, [3]=waitpoint_key, [4]=waitpoint_token
1316    let token_str = arr
1317        .get(4)
1318        .and_then(|v| match v {
1319            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1320            Ok(Value::SimpleString(s)) => Some(s.clone()),
1321            _ => None,
1322        })
1323        .ok_or_else(|| {
1324            SdkError::from(ScriptError::Parse {
1325                fcall: "extract_pending_waitpoint_token".into(),
1326                execution_id: None,
1327                message: "ff_create_pending_waitpoint: missing waitpoint_token in response".into(),
1328            })
1329        })?;
1330    Ok(WaitpointToken::new(token_str))
1331}
1332
1333/// Pure helper: decide whether `suspension:current` represents a
1334/// signal-driven resume for the currently-claimed attempt, and extract
1335/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1336/// (no record, stale prior-attempt, non-resumed close). Returns an error
1337/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1338/// bug rather than a missing-data case.
1339///
1340/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1341/// `EngineBackend::observe_signals`, which re-implements this invariant
1342/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1343/// tests so the parsing contract stays exercised at the SDK layer —
1344/// Stage 1d will consolidate (either promote the helper into ff-core
1345/// or drop these tests once the backend-side tests cover equivalent
1346/// ground).
1347#[allow(dead_code)]
1348fn resume_waitpoint_id_from_suspension(
1349    susp: &HashMap<String, String>,
1350    claimed_attempt: AttemptIndex,
1351) -> Result<Option<WaitpointId>, SdkError> {
1352    if susp.is_empty() {
1353        return Ok(None);
1354    }
1355    let susp_att: u32 = susp
1356        .get("attempt_index")
1357        .and_then(|s| s.parse().ok())
1358        .unwrap_or(u32::MAX);
1359    if susp_att != claimed_attempt.0 {
1360        return Ok(None);
1361    }
1362    let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1363    if close_reason != "resumed" {
1364        return Ok(None);
1365    }
1366    let wp_id_str = susp
1367        .get("waitpoint_id")
1368        .map(String::as_str)
1369        .unwrap_or_default();
1370    if wp_id_str.is_empty() {
1371        return Ok(None);
1372    }
1373    let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1374        SdkError::from(ScriptError::Parse {
1375            fcall: "resume_waitpoint_id_from_suspension".into(),
1376            execution_id: None,
1377            message: format!(
1378            "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1379        ),
1380        })
1381    })?;
1382    Ok(Some(waitpoint_id))
1383}
1384
1385pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
1386    let arr = match raw {
1387        Value::Array(arr) => arr,
1388        _ => {
1389            return Err(SdkError::from(ScriptError::Parse {
1390                fcall: "parse_success_result".into(),
1391                execution_id: None,
1392                message: format!(
1393                "{function_name}: expected Array, got non-array"
1394            ),
1395            }));
1396        }
1397    };
1398
1399    if arr.is_empty() {
1400        return Err(SdkError::from(ScriptError::Parse {
1401            fcall: "parse_success_result".into(),
1402            execution_id: None,
1403            message: format!(
1404            "{function_name}: empty result array"
1405        ),
1406        }));
1407    }
1408
1409    let status_code = match arr.first() {
1410        Some(Ok(Value::Int(n))) => *n,
1411        _ => {
1412            return Err(SdkError::from(ScriptError::Parse {
1413                fcall: "parse_success_result".into(),
1414                execution_id: None,
1415                message: format!(
1416                "{function_name}: expected Int at index 0"
1417            ),
1418            }));
1419        }
1420    };
1421
1422    if status_code == 1 {
1423        Ok(())
1424    } else {
1425        // Extract error code from index 1 and optional detail from index 2
1426        // (e.g. `capability_mismatch` ships missing tokens there). Variants
1427        // that carry a String payload pick the detail up via
1428        // `from_code_with_detail`; other variants ignore it.
1429        let field_str = |idx: usize| -> String {
1430            arr.get(idx)
1431                .and_then(|v| match v {
1432                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1433                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1434                    _ => None,
1435                })
1436                .unwrap_or_default()
1437        };
1438        let error_code = {
1439            let s = field_str(1);
1440            if s.is_empty() { "unknown".to_owned() } else { s }
1441        };
1442        // Collect all detail slots (idx >= 2). Most variants only read
1443        // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
1444        // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
1445        // reconciliation after a network drop.
1446        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
1447        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1448
1449        let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
1450            .unwrap_or_else(|| {
1451                ScriptError::Parse {
1452                    fcall: "parse_success_result".into(),
1453                    execution_id: None,
1454                    message: format!("{function_name}: unknown error: {error_code}"),
1455                }
1456            });
1457
1458        Err(SdkError::from(script_err))
1459    }
1460}
1461
1462/// Parse ff_suspend_execution result:
1463///   ok(suspension_id, waitpoint_id, waitpoint_key)
1464///   ok_already_satisfied(suspension_id, waitpoint_id, waitpoint_key)
1465fn parse_suspend_result(
1466    raw: &Value,
1467    suspension_id: SuspensionId,
1468    waitpoint_id: WaitpointId,
1469    waitpoint_key: String,
1470) -> Result<SuspendOutcome, SdkError> {
1471    let arr = match raw {
1472        Value::Array(arr) => arr,
1473        _ => {
1474            return Err(SdkError::from(ScriptError::Parse {
1475                fcall: "parse_suspend_result".into(),
1476                execution_id: None,
1477                message: "ff_suspend_execution: expected Array".into(),
1478            }));
1479        }
1480    };
1481
1482    let status_code = match arr.first() {
1483        Some(Ok(Value::Int(n))) => *n,
1484        _ => {
1485            return Err(SdkError::from(ScriptError::Parse {
1486                fcall: "parse_suspend_result".into(),
1487                execution_id: None,
1488                message: "ff_suspend_execution: bad status code".into(),
1489            }));
1490        }
1491    };
1492
1493    if status_code != 1 {
1494        let err_field_str = |idx: usize| -> String {
1495            arr.get(idx)
1496                .and_then(|v| match v {
1497                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1498                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1499                    _ => None,
1500                })
1501                .unwrap_or_default()
1502        };
1503        let error_code = {
1504            let s = err_field_str(1);
1505            if s.is_empty() { "unknown".to_owned() } else { s }
1506        };
1507        let detail = err_field_str(2);
1508        return Err(SdkError::from(
1509            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1510                ScriptError::Parse {
1511                    fcall: "parse_suspend_result".into(),
1512                    execution_id: None,
1513                    message: format!("ff_suspend_execution: {error_code}"),
1514                }
1515            }),
1516        ));
1517    }
1518
1519    // Check sub-status at index 1
1520    let sub_status = arr
1521        .get(1)
1522        .and_then(|v| match v {
1523            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1524            Ok(Value::SimpleString(s)) => Some(s.clone()),
1525            _ => None,
1526        })
1527        .unwrap_or_default();
1528
1529    // Lua returns: {1, status, suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
1530    // The suspension_id/waitpoint_id/waitpoint_key values the worker passed in are
1531    // authoritative (Lua echoes them); waitpoint_token however is MINTED by Lua and
1532    // must be read from the response.
1533    let waitpoint_token = arr
1534        .get(5)
1535        .and_then(|v| match v {
1536            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1537            Ok(Value::SimpleString(s)) => Some(s.clone()),
1538            _ => None,
1539        })
1540        .map(WaitpointToken::new)
1541        .ok_or_else(|| {
1542            SdkError::from(ScriptError::Parse {
1543                fcall: "parse_suspend_result".into(),
1544                execution_id: None,
1545                message: "ff_suspend_execution: missing waitpoint_token in response".into(),
1546            })
1547        })?;
1548
1549    if sub_status == "ALREADY_SATISFIED" {
1550        Ok(SuspendOutcome::AlreadySatisfied {
1551            suspension_id,
1552            waitpoint_id,
1553            waitpoint_key,
1554            waitpoint_token,
1555        })
1556    } else {
1557        Ok(SuspendOutcome::Suspended {
1558            suspension_id,
1559            waitpoint_id,
1560            waitpoint_key,
1561            waitpoint_token,
1562        })
1563    }
1564}
1565
1566/// Parse ff_deliver_signal result:
1567///   ok(signal_id, effect)
1568///   ok_duplicate(existing_signal_id)
1569pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
1570    let arr = match raw {
1571        Value::Array(arr) => arr,
1572        _ => {
1573            return Err(SdkError::from(ScriptError::Parse {
1574                fcall: "parse_signal_result".into(),
1575                execution_id: None,
1576                message: "ff_deliver_signal: expected Array".into(),
1577            }));
1578        }
1579    };
1580
1581    let status_code = match arr.first() {
1582        Some(Ok(Value::Int(n))) => *n,
1583        _ => {
1584            return Err(SdkError::from(ScriptError::Parse {
1585                fcall: "parse_signal_result".into(),
1586                execution_id: None,
1587                message: "ff_deliver_signal: bad status code".into(),
1588            }));
1589        }
1590    };
1591
1592    if status_code != 1 {
1593        let err_field_str = |idx: usize| -> String {
1594            arr.get(idx)
1595                .and_then(|v| match v {
1596                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1597                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1598                    _ => None,
1599                })
1600                .unwrap_or_default()
1601        };
1602        let error_code = {
1603            let s = err_field_str(1);
1604            if s.is_empty() { "unknown".to_owned() } else { s }
1605        };
1606        let detail = err_field_str(2);
1607        return Err(SdkError::from(
1608            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1609                ScriptError::Parse {
1610                    fcall: "parse_signal_result".into(),
1611                    execution_id: None,
1612                    message: format!("ff_deliver_signal: {error_code}"),
1613                }
1614            }),
1615        ));
1616    }
1617
1618    let sub_status = arr
1619        .get(1)
1620        .and_then(|v| match v {
1621            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1622            Ok(Value::SimpleString(s)) => Some(s.clone()),
1623            _ => None,
1624        })
1625        .unwrap_or_default();
1626
1627    if sub_status == "DUPLICATE" {
1628        let existing_id = arr
1629            .get(2)
1630            .and_then(|v| match v {
1631                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1632                Ok(Value::SimpleString(s)) => Some(s.clone()),
1633                _ => None,
1634            })
1635            .unwrap_or_default();
1636        return Ok(SignalOutcome::Duplicate {
1637            existing_signal_id: existing_id,
1638        });
1639    }
1640
1641    // Parse: {1, "OK", signal_id, effect}
1642    let signal_id_str = arr
1643        .get(2)
1644        .and_then(|v| match v {
1645            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1646            Ok(Value::SimpleString(s)) => Some(s.clone()),
1647            _ => None,
1648        })
1649        .unwrap_or_default();
1650
1651    let effect = arr
1652        .get(3)
1653        .and_then(|v| match v {
1654            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1655            Ok(Value::SimpleString(s)) => Some(s.clone()),
1656            _ => None,
1657        })
1658        .unwrap_or_default();
1659
1660    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
1661        SdkError::from(ScriptError::Parse {
1662            fcall: "parse_signal_result".into(),
1663            execution_id: None,
1664            message: format!(
1665            "ff_deliver_signal: invalid signal_id from Lua: {e}"
1666        ),
1667        })
1668    })?;
1669
1670    if effect == "resume_condition_satisfied" {
1671        Ok(SignalOutcome::TriggeredResume { signal_id })
1672    } else {
1673        Ok(SignalOutcome::Accepted { signal_id, effect })
1674    }
1675}
1676
1677/// Parse ff_append_frame result: ok(entry_id, frame_count)
1678fn parse_append_frame_result(raw: &Value) -> Result<AppendFrameOutcome, SdkError> {
1679    let arr = match raw {
1680        Value::Array(arr) => arr,
1681        _ => {
1682            return Err(SdkError::from(ScriptError::Parse {
1683                fcall: "parse_append_frame_result".into(),
1684                execution_id: None,
1685                message: "ff_append_frame: expected Array".into(),
1686            }));
1687        }
1688    };
1689
1690    let status_code = match arr.first() {
1691        Some(Ok(Value::Int(n))) => *n,
1692        _ => {
1693            return Err(SdkError::from(ScriptError::Parse {
1694                fcall: "parse_append_frame_result".into(),
1695                execution_id: None,
1696                message: "ff_append_frame: bad status code".into(),
1697            }));
1698        }
1699    };
1700
1701    if status_code != 1 {
1702        let err_field_str = |idx: usize| -> String {
1703            arr.get(idx)
1704                .and_then(|v| match v {
1705                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1706                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1707                    _ => None,
1708                })
1709                .unwrap_or_default()
1710        };
1711        let error_code = {
1712            let s = err_field_str(1);
1713            if s.is_empty() { "unknown".to_owned() } else { s }
1714        };
1715        let detail = err_field_str(2);
1716        return Err(SdkError::from(
1717            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1718                ScriptError::Parse {
1719                    fcall: "parse_append_frame_result".into(),
1720                    execution_id: None,
1721                    message: format!("ff_append_frame: {error_code}"),
1722                }
1723            }),
1724        ));
1725    }
1726
1727    // ok(entry_id, frame_count)  → arr[2] = entry_id, arr[3] = frame_count
1728    let stream_id = arr
1729        .get(2)
1730        .and_then(|v| match v {
1731            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1732            Ok(Value::SimpleString(s)) => Some(s.clone()),
1733            _ => None,
1734        })
1735        .unwrap_or_default();
1736
1737    let frame_count = arr
1738        .get(3)
1739        .and_then(|v| match v {
1740            Ok(Value::BulkString(b)) => String::from_utf8_lossy(b).parse::<u64>().ok(),
1741            Ok(Value::SimpleString(s)) => s.parse::<u64>().ok(),
1742            Ok(Value::Int(n)) => Some(*n as u64),
1743            _ => None,
1744        })
1745        .unwrap_or(0);
1746
1747    Ok(AppendFrameOutcome {
1748        stream_id,
1749        frame_count,
1750    })
1751}
1752
1753/// Parse ff_fail_execution result:
1754///   ok("retry_scheduled", delay_until)
1755///   ok("terminal_failed")
1756#[allow(dead_code)]
1757fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
1758    let arr = match raw {
1759        Value::Array(arr) => arr,
1760        _ => {
1761            return Err(SdkError::from(ScriptError::Parse {
1762                fcall: "parse_fail_result".into(),
1763                execution_id: None,
1764                message: "ff_fail_execution: expected Array".into(),
1765            }));
1766        }
1767    };
1768
1769    let status_code = match arr.first() {
1770        Some(Ok(Value::Int(n))) => *n,
1771        _ => {
1772            return Err(SdkError::from(ScriptError::Parse {
1773                fcall: "parse_fail_result".into(),
1774                execution_id: None,
1775                message: "ff_fail_execution: bad status code".into(),
1776            }));
1777        }
1778    };
1779
1780    if status_code != 1 {
1781        let err_field_str = |idx: usize| -> String {
1782            arr.get(idx)
1783                .and_then(|v| match v {
1784                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1785                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1786                    _ => None,
1787                })
1788                .unwrap_or_default()
1789        };
1790        let error_code = {
1791            let s = err_field_str(1);
1792            if s.is_empty() { "unknown".to_owned() } else { s }
1793        };
1794        let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
1795        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1796        return Err(SdkError::from(
1797            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
1798                ScriptError::Parse {
1799                    fcall: "parse_fail_result".into(),
1800                    execution_id: None,
1801                    message: format!("ff_fail_execution: {error_code}"),
1802                }
1803            }),
1804        ));
1805    }
1806
1807    // Parse sub-status from field[2] (index 2 = first field after status+OK)
1808    let sub_status = arr
1809        .get(2)
1810        .and_then(|v| match v {
1811            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1812            Ok(Value::SimpleString(s)) => Some(s.clone()),
1813            _ => None,
1814        })
1815        .unwrap_or_default();
1816
1817    match sub_status.as_str() {
1818        "retry_scheduled" => {
1819            // Lua returns: ok("retry_scheduled", tostring(delay_until))
1820            // arr[3] = delay_until
1821            let delay_str = arr
1822                .get(3)
1823                .and_then(|v| match v {
1824                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1825                    Ok(Value::Int(n)) => Some(n.to_string()),
1826                    _ => None,
1827                })
1828                .unwrap_or_default();
1829            let delay_until = delay_str.parse::<i64>().unwrap_or(0);
1830
1831            Ok(FailOutcome::RetryScheduled {
1832                delay_until: TimestampMs::from_millis(delay_until),
1833            })
1834        }
1835        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
1836        _ => Err(SdkError::from(ScriptError::Parse {
1837            fcall: "parse_fail_result".into(),
1838            execution_id: None,
1839            message: format!(
1840            "ff_fail_execution: unexpected sub-status: {sub_status}"
1841        ),
1842        })),
1843    }
1844}
1845
1846// ── Stream read / tail (consumer API, RFC-006 #2) ──
1847
/// Maximum tail block duration, in milliseconds, accepted by
/// [`tail_stream`]. Mirrors the REST endpoint ceiling so SDK callers can't
/// wedge a connection longer than the server would accept.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;

/// Maximum frames per read/tail call. Mirrors
/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
/// callers don't need to import ff-core just to read the bound.
pub use ff_core::contracts::STREAM_READ_HARD_CAP;

/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
/// signal so polling consumers can exit cleanly.
///
/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
pub use ff_core::contracts::StreamFrames;

/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
/// `StreamCursor::Start` / `StreamCursor::End` instead.
pub use ff_core::contracts::StreamCursor;
1869
1870/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
1871/// — XREAD does not accept the open markers. Pulled out as a bare
1872/// function so unit tests can exercise the guard without constructing a
1873/// live `ferriskey::Client`.
1874fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
1875    if !after.is_concrete() {
1876        return Err(SdkError::Config {
1877            context: "tail_stream".into(),
1878            field: Some("after".into()),
1879            message: "XREAD cursor must be a concrete entry id; pass \
1880                      StreamCursor::from_beginning() to start from the \
1881                      beginning"
1882                .into(),
1883        });
1884    }
1885    Ok(())
1886}
1887
1888fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
1889    if count_limit == 0 {
1890        return Err(SdkError::Config {
1891            context: "read_stream_frames".into(),
1892            field: Some("count_limit".into()),
1893            message: "count_limit must be >= 1".into(),
1894        });
1895    }
1896    if count_limit > STREAM_READ_HARD_CAP {
1897        return Err(SdkError::Config {
1898            context: "read_stream_frames".into(),
1899            field: Some("count_limit".into()),
1900            message: format!(
1901                "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
1902            ),
1903        });
1904    }
1905    Ok(())
1906}
1907
1908/// Read frames from a completed or in-flight attempt's stream.
1909///
1910/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
1911/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1912/// `StreamCursor::At("<id>")` reads from a concrete entry id.
1913/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
1914/// `0` returns [`SdkError::Config`].
1915///
1916/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
1917/// consumers know when the producer has finalized the stream. A
1918/// never-written attempt and an in-progress stream are indistinguishable
1919/// here — both present as `frames=[]`, `closed_at=None`.
1920///
1921/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
1922/// client but are not the lease-holding worker — no lease check is
1923/// performed.
1924///
1925/// # Head-of-line note
1926///
1927/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
1928/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
1929/// calling this on a `client` that is also serving FCALLs stalls those
1930/// FCALLs behind the reply. The REST server isolates reads on its
1931/// `tail_client`; direct SDK callers should either use a dedicated
1932/// client OR paginate through smaller `count_limit` slices.
1933pub async fn read_stream(
1934    client: &Client,
1935    partition_config: &PartitionConfig,
1936    execution_id: &ExecutionId,
1937    attempt_index: AttemptIndex,
1938    from: StreamCursor,
1939    to: StreamCursor,
1940    count_limit: u64,
1941) -> Result<StreamFrames, SdkError> {
1942    use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
1943    validate_stream_read_count(count_limit)?;
1944
1945    let partition = execution_partition(execution_id, partition_config);
1946    let ctx = ExecKeyContext::new(&partition, execution_id);
1947    let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
1948
1949    // Lower the opaque cursor to the Valkey XRANGE marker at the
1950    // adapter edge. `ReadFramesArgs` is string-typed — the Lua ABI is
1951    // untouched.
1952    let args = ReadFramesArgs {
1953        execution_id: execution_id.clone(),
1954        attempt_index,
1955        from_id: from.into_wire_string(),
1956        to_id: to.into_wire_string(),
1957        count_limit,
1958    };
1959
1960    let ReadFramesResult::Frames(f) =
1961        ff_script::functions::stream::ff_read_attempt_stream(client, &keys, &args)
1962            .await
1963            .map_err(SdkError::from)?;
1964    Ok(f)
1965}
1966
1967/// Tail a live attempt's stream.
1968///
1969/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
1970/// with id strictly greater than `after`. Pass
1971/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
1972/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
1973/// REJECTED at this boundary because XREAD does not accept `-` / `+`
1974/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
1975///
1976/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
1977/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
1978/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
1979/// SDK and REST ceilings aligned.
1980///
1981/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
1982/// polling consumers should loop until `result.is_closed()` is true, then
1983/// drain and exit. Timeout with no new frames presents as
1984/// `frames=[], closed_at=None`.
1985///
1986/// # Head-of-line warning — use a dedicated client
1987///
1988/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
1989/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
1990/// the read side until a frame arrives or the block elapses. If the
1991/// `client` you pass here is ALSO used for claims, completes, fails,
1992/// appends, or any other FCALL, a 30-second tail will stall all those
1993/// calls for up to 30 seconds.
1994///
1995/// **Strongly recommended**: build a separate `ferriskey::Client` for
1996/// tail callers — mirrors the `Server::tail_client` split that the REST
1997/// server uses internally (see `crates/ff-server/src/server.rs` and
1998/// RFC-006 Impl Notes §"Dedicated stream-op connection").
1999///
2000/// # Tail parallelism caveat (same mux)
2001///
2002/// Even a dedicated tail client is still one multiplexed TCP connection.
2003/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
2004/// ferriskey's per-call `request_timeout` starts at future-poll — so
2005/// two concurrent tails against the same client can time out spuriously:
2006/// the second call's BLOCK budget elapses while it waits for the first
2007/// BLOCK to return. The REST server handles this internally with a
2008/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
2009/// each call its full `block_ms` budget at the server.
2010///
2011/// **Direct SDK callers that need concurrent tails**: either
2012///   (1) build ONE `ferriskey::Client` per concurrent tail call (a small
2013///       pool of clients, rotated by the caller), OR
2014///   (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
2015///       only one BLOCK is in flight per client at a time.
2016/// If you need the REST-side backpressure (429 on contention) and the
2017/// built-in serializer, go through the
2018/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
2019/// than calling this directly.
2020///
2021/// This SDK does not enforce either pattern — the mutex belongs at the
2022/// application layer, and the connection pool belongs at the SDK
2023/// caller's DI layer; neither has a structured place inside this
2024/// helper.
2025///
2026/// # Timeout handling
2027///
2028/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
2029/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
2030/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
2031/// 500ms)`, overriding the client's default per-call. A tail with
2032/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
2033/// the client was built with a shorter `request_timeout`. No custom client
2034/// configuration is required for timeout reasons — only for head-of-line
2035/// isolation above.
2036pub async fn tail_stream(
2037    client: &Client,
2038    partition_config: &PartitionConfig,
2039    execution_id: &ExecutionId,
2040    attempt_index: AttemptIndex,
2041    after: StreamCursor,
2042    block_ms: u64,
2043    count_limit: u64,
2044) -> Result<StreamFrames, SdkError> {
2045    if block_ms > MAX_TAIL_BLOCK_MS {
2046        return Err(SdkError::Config {
2047            context: "tail_stream".into(),
2048            field: Some("block_ms".into()),
2049            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
2050        });
2051    }
2052    validate_stream_read_count(count_limit)?;
2053    // XREAD does not accept `-` / `+` markers as cursors — reject at
2054    // the SDK boundary with `SdkError::Config` rather than forwarding
2055    // an invalid `-`/`+` into ferriskey (which would surface as an
2056    // opaque server error).
2057    validate_tail_cursor(&after)?;
2058
2059    let partition = execution_partition(execution_id, partition_config);
2060    let ctx = ExecKeyContext::new(&partition, execution_id);
2061    let stream_key = ctx.stream(attempt_index);
2062    let stream_meta_key = ctx.stream_meta(attempt_index);
2063
2064    ff_script::stream_tail::xread_block(
2065        client,
2066        &stream_key,
2067        &stream_meta_key,
2068        after.to_wire(),
2069        block_ms,
2070        count_limit,
2071    )
2072    .await
2073    .map_err(SdkError::from)
2074}
2075
#[cfg(test)]
mod tail_stream_boundary_tests {
    use super::*;

    // `validate_tail_cursor` must reject the open markers
    // (`StreamCursor::Start` / `StreamCursor::End`) before `tail_stream`
    // ever touches the client — the same pre-flight shape as the
    // `count_limit` guard. The matching full-path rejection at the REST
    // layer is covered by `ff-server::api`.

    #[test]
    fn rejects_start_cursor() {
        match validate_tail_cursor(&StreamCursor::Start) {
            Err(SdkError::Config { field, context, .. }) => {
                assert_eq!(field.as_deref(), Some("after"));
                assert_eq!(context, "tail_stream");
            }
            Err(other) => panic!("expected SdkError::Config, got {other:?}"),
            Ok(()) => panic!("Start must be rejected"),
        }
    }

    #[test]
    fn rejects_end_cursor() {
        let result = validate_tail_cursor(&StreamCursor::End);
        assert!(matches!(result, Err(SdkError::Config { .. })));
    }

    #[test]
    fn accepts_at_cursor() {
        validate_tail_cursor(&StreamCursor::At("0-0".into()))
            .expect("At cursor must be accepted");
        validate_tail_cursor(&StreamCursor::from_beginning())
            .expect("from_beginning() must be accepted");
        validate_tail_cursor(&StreamCursor::At("123-0".into()))
            .expect("concrete id must be accepted");
    }
}
2116
2117#[cfg(test)]
2118mod parse_report_usage_result_tests {
2119    use super::*;
2120
2121    /// `Value::SimpleString` from a `&str`. `usage_field_str` handles
2122    /// BulkString and SimpleString uniformly (see
2123    /// `usage_field_str` — `Value::BulkString(b)` → `String::from_utf8_lossy`,
2124    /// `Value::SimpleString(s)` → clone). SimpleString avoids a
2125    /// dev-dependency on `bytes` just for test construction.
2126    fn s(v: &str) -> Result<Value, ferriskey::Error> {
2127        Ok(Value::SimpleString(v.to_owned()))
2128    }
2129
2130    fn int(n: i64) -> Result<Value, ferriskey::Error> {
2131        Ok(Value::Int(n))
2132    }
2133
2134    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
2135        Value::Array(items)
2136    }
2137
2138    #[test]
2139    fn ok_status() {
2140        let raw = arr(vec![int(1), s("OK")]);
2141        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
2142    }
2143
2144    #[test]
2145    fn already_applied_status() {
2146        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
2147        assert_eq!(
2148            parse_report_usage_result(&raw).unwrap(),
2149            ReportUsageResult::AlreadyApplied
2150        );
2151    }
2152
2153    #[test]
2154    fn soft_breach_status() {
2155        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
2156        match parse_report_usage_result(&raw).unwrap() {
2157            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
2158                assert_eq!(dimension, "tokens");
2159                assert_eq!(current_usage, 150);
2160                assert_eq!(soft_limit, 100);
2161            }
2162            other => panic!("expected SoftBreach, got {other:?}"),
2163        }
2164    }
2165
2166    #[test]
2167    fn hard_breach_status() {
2168        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
2169        match parse_report_usage_result(&raw).unwrap() {
2170            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
2171                assert_eq!(dimension, "requests");
2172                assert_eq!(current_usage, 10001);
2173                assert_eq!(hard_limit, 10000);
2174            }
2175            other => panic!("expected HardBreach, got {other:?}"),
2176        }
2177    }
2178
2179    /// Negative case: non-Array input. Guards against a future Lua refactor
2180    /// that accidentally returns a bare string/int — the parser must fail
2181    /// loudly rather than silently succeed or panic.
2182    #[test]
2183    fn non_array_input_is_parse_error() {
2184        let raw = Value::SimpleString("OK".to_owned());
2185        let err = parse_report_usage_result(&raw).unwrap_err();
2186        let msg = format!("{err}");
2187        assert!(
2188            msg.to_lowercase().contains("expected array"),
2189            "error should mention expected shape, got: {msg}"
2190        );
2191    }
2192
2193    /// Negative case: Array whose first element isn't an Int status code.
2194    /// The Lua function's first return slot is always `status_code` (1 on
2195    /// success, an error code otherwise); a non-Int there is a wire-format
2196    /// break that must surface as a parse error.
2197    #[test]
2198    fn first_element_non_int_is_parse_error() {
2199        let raw = arr(vec![s("not_an_int"), s("OK")]);
2200        let err = parse_report_usage_result(&raw).unwrap_err();
2201        let msg = format!("{err}");
2202        assert!(
2203            msg.to_lowercase().contains("int"),
2204            "error should mention Int status code, got: {msg}"
2205        );
2206    }
2207
2208    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
2209    /// field. Guards against the silent-coercion defect cross-review
2210    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
2211    /// which would have surfaced Lua-side wire-format drift as a
2212    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
2213    /// but semantically wrong (a "breach with zero usage" is nonsense
2214    /// and masks the real error).
2215    #[test]
2216    fn soft_breach_non_numeric_current_is_parse_error() {
2217        let raw = arr(vec![
2218            int(1),
2219            s("SOFT_BREACH"),
2220            s("tokens"),
2221            s("not_a_number"), // current_usage — must fail, not coerce to 0
2222            s("100"),
2223        ]);
2224        let err = parse_report_usage_result(&raw).unwrap_err();
2225        let msg = format!("{err}");
2226        assert!(
2227            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
2228            "error should identify sub-status + field, got: {msg}"
2229        );
2230        assert!(
2231            msg.to_lowercase().contains("u64"),
2232            "error should mention the expected type (u64), got: {msg}"
2233        );
2234    }
2235
2236    /// Negative case: HARD_BREACH with the limit slot missing
2237    /// entirely. Same defence as the non-numeric test above: a
2238    /// truncated response must fail loudly rather than coerce to 0.
2239    #[test]
2240    fn hard_breach_missing_limit_is_parse_error() {
2241        let raw = arr(vec![
2242            int(1),
2243            s("HARD_BREACH"),
2244            s("requests"),
2245            s("10001"),
2246            // no index 4 — hard_limit missing
2247        ]);
2248        let err = parse_report_usage_result(&raw).unwrap_err();
2249        let msg = format!("{err}");
2250        assert!(
2251            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
2252            "error should identify sub-status + field, got: {msg}"
2253        );
2254        assert!(
2255            msg.to_lowercase().contains("missing"),
2256            "error should say 'missing', got: {msg}"
2257        );
2258    }
2259}
2260
2261
#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    /// Builds a suspension-record hash from `(field, value)` string pairs.
    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut record = HashMap::with_capacity(pairs.len());
        for &(field, value) in pairs {
            record.insert(field.to_owned(), value.to_owned());
        }
        record
    }

    #[test]
    fn empty_suspension_returns_none() {
        let out = resume_waitpoint_id_from_suspension(&m(&[]), AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let wp_text = wp.to_string();
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp_text),
        ]);
        // The claimed attempt (1) postdates the recorded one (0) → stale.
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        let wp_text = wp.to_string();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = m(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp_text),
            ]);
            let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let wp_text = wp.to_string();
        let susp = m(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp_text),
        ]);
        assert_eq!(
            resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap(),
            Some(wp)
        );
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        assert!(
            err.to_string().contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: an empty waitpoint_id shouldn't appear on resumed
        // records, but a partial write must degrade to None, not an error.
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was removed
    // when the production path switched from an unbounded HGETALL to a
    // bounded HGET + per-matcher HMGET loop (review feedback on unbounded
    // condition-hash reply size). That loop is exercised by the integration
    // tests in `ff-test`.
}
2349
#[cfg(test)]
mod terminal_replay_parsing_tests {
    //! Unit tests for the SDK's parse path of the enriched
    //! `execution_not_active` error returned on a terminal-op replay.
    //! The Lua side is proven to emit the 4-slot detail by the integration
    //! test in ff-test/tests/e2e_lifecycle.rs; here we prove the Rust
    //! parsers thread all four slots into the `ExecutionNotActive` variant
    //! so the reconciler in complete()/fail()/cancel() can match on them.

    use super::*;
    use ferriskey::Value;

    // SimpleString rather than BulkString keeps the harness free of a
    // bytes::Bytes dependency; parse_success_result + parse_fail_result
    // accept both encodings.
    fn bulk(text: &str) -> Value {
        Value::SimpleString(String::from(text))
    }

    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("success")),
            Ok(bulk("42")),
            Ok(bulk("terminal")),
            Ok(bulk("11111111-1111-1111-1111-111111111111")),
        ]);
        match parse_success_result(&raw, "test").unwrap_err() {
            SdkError::Engine(boxed) => match *boxed {
                crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                }) => {
                    assert_eq!(terminal_outcome, "success");
                    assert_eq!(lease_epoch, "42");
                    assert_eq!(lifecycle_phase, "terminal");
                    assert_eq!(attempt_id, "11111111-1111-1111-1111-111111111111");
                }
                other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
            },
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// parse_fail_result must extract all detail slots too so the
    /// reconciler in fail() can match lifecycle_phase = "runnable" for
    /// retry-scheduled replays.
    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("none")),
            Ok(bulk("7")),
            Ok(bulk("runnable")),
            Ok(bulk("22222222-2222-2222-2222-222222222222")),
        ]);
        match parse_fail_result(&raw).unwrap_err() {
            SdkError::Engine(boxed) => match *boxed {
                crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                }) => {
                    assert_eq!(terminal_outcome, "none");
                    assert_eq!(lease_epoch, "7");
                    assert_eq!(lifecycle_phase, "runnable");
                    assert_eq!(attempt_id, "22222222-2222-2222-2222-222222222222");
                }
                other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
            },
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// Empty detail slots must default to "" (not panic) so older-Lua
    /// producers or malformed replies degrade to an unreconcilable
    /// variant rather than a Parse error.
    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
        ]);
        match parse_success_result(&raw, "test").unwrap_err() {
            SdkError::Engine(boxed) => match *boxed {
                crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                }) => {
                    assert_eq!(terminal_outcome, "");
                    assert_eq!(lease_epoch, "");
                    assert_eq!(lifecycle_phase, "");
                    assert_eq!(attempt_id, "");
                }
                other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
            },
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }
}