Skip to main content

ff_sdk/
task.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6#[cfg(feature = "valkey-default")]
7use ferriskey::Value;
8use ff_core::backend::Handle;
9use ff_core::backend::PendingWaitpoint;
10use ff_core::contracts::ReportUsageResult;
11use ff_core::engine_backend::EngineBackend;
12use ff_core::engine_error::StateKind;
13use ff_script::error::ScriptError;
14use ff_core::partition::PartitionConfig;
15use ff_core::types::*;
16use tokio::sync::{Notify, OwnedSemaphorePermit};
17use tokio::task::JoinHandle;
18
19use crate::SdkError;
20
21// ── Phase 3: Suspend/Signal types ──
22
23// RFC-013 Stage 1d — `TimeoutBehavior`, `SuspendOutcome`,
24// `ResumeCondition`, `SignalMatcher`, `ResumePolicy`,
25// `SuspensionReasonCode`, `SuspensionRequester`, `WaitpointBinding`,
26// `SuspendArgs`, `SuspendOutcomeDetails`, `IdempotencyKey`,
27// `CompositeBody` all live in `ff_core::contracts`. The `SuspendOutcome`
28// re-export preserves the `ff_sdk::SuspendOutcome` path; the other
29// paths are new.
30pub use ff_core::contracts::{
31    CompositeBody, CountKind, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget,
32    SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
33    SuspensionRequester, TimeoutBehavior, WaitpointBinding,
34};
35
36/// RFC-013 Stage 1d — cookie returned by the strict `suspend` wrapper.
37///
38/// The returned `handle` is a `HandleKind::Suspended` cookie the caller
39/// uses for `observe_signals` and the eventual re-claim via
40/// `claim_resumed_execution`. Consuming `ClaimedTask::suspend`
41/// invalidates the pre-suspend `ClaimedTask` (already moved by the
42/// `self: ClaimedTask` receiver); this handle is the only live cookie
43/// left on the caller side.
44#[derive(Debug)]
45pub struct SuspendedHandle {
46    pub handle: Handle,
47    pub details: SuspendOutcomeDetails,
48}
49
50/// RFC-013 Stage 1d — outcome of the fallible `try_suspend` wrapper.
51///
52/// Unlike the strict wrapper, `AlreadySatisfied` hands the `ClaimedTask`
53/// back to the caller so work continues on the retained lease.
54#[allow(clippy::large_enum_variant)]
55pub enum TrySuspendOutcome {
56    Suspended(SuspendedHandle),
57    AlreadySatisfied {
58        task: ClaimedTask,
59        details: SuspendOutcomeDetails,
60    },
61}
62
63impl std::fmt::Debug for TrySuspendOutcome {
64    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65        match self {
66            Self::Suspended(h) => f.debug_tuple("Suspended").field(h).finish(),
67            Self::AlreadySatisfied { details, .. } => f
68                .debug_struct("AlreadySatisfied")
69                .field("details", details)
70                .finish_non_exhaustive(),
71        }
72    }
73}
74
75/// A signal to deliver to a suspended execution's waitpoint.
76#[derive(Clone, Debug)]
77pub struct Signal {
78    pub signal_name: String,
79    pub signal_category: String,
80    pub payload: Option<Vec<u8>>,
81    pub source_type: String,
82    pub source_identity: String,
83    pub idempotency_key: Option<String>,
84    /// HMAC token issued when the waitpoint was created. Required for
85    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
86    pub waitpoint_token: WaitpointToken,
87}
88
89/// Outcome of `deliver_signal()`.
90#[derive(Clone, Debug, PartialEq, Eq)]
91pub enum SignalOutcome {
92    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
93    Accepted { signal_id: SignalId, effect: String },
94    /// Signal triggered resume — execution is now runnable.
95    TriggeredResume { signal_id: SignalId },
96    /// Duplicate signal (idempotency key matched).
97    Duplicate { existing_signal_id: String },
98}
99
#[cfg(feature = "valkey-default")]
impl SignalOutcome {
    /// Interpret the raw reply of an `ff_deliver_signal` FCALL.
    ///
    /// Packages that invoke `ff_deliver_signal` themselves can use this to
    /// decode the Lua return value into a [`SignalOutcome`] without
    /// depending on SDK internals.
    ///
    /// # Errors
    ///
    /// Returns the [`SdkError`] produced by the SDK's signal-result parser
    /// when it cannot interpret `raw`.
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        let outcome = parse_signal_result(raw)?;
        Ok(outcome)
    }
}
110
111/// A signal that triggered a resume, readable by a worker after re-claim.
112///
113/// **RFC-012 Stage 0:** the canonical definition has moved to
114/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
115/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
116/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
117/// scheduled for removal in 0.5.0.
118pub use ff_core::backend::ResumeSignal;
119
120/// Outcome of `append_frame()`.
121///
122/// **RFC-012 §R7.2.1:** canonical definition moved to
123/// [`ff_core::backend::AppendFrameOutcome`]; this `pub use` shim
124/// preserves the `ff_sdk::task::AppendFrameOutcome` path through the
125/// 0.4.x window. Existing consumers construct + match exhaustively
126/// without change. The derive set widened from `Clone, Debug` to
127/// `Clone, Debug, PartialEq, Eq` matching the `FailOutcome` precedent.
128pub use ff_core::backend::AppendFrameOutcome;
129
130/// Outcome of a `fail()` call.
131///
132/// **RFC-012 Stage 1a:** canonical definition moved to
133/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
134/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
135/// re-export in `lib.rs`) through the 0.4.x window. Existing
136/// consumers construct + match exhaustively without change.
137pub use ff_core::backend::FailOutcome;
138
139/// A claimed execution with an active lease. The worker processes this task
140/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
141///
142/// The lease is automatically renewed in the background at `lease_ttl / 3`
143/// intervals. Renewal stops when the task is consumed or dropped.
144///
145/// `complete`, `fail`, and `cancel` consume `self` — this prevents
146/// double-complete bugs at the type level.
147pub struct ClaimedTask {
148    /// `EngineBackend` the trait-migrated ops forward through.
149    ///
150    /// **RFC-012 Stage 1b + Round-7.** Today this is always a
151    /// `ValkeyBackend` wrapping an internal `ferriskey::Client`. Most
152    /// per-task ops
153    /// (`complete`/`fail`/`cancel`/`delay_execution`/
154    /// `move_to_waiting_children`/`update_progress`/`resume_signals`/
155    /// `create_pending_waitpoint`/`append_frame`/`report_usage`) route
156    /// through `backend.*()`. `suspend` still uses the backend's
157    /// internal `fcall("ff_suspend_execution", ...)` directly — this
158    /// is the deferred suspend per RFC-012 §R7.6.1, pending Stage 1d
159    /// input-shape work. Lease renewal goes through
160    /// `backend.renew(&handle)`. Round-7 (#135/#145) closed the four
161    /// trait-shape gaps tracked by #117.
162    ///
163    /// Stage 1c migrates the FlowFabricWorker hot paths (claim,
164    /// deliver_signal) through the same trait surface; Stage 1d
165    /// refactors this struct to carry a single `Handle` rather than
166    /// synthesising one per op via `cloned_handle`.
167    backend: Arc<dyn EngineBackend>,
168    /// Partition config for key construction.
169    #[allow(dead_code)]
170    partition_config: PartitionConfig,
171    /// Execution identity.
172    execution_id: ExecutionId,
173    /// Current attempt.
174    attempt_index: AttemptIndex,
175    attempt_id: AttemptId,
176    /// Lease identity.
177    lease_id: LeaseId,
178    lease_epoch: LeaseEpoch,
179    /// Lease timing.
180    #[allow(dead_code)]
181    lease_ttl_ms: u64,
182    /// Lane used at claim time.
183    lane_id: LaneId,
184    /// Worker instance that holds this lease (for index cleanup keys).
185    #[allow(dead_code)]
186    worker_instance_id: WorkerInstanceId,
187    /// Backend-minted attempt handle cached at claim time.
188    ///
189    /// **RFC-012 Stage 1d (v0.12 PR-5.5).** Replaces the pre-PR
190    /// `cloned_handle` helper that per-op synthesised a Valkey-specific
191    /// `Handle` via `ValkeyBackend::encode_handle`. The handle is now
192    /// produced by the owning backend at `claim_execution` /
193    /// `claim_resumed_execution` time and rides on the
194    /// `ClaimedExecution` / `ClaimedResumedExecution` result. Every
195    /// per-op trait forwarder clones it into `backend.*()` — no
196    /// runtime cost difference vs. the pre-PR-5.5 per-op
197    /// `encode_handle` (one `Box<[u8]>` allocation per op).
198    handle: Handle,
199    /// Execution data.
200    input_payload: Vec<u8>,
201    execution_kind: String,
202    tags: HashMap<String, String>,
203    /// Background renewal task handle.
204    renewal_handle: JoinHandle<()>,
205    /// Signal to stop renewal (used before consuming self).
206    renewal_stop: Arc<Notify>,
207    /// Consecutive lease renewal failures. Shared with the background renewal
208    /// task. Reset to 0 on each successful renewal. Workers should check
209    /// `is_lease_healthy()` before committing expensive side effects.
210    renewal_failures: Arc<AtomicU32>,
211    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
212    /// response is received, just before `self` is consumed into `Drop`.
213    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
214    /// suppress the false-positive "dropped without terminal operation"
215    /// warning: after `notify_one`, the renewal task has not yet been
216    /// polled by the runtime, so `is_finished()` is still `false` on the
217    /// happy path when self is being consumed.
218    ///
219    /// Note: the flag is set for any terminal-op path that reaches
220    /// `stop_renewal()`, which includes Lua-level script errors (the
221    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
222    /// the caller already receives the `Err` via the op's return value,
223    /// so an additional `Drop` warning would be noise. The warning is
224    /// reserved for genuine drop-without-terminal-op cases (panic, early
225    /// return, transport failure before stop_renewal ran).
226    terminal_op_called: AtomicBool,
227    /// Concurrency permit from the worker's semaphore. Held for the lifetime
228    /// of the task; released on complete/fail/cancel/drop.
229    _concurrency_permit: Option<OwnedSemaphorePermit>,
230}
231
232impl ClaimedTask {
233    /// Construct a `ClaimedTask` from the results of a successful
234    /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
235    ///
236    /// # Arguments
237    ///
238    /// * `backend` — `Arc<dyn EngineBackend>` the per-task ops
239    ///   (`complete`/`fail`/`cancel`/`renew`/`append_frame`/…) forward
240    ///   through, plus the background lease-renewal task. Today this
241    ///   is always a `ValkeyBackend` (see the struct doc on `backend`
242    ///   above); RFC-023 widens the admissible backends.
243    /// * `partition_config` — partition topology snapshot read at
244    ///   `FlowFabricWorker::connect`. Used for key construction on
245    ///   the lifetime of this task.
246    /// * `execution_id` — the claimed execution's UUID.
247    /// * `attempt_index` / `attempt_id` — current attempt identity.
248    ///   `attempt_index` is 0 on a fresh claim, preserved on a
249    ///   resumed claim.
250    /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
251    ///   is returned by the Lua FCALL and bumped on each resumed
252    ///   claim.
253    /// * `lease_ttl_ms` — lease TTL used to schedule the background
254    ///   renewal task (renews at `lease_ttl_ms / 3`).
255    /// * `lane_id` — the lane the task was claimed on. Used for
256    ///   index key construction (`lane_active`, `lease_expiry`,
257    ///   etc.).
258    /// * `worker_instance_id` — this worker's identity. Used to
259    ///   resolve `worker_leases` index entries during renewal and
260    ///   completion.
261    /// * `input_payload` / `execution_kind` / `tags` — pre-read
262    ///   execution metadata, exposed via getters so the worker
263    ///   doesn't round-trip to Valkey to inspect what it just
264    ///   claimed.
265    ///
266    /// # Invariant: constructor is `pub(crate)` on purpose
267    ///
268    /// **External callers cannot construct a `ClaimedTask`** — only
269    /// the in-crate claim entry points (`claim_next`,
270    /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
271    /// load-bearing: those entry points are the ONLY sites that
272    /// acquire a permit from the worker's concurrency semaphore
273    /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
274    /// it through `ClaimedTask::set_concurrency_permit` before
275    /// returning the task. Promoting `new` to `pub` would let
276    /// consumers build tasks that bypass the concurrency contract
277    /// the worker's `max_concurrent_tasks` config advertises — the
278    /// returned task would run alongside other in-flight work
279    /// without debiting the permit bank, and the
280    /// complete/fail/cancel/drop path would have nothing to
281    /// release.
282    ///
283    /// If an external callsite genuinely needs to rehydrate a
284    /// task from saved FCALL results, the right answer is a new
285    /// `FlowFabricWorker` entry point that wraps `new` + acquires a
286    /// permit — not promoting this constructor.
287    #[allow(clippy::too_many_arguments)]
288    pub(crate) fn new(
289        backend: Arc<dyn EngineBackend>,
290        partition_config: PartitionConfig,
291        handle: Handle,
292        execution_id: ExecutionId,
293        attempt_index: AttemptIndex,
294        attempt_id: AttemptId,
295        lease_id: LeaseId,
296        lease_epoch: LeaseEpoch,
297        lease_ttl_ms: u64,
298        lane_id: LaneId,
299        worker_instance_id: WorkerInstanceId,
300        input_payload: Vec<u8>,
301        execution_kind: String,
302        tags: HashMap<String, String>,
303    ) -> Self {
304        let renewal_stop = Arc::new(Notify::new());
305        let renewal_failures = Arc::new(AtomicU32::new(0));
306
307        // v0.12 PR-5.5: the renewal task forwards through
308        // `backend.renew(&handle)` using the backend-minted handle. No
309        // Valkey-specific `encode_handle` call here — the handle rode in
310        // on the claim result.
311        let renewal_handle = spawn_renewal_task(
312            backend.clone(),
313            handle.clone(),
314            execution_id.clone(),
315            lease_ttl_ms,
316            renewal_stop.clone(),
317            renewal_failures.clone(),
318        );
319
320        Self {
321            backend,
322            partition_config,
323            execution_id,
324            attempt_index,
325            attempt_id,
326            lease_id,
327            lease_epoch,
328            lease_ttl_ms,
329            lane_id,
330            worker_instance_id,
331            handle,
332            input_payload,
333            execution_kind,
334            tags,
335            renewal_handle,
336            renewal_stop,
337            renewal_failures,
338            terminal_op_called: AtomicBool::new(false),
339            _concurrency_permit: None,
340        }
341    }
342
343    /// Attach a concurrency permit from the worker's semaphore.
344    /// The permit is held for the task's lifetime and released on drop.
345    #[allow(dead_code)]
346    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
347        self._concurrency_permit = Some(permit);
348    }
349
350    // ── Accessors ──
351
352    pub fn execution_id(&self) -> &ExecutionId {
353        &self.execution_id
354    }
355
356    pub fn attempt_index(&self) -> AttemptIndex {
357        self.attempt_index
358    }
359
360    pub fn attempt_id(&self) -> &AttemptId {
361        &self.attempt_id
362    }
363
364    pub fn lease_id(&self) -> &LeaseId {
365        &self.lease_id
366    }
367
368    pub fn lease_epoch(&self) -> LeaseEpoch {
369        self.lease_epoch
370    }
371
372    pub fn input_payload(&self) -> &[u8] {
373        &self.input_payload
374    }
375
376    pub fn execution_kind(&self) -> &str {
377        &self.execution_kind
378    }
379
380    pub fn tags(&self) -> &HashMap<String, String> {
381        &self.tags
382    }
383
384    pub fn lane_id(&self) -> &LaneId {
385        &self.lane_id
386    }
387
388    /// Read frames from this task's attempt stream.
389    ///
390    /// Forwards through the task's `Arc<dyn EngineBackend>` — consumers
391    /// no longer need the `ferriskey::Client` leak (#87). See
392    /// [`EngineBackend::read_stream`](ff_core::engine_backend::EngineBackend::read_stream)
393    /// for the backend-level contract.
394    ///
395    /// Validation (count_limit bounds) runs at this boundary — out-of-range
396    /// input surfaces as [`SdkError::Config`] before reaching the backend.
397    ///
398    /// Gated on `valkey-default` — the backing
399    /// [`EngineBackend::read_stream`] trait method requires ff-core's
400    /// `streaming` feature, which the `valkey-default` feature set
401    /// activates. Sqlite-only builds (`--no-default-features,
402    /// features = ["sqlite"]`) do not expose this method today.
403    #[cfg(feature = "valkey-default")]
404    pub async fn read_stream(
405        &self,
406        from: StreamCursor,
407        to: StreamCursor,
408        count_limit: u64,
409    ) -> Result<StreamFrames, SdkError> {
410        validate_stream_read_count(count_limit)?;
411        Ok(self
412            .backend
413            .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
414            .await?)
415    }
416
417    /// Tail this task's attempt stream for new frames.
418    ///
419    /// See [`EngineBackend::tail_stream`](ff_core::engine_backend::EngineBackend::tail_stream)
420    /// for the head-of-line warning — the task's backend is shared
421    /// with claim/complete/fail hot paths. Consumers that need
422    /// isolation should build a dedicated `EngineBackend` for tail
423    /// reads.
424    ///
425    /// Gated on `valkey-default` — see [`Self::read_stream`].
426    #[cfg(feature = "valkey-default")]
427    pub async fn tail_stream(
428        &self,
429        after: StreamCursor,
430        block_ms: u64,
431        count_limit: u64,
432    ) -> Result<StreamFrames, SdkError> {
433        self.tail_stream_with_visibility(
434            after,
435            block_ms,
436            count_limit,
437            ff_core::backend::TailVisibility::All,
438        )
439        .await
440    }
441
442    /// Tail with an explicit [`TailVisibility`](ff_core::backend::TailVisibility)
443    /// filter (RFC-015 §6). Use [`TailVisibility::ExcludeBestEffort`]
444    /// (ff_core::backend::TailVisibility::ExcludeBestEffort) to drop
445    /// [`StreamMode::BestEffortLive`](ff_core::backend::StreamMode::BestEffortLive)
446    /// frames server-side.
447    ///
448    /// Gated on `valkey-default` — see [`Self::read_stream`].
449    #[cfg(feature = "valkey-default")]
450    pub async fn tail_stream_with_visibility(
451        &self,
452        after: StreamCursor,
453        block_ms: u64,
454        count_limit: u64,
455        visibility: ff_core::backend::TailVisibility,
456    ) -> Result<StreamFrames, SdkError> {
457        if block_ms > MAX_TAIL_BLOCK_MS {
458            return Err(SdkError::Config {
459                context: "tail_stream".into(),
460                field: Some("block_ms".into()),
461                message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
462            });
463        }
464        validate_stream_read_count(count_limit)?;
465        validate_tail_cursor(&after)?;
466        Ok(self
467            .backend
468            .tail_stream(
469                &self.execution_id,
470                self.attempt_index,
471                after,
472                block_ms,
473                count_limit,
474                visibility,
475            )
476            .await?)
477    }
478
479    /// Clone this task's cached attempt handle for passing to a trait
480    /// op (`backend.complete(&handle, …)`, `backend.fail(&handle, …)`,
481    /// etc.).
482    ///
483    /// **RFC-012 Stage 1d (v0.12 PR-5.5).** The handle rides in on the
484    /// claim result and is stored on the struct — per-op calls clone
485    /// it instead of re-synthesising via backend-specific code. The
486    /// `Handle` payload is `Box<[u8]> + 2 small enums`, so a clone is
487    /// effectively one heap allocation — the same cost as the
488    /// pre-PR-5.5 `cloned_handle`.
489    fn cloned_handle(&self) -> Handle {
490        self.handle.clone()
491    }
492
493    /// Check if the lease is likely still valid based on renewal success.
494    ///
495    /// Returns `false` if 3 or more consecutive renewal attempts have failed.
496    /// Workers should check this before committing expensive or irreversible
497    /// side effects. A `false` return means Valkey may have already expired
498    /// the lease and another worker could be processing this execution.
499    pub fn is_lease_healthy(&self) -> bool {
500        self.renewal_failures.load(Ordering::Relaxed) < 3
501    }
502
503    /// Number of consecutive lease renewal failures since the last success.
504    ///
505    /// Returns 0 when renewals are working normally. Useful for observability
506    /// and custom health policies beyond the default threshold of 3.
507    pub fn consecutive_renewal_failures(&self) -> u32 {
508        self.renewal_failures.load(Ordering::Relaxed)
509    }
510
511    // ── Terminal operations (consume self) ──
512
513    /// Delay the execution until `delay_until`.
514    ///
515    /// Releases the lease. The execution moves to `delayed` state.
516    /// Consumes self — the task cannot be used after delay.
517    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
518        // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
519        // the pre-migration `stop_renewal` contract: called only when
520        // the FCALL round-tripped (Ok or a typed Lua/engine error);
521        // raw transport errors bubble up without stopping renewal so
522        // the `Drop` warning still fires if the caller later drops
523        // `self` without retrying.
524        let handle = self.cloned_handle();
525        let out = self.backend.delay(&handle, delay_until).await;
526        if fcall_landed(&out) {
527            self.stop_renewal();
528        }
529        out.map_err(SdkError::from)
530    }
531
532    /// Move execution to waiting_children state.
533    ///
534    /// Releases the lease. The execution waits for child dependencies to complete.
535    /// Consumes self.
536    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
537        // RFC-012 Stage 1b: forwards through `backend.wait_children`.
538        // See `delay_execution` for the stop_renewal contract.
539        let handle = self.cloned_handle();
540        let out = self.backend.wait_children(&handle).await;
541        if fcall_landed(&out) {
542            self.stop_renewal();
543        }
544        out.map_err(SdkError::from)
545    }
546
547    /// Complete the execution successfully.
548    ///
549    /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
550    /// Renewal continues during the FCALL to prevent lease expiry under
551    /// network latency — the Lua fences on lease_id+epoch atomically.
552    /// Consumes self — the task cannot be used after completion.
553    ///
554    /// # Connection errors and replay
555    ///
556    /// If the Valkey connection drops after the Lua commit but before the
557    /// client reads the response, retrying `complete()` with the same
558    /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
559    /// reconciles the "already terminal with matching outcome" response
560    /// into `Ok(())`. A retry after a different terminal op has raced in
561    /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
562    /// populated `terminal_outcome` so the caller can see what actually
563    /// happened.
564    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
565        // RFC-012 Stage 1b: forwards through `backend.complete`.
566        // The FCALL body + the `ExecutionNotActive` replay reconciliation
567        // (match same epoch + attempt_id + outcome=="success" → Ok)
568        // moved to `ff_backend_valkey::complete_impl`.
569        let handle = self.cloned_handle();
570        let out = self.backend.complete(&handle, result_payload).await;
571        if fcall_landed(&out) {
572            self.stop_renewal();
573        }
574        out.map_err(SdkError::from)
575    }
576
577    /// Fail the execution with a reason and error category.
578    ///
579    /// If the execution policy allows retries, the engine schedules a retry
580    /// (delayed backoff). Otherwise, the execution becomes terminal failed.
581    /// Returns [`FailOutcome`] so the caller knows what happened.
582    ///
583    /// # Connection errors and replay
584    ///
585    /// `fail()` is replay-safe under the same conditions as `complete()`:
586    /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
587    /// reconciled into the outcome the server actually committed
588    /// (`TerminalFailed` if no retries left; `RetryScheduled` with
589    /// `delay_until = 0` if a retry was scheduled — the exact delay is
590    /// not recovered on the replay path).
591    pub async fn fail(
592        self,
593        reason: &str,
594        error_category: &str,
595    ) -> Result<FailOutcome, SdkError> {
596        // RFC-012 Stage 1b: forwards through `backend.fail`.
597        // The FCALL body + retry-policy read + the two-shape replay
598        // reconciliation (terminal/failed → TerminalFailed, runnable
599        // → RetryScheduled{0}) moved to
600        // `ff_backend_valkey::fail_impl`.
601        //
602        // **Preserving the caller's raw category string.** The
603        // pre-Stage-1b code passed `error_category` straight to the
604        // Lua `failure_category` ARGV. Real callers use arbitrary
605        // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
606        // `"inference_error"`, …) that do not correspond to the 5
607        // named `FailureClass` variants. A lossy enum conversion
608        // would rewrite every non-canonical category to
609        // `Transient` — a real behavior change for
610        // ff-readiness-tests + the examples crates.
611        //
612        // Instead: pack the raw category bytes into
613        // `FailureReason.detail`; `fail_impl` reads them back and
614        // threads them to Lua verbatim. The `classification` enum
615        // is derived from the string for the few consumers who
616        // match on the trait return today (none in-workspace).
617        // Stage 1d (or issue #117) widens `FailureClass` with a
618        // `Custom(String)` arm; this stash carrier retires then.
619        let handle = self.cloned_handle();
620        let failure_reason = ff_core::backend::FailureReason::with_detail(
621            reason.to_owned(),
622            error_category.as_bytes().to_vec(),
623        );
624        let classification = error_category_to_class(error_category);
625        let out = self.backend.fail(&handle, failure_reason, classification).await;
626        if fcall_landed(&out) {
627            self.stop_renewal();
628        }
629        out.map_err(SdkError::from)
630    }
631
632    /// Cancel the execution.
633    ///
634    /// Stops lease renewal, then calls `ff_cancel_execution` via FCALL.
635    /// Consumes self.
636    ///
637    /// # Connection errors and replay
638    ///
639    /// `cancel()` is replay-safe under the same conditions as `complete()`:
640    /// a retry by the same caller matching `lease_epoch` + `attempt_id`
641    /// returns `Ok(())` if the server's stored `terminal_outcome` is
642    /// `cancelled`. A retry that finds a different outcome (because a
643    /// concurrent `complete()` or `fail()` won the race) surfaces as
644    /// `ExecutionNotActive` with the populated `terminal_outcome` so the
645    /// caller can see that the cancel intent was NOT honored.
646    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
647        // RFC-012 Stage 1b: forwards through `backend.cancel`.
648        // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
649        // and the `ExecutionNotActive` → outcome=="cancelled" replay
650        // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
651        let handle = self.cloned_handle();
652        let out = self.backend.cancel(&handle, reason).await;
653        if fcall_landed(&out) {
654            self.stop_renewal();
655        }
656        out.map_err(SdkError::from)
657    }
658
659    // ── Non-terminal operations ──
660
661    /// Manually renew the lease.
662    ///
663    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
664    /// trait. The background renewal task calls
665    /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
666    /// this method is the public entry point for workers that want
667    /// to force a renew out-of-band.
668    pub async fn renew_lease(&self) -> Result<(), SdkError> {
669        let handle = self.cloned_handle();
670        self.backend
671            .renew(&handle)
672            .await
673            .map(|_renewal| ())
674            .map_err(SdkError::from)
675    }
676
677    /// Update progress (pct 0-100 and optional message).
678    ///
679    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
680    /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
681    ///
682    /// # Not for stream frames
683    ///
684    /// `update_progress` writes the `progress_percent` / `progress_message`
685    /// fields on `exec_core` (the execution's state hash). It is a
686    /// scalar heartbeat — each call overwrites the previous value and
687    /// nothing is appended to the output stream. Producers emitting
688    /// stream frames (arbitrary `frame_type` + payload, consumed via
689    /// `ClaimedTask::read_stream` / `tail_stream` or the HTTP
690    /// stream-tail routes) MUST use [`Self::append_frame`] instead;
691    /// `update_progress` is invisible to stream consumers.
692    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
693        let handle = self.cloned_handle();
694        self.backend
695            .progress(&handle, Some(pct), Some(message.to_owned()))
696            .await
697            .map_err(SdkError::from)
698    }
699
700    /// Report usage against a budget and check limits.
701    ///
702    /// Non-consuming — the worker can report usage multiple times.
703    /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
704    /// `dedup_key` prevents double-counting on retries (auto-prefixed with budget hash tag).
705    ///
706    /// **RFC-012 §R7.2.3:** forwards through
707    /// [`EngineBackend::report_usage`](ff_core::engine_backend::EngineBackend::report_usage).
708    /// The trait's `UsageDimensions.dedup_key` carries the raw key;
709    /// the backend impl applies the `usage_dedup_key(hash_tag, k)`
710    /// wrap so the dedup state co-locates with the budget partition
711    /// (PR #108).
712    pub async fn report_usage(
713        &self,
714        budget_id: &BudgetId,
715        dimensions: &[(&str, u64)],
716        dedup_key: Option<&str>,
717    ) -> Result<ReportUsageResult, SdkError> {
718        let handle = self.cloned_handle();
719        let mut dims = ff_core::backend::UsageDimensions::default();
720        for (name, delta) in dimensions {
721            dims.custom.insert((*name).to_owned(), *delta);
722        }
723        dims.dedup_key = dedup_key
724            .filter(|k| !k.is_empty())
725            .map(|k| k.to_owned());
726        self.backend
727            .report_usage(&handle, budget_id, dims)
728            .await
729            .map_err(SdkError::from)
730    }
731
732    /// Create a pending waitpoint for future signal delivery.
733    ///
734    /// Non-consuming — the worker keeps the lease. Signals delivered to the
735    /// waitpoint are buffered. When the worker later calls `suspend()` with
736    /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
737    /// resume condition.
738    ///
739    /// Returns both the waitpoint_id AND the HMAC token required by external
740    /// callers to buffer signals against this pending waitpoint
741    /// (RFC-004 §Waitpoint Security).
742    ///
743    /// **RFC-012 §R7.2.2:** forwards through
744    /// [`EngineBackend::create_waitpoint`](ff_core::engine_backend::EngineBackend::create_waitpoint).
745    /// The trait returns
746    /// [`PendingWaitpoint`](ff_core::backend::PendingWaitpoint) whose
747    /// `hmac_token` is the same wire HMAC this method has always
748    /// produced; the SDK unwraps it back to the historical
749    /// `(WaitpointId, WaitpointToken)` tuple for caller-shape parity.
750    pub async fn create_pending_waitpoint(
751        &self,
752        waitpoint_key: &str,
753        expires_in_ms: u64,
754    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
755        let handle = self.cloned_handle();
756        let expires_in = std::time::Duration::from_millis(expires_in_ms);
757        let pending = self
758            .backend
759            .create_waitpoint(&handle, waitpoint_key, expires_in)
760            .await
761            .map_err(SdkError::from)?;
762        // `WaitpointHmac` wraps the canonical `WaitpointToken`; the
763        // `.token()` accessor borrows the underlying
764        // `WaitpointToken`, which is the caller's historical return
765        // shape.
766        Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
767    }
768
769    // ── Phase 4: Streaming ──
770
771    /// Append a frame to the current attempt's output stream.
772    ///
773    /// Non-consuming — the worker can append many frames during execution.
774    /// The stream is created lazily on the first append.
775    ///
776    /// **RFC-012 §R7.2.1 / PR #146:** forwards through the
777    /// `EngineBackend` trait. The free-form `frame_type` tag and
778    /// optional `metadata` (wire `correlation_id`) travel on the
779    /// extended [`ff_core::backend::Frame`] shape (`frame_type:
780    /// String`, `correlation_id: Option<String>`), giving byte-for-byte
781    /// wire parity with the pre-migration direct-FCALL path.
782    ///
783    /// # `append_frame` vs [`Self::update_progress`]
784    ///
785    /// Stream-frame producers (arbitrary `frame_type` + payload
786    /// consumed via `ClaimedTask::read_stream` / `tail_stream` or the
787    /// HTTP stream-tail routes) MUST use `append_frame`.
788    /// `update_progress` writes scalar `progress_percent` /
789    /// `progress_message` fields to `exec_core` and is invisible to
790    /// stream consumers.
791    pub async fn append_frame(
792        &self,
793        frame_type: &str,
794        payload: &[u8],
795        metadata: Option<&str>,
796    ) -> Result<AppendFrameOutcome, SdkError> {
797        self.append_frame_with_mode(
798            frame_type,
799            payload,
800            metadata,
801            ff_core::backend::StreamMode::Durable,
802        )
803        .await
804    }
805
806    /// Append a frame under an explicit RFC-015
807    /// [`StreamMode`](ff_core::backend::StreamMode).
808    /// Defaults via [`Self::append_frame`] preserve pre-015 behaviour
809    /// ([`StreamMode::Durable`](ff_core::backend::StreamMode::Durable)).
810    pub async fn append_frame_with_mode(
811        &self,
812        frame_type: &str,
813        payload: &[u8],
814        metadata: Option<&str>,
815        mode: ff_core::backend::StreamMode,
816    ) -> Result<AppendFrameOutcome, SdkError> {
817        let handle = self.cloned_handle();
818        let mut frame = ff_core::backend::Frame::new(
819            payload.to_vec(),
820            ff_core::backend::FrameKind::Event,
821        )
822        .with_frame_type(frame_type)
823        .with_mode(mode);
824        if let Some(cid) = metadata {
825            frame = frame.with_correlation_id(cid);
826        }
827        self.backend
828            .append_frame(&handle, frame)
829            .await
830            .map_err(SdkError::from)
831    }
832
833    // ── RFC-013 Stage 1d: Suspend ──
834
835    /// **Strict suspend.** Consumes `self` and yields a
836    /// [`SuspendedHandle`] or errors. On the early-satisfied path
837    /// (buffered signals already matched the condition), returns
838    /// `EngineError::State(AlreadySatisfied)` — use `try_suspend` when
839    /// that branch is part of the flow's happy path.
840    ///
841    /// RFC-013 §5.1 — this is the classic Rust `foo` / `try_foo`
842    /// split; `suspend` is the unconditional form. Defaults
843    /// `WaitpointBinding::fresh()` for the waitpoint. For pending
844    /// waitpoints previously issued via `create_pending_waitpoint`,
845    /// use `try_suspend_on_pending`.
846    pub async fn suspend(
847        self,
848        reason_code: SuspensionReasonCode,
849        resume_condition: ResumeCondition,
850        timeout: Option<(TimestampMs, TimeoutBehavior)>,
851        resume_policy: ResumePolicy,
852    ) -> Result<SuspendedHandle, SdkError> {
853        let outcome = self
854            .try_suspend_inner(WaitpointBinding::fresh(), reason_code, resume_condition, timeout, resume_policy)
855            .await?;
856        match outcome {
857            TrySuspendOutcome::Suspended(h) => Ok(h),
858            TrySuspendOutcome::AlreadySatisfied { .. } => Err(SdkError::from(
859                crate::EngineError::State(StateKind::AlreadySatisfied),
860            )),
861        }
862    }
863
864    /// **Fallible suspend.** On the early-satisfied path, the
865    /// `ClaimedTask` is handed back unchanged so the worker can
866    /// continue running against the retained lease.
867    ///
868    /// RFC-013 §5.1 — use this when consuming a pending waitpoint whose
869    /// buffered signals may already match the condition.
870    pub async fn try_suspend(
871        self,
872        reason_code: SuspensionReasonCode,
873        resume_condition: ResumeCondition,
874        timeout: Option<(TimestampMs, TimeoutBehavior)>,
875        resume_policy: ResumePolicy,
876    ) -> Result<TrySuspendOutcome, SdkError> {
877        self.try_suspend_inner(WaitpointBinding::fresh(), reason_code, resume_condition, timeout, resume_policy)
878            .await
879    }
880
881    /// Convenience: `try_suspend` against a pending waitpoint previously
882    /// issued via `create_pending_waitpoint`.
883    pub async fn try_suspend_on_pending(
884        self,
885        pending: &PendingWaitpoint,
886        reason_code: SuspensionReasonCode,
887        resume_condition: ResumeCondition,
888        timeout: Option<(TimestampMs, TimeoutBehavior)>,
889        resume_policy: ResumePolicy,
890    ) -> Result<TrySuspendOutcome, SdkError> {
891        self.try_suspend_inner(
892            WaitpointBinding::use_pending(pending),
893            reason_code,
894            resume_condition,
895            timeout,
896            resume_policy,
897        )
898        .await
899    }
900
901    async fn try_suspend_inner(
902        self,
903        waitpoint: WaitpointBinding,
904        reason_code: SuspensionReasonCode,
905        resume_condition: ResumeCondition,
906        timeout: Option<(TimestampMs, TimeoutBehavior)>,
907        resume_policy: ResumePolicy,
908    ) -> Result<TrySuspendOutcome, SdkError> {
909        let handle = self.cloned_handle();
910        let (timeout_at, timeout_behavior) = match timeout {
911            Some((at, b)) => (Some(at), b),
912            None => (None, TimeoutBehavior::Fail),
913        };
914        // If the caller passed `ResumeCondition::Single { waitpoint_key }`
915        // alongside the default `WaitpointBinding::fresh()` (which mints a
916        // random internal key), rebind the Fresh binding to use the
917        // condition's key so RFC-013 §2.4 "waitpoint_key cross-field
918        // invariant" is honored. UsePending retains its own binding.
919        // RFC-014: for `Composite`, rebind to the first waitpoint_key
920        // observed in the composite tree (Single.waitpoint_key or
921        // Count.waitpoints[0]); consumers using single-waitpoint
922        // composites must keep all Single/Count waitpoint_keys equal
923        // (see RFC-014 single-waitpoint scoping).
924        let waitpoint = match (&waitpoint, &resume_condition) {
925            (
926                WaitpointBinding::Fresh { waitpoint_id, .. },
927                ResumeCondition::Single { waitpoint_key, .. },
928            ) => WaitpointBinding::Fresh {
929                waitpoint_id: waitpoint_id.clone(),
930                waitpoint_key: waitpoint_key.clone(),
931            },
932            (
933                WaitpointBinding::Fresh { waitpoint_id, .. },
934                ResumeCondition::Composite(body),
935            ) => {
936                if let Some(key) = composite_first_waitpoint_key(body) {
937                    WaitpointBinding::Fresh {
938                        waitpoint_id: waitpoint_id.clone(),
939                        waitpoint_key: key,
940                    }
941                } else {
942                    waitpoint
943                }
944            }
945            _ => waitpoint,
946        };
947        let mut args = SuspendArgs::new(
948            SuspensionId::new(),
949            waitpoint,
950            resume_condition,
951            resume_policy,
952            reason_code,
953            TimestampMs::now(),
954        )
955        .with_requester(SuspensionRequester::Worker);
956        if let Some(at) = timeout_at {
957            args = args.with_timeout(at, timeout_behavior);
958        }
959
960        let outcome = self
961            .backend
962            .suspend(&handle, args)
963            .await
964            .map_err(SdkError::from)?;
965        match outcome {
966            SuspendOutcome::Suspended { details, handle: new_handle } => {
967                // Renewal is stopped only AFTER a successful Suspended
968                // outcome. The suspended-kind handle is what the caller
969                // will later hand to `observe_signals` / `claim_from_resume_grant`.
970                self.stop_renewal();
971                Ok(TrySuspendOutcome::Suspended(SuspendedHandle {
972                    handle: new_handle,
973                    details,
974                }))
975            }
976            SuspendOutcome::AlreadySatisfied { details } => {
977                // Lease is retained Lua-side; keep the ClaimedTask alive
978                // so the worker continues against the existing lease.
979                Ok(TrySuspendOutcome::AlreadySatisfied {
980                    task: self,
981                    details,
982                })
983            }
984            _ => Err(SdkError::from(ScriptError::Parse {
985                fcall: "try_suspend_inner".into(),
986                execution_id: None,
987                message: "unexpected SuspendOutcome variant".into(),
988            })),
989        }
990    }
991
992    /// Read the signals that satisfied the waitpoint and triggered this
993    /// resume.
994    ///
995    /// Non-consuming. Intended to be called immediately after re-claim via
996    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
997    /// subsequent `suspend()` (which replaces `suspension:current`).
998    ///
999    /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
1000    ///
1001    /// - No prior suspension on this execution.
1002    /// - The prior suspension belonged to an earlier attempt (e.g. the
1003    ///   attempt was cancelled/failed and a retry is now claiming).
1004    /// - The prior suspension was closed by timeout / cancel / operator
1005    ///   override rather than by a matched signal.
1006    ///
1007    /// Reads `suspension:current` once, filters by `attempt_index` to
1008    /// guard against stale prior-attempt records, then fetches the matched
1009    /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
1010    /// fields and reads each signal's metadata + payload directly.
1011    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
1012        // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
1013        // Pre-migration body (HGETALL suspension_current + HMGET
1014        // matchers + pipelined HGETALL signal_hash / GET
1015        // signal_payload) lives in
1016        // `ff_backend_valkey::observe_signals_impl`.
1017        let handle = self.cloned_handle();
1018        self.backend
1019            .observe_signals(&handle)
1020            .await
1021            .map_err(SdkError::from)
1022    }
1023
1024    /// Signal the renewal task to stop. Called by every terminal op
1025    /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
1026    /// `move_to_waiting_children`) after the FCALL returns. Also marks
1027    /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
1028    /// consumption from a genuine drop-without-terminal-op.
1029    fn stop_renewal(&self) {
1030        self.terminal_op_called.store(true, Ordering::Release);
1031        self.renewal_stop.notify_one();
1032    }
1033
1034}
1035
1036/// True iff the backend's FCALL result represents a round-trip that
1037/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
1038/// contention, conflict, state, bug) all count as "landed" — the
1039/// server either committed or rejected with a typed response. Only
1040/// raw `Transport` errors (connection drops, request timeouts, parse
1041/// failures) count as "did not land", which matches the pre-Stage-1b
1042/// SDK's `fcall(...).await.map_err(SdkError::from)?` short-circuit
1043/// — those errors returned before `stop_renewal()` ran, preserving
1044/// the `Drop` warning for genuine "lease will leak" cases.
1045///
1046/// Stage 1b terminal-op forwarders use this predicate to decide
1047/// whether to call `stop_renewal()`: yes for landed responses, no
1048/// for transport errors so the caller's retry path still sees a
1049/// running renewal task.
1050fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
1051    match r {
1052        Ok(_) => true,
1053        Err(crate::EngineError::Transport { .. }) => false,
1054        Err(_) => true,
1055    }
1056}
1057
1058/// Map the SDK's free-form `error_category: &str` to the typed
1059/// `FailureClass` the trait's `fail` method takes. Unknown categories
1060/// fall through to `Transient` — the Lua side already tolerated
1061/// arbitrary strings, so the worst a category drift does under the
1062/// Stage 1b forwarder is reclassify a novel category as transient.
1063/// Stage 1d (or issue #117) widens `FailureClass` with a
1064/// `Custom(String)` arm for exact round-trip.
1065fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
1066    use ff_core::backend::FailureClass;
1067    match s {
1068        "transient" => FailureClass::Transient,
1069        "permanent" => FailureClass::Permanent,
1070        "infra_crash" => FailureClass::InfraCrash,
1071        "timeout" => FailureClass::Timeout,
1072        "cancelled" => FailureClass::Cancelled,
1073        _ => FailureClass::Transient,
1074    }
1075}
1076
1077impl Drop for ClaimedTask {
1078    fn drop(&mut self) {
1079        // Abort the background renewal task on drop.
1080        // This is a safety net — complete/fail/cancel already stop renewal
1081        // via notify before consuming self. But if the task is dropped
1082        // without being consumed (e.g., panic), abort prevents leaked renewals.
1083        //
1084        // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
1085        // on the happy path, `stop_renewal()` fires `notify_one` synchronously
1086        // and then self is consumed into Drop immediately. The renewal task
1087        // has not yet been polled by the runtime, so `is_finished()` is still
1088        // `false` here — which previously fired the warning on every
1089        // complete/fail/cancel/suspend call. `terminal_op_called` is the
1090        // authoritative signal that a terminal-op path ran to the point of
1091        // stopping renewal; it does not by itself certify the Lua side
1092        // succeeded (see the field doc). The caller surfaces any error via
1093        // the op's return value, so a `Drop` warning is unneeded there.
1094        if !self.terminal_op_called.load(Ordering::Acquire) {
1095            tracing::warn!(
1096                execution_id = %self.execution_id,
1097                "ClaimedTask dropped without terminal operation — lease will expire"
1098            );
1099        }
1100        self.renewal_handle.abort();
1101    }
1102}
1103
1104// ── Lease renewal ──
1105
1106/// Per-tick renewal: single `backend.renew(&handle)` call wrapped in
1107/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
1108/// hooks still see one span per renewal (restores PR #119 Cursor
1109/// Bugbot finding — the top-level `spawn_renewal_task` fires once at
1110/// construction time, not per-tick).
1111///
1112/// See `benches/harness/src/bin/long_running.rs` for the bench
1113/// consumer that depends on this span naming.
1114#[tracing::instrument(
1115    name = "renew_lease",
1116    skip_all,
1117    fields(execution_id = %execution_id)
1118)]
1119async fn renew_once(
1120    backend: &dyn EngineBackend,
1121    handle: &Handle,
1122    execution_id: &ExecutionId,
1123) -> Result<(), crate::EngineError> {
1124    backend.renew(handle).await.map(|_| ())
1125}
1126
1127/// Spawn a background tokio task that renews the lease at `ttl / 3`
1128/// intervals.
1129///
1130/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
1131/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
1132/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
1133/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
1134/// + the encoded `Handle` instead. Per-tick tracing lives on
1135///   [`renew_once`]; this function itself is sync + one-shot.
1136///
1137/// Stops when:
1138/// - `stop_signal` is notified (complete/fail/cancel called)
1139/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
1140/// - The task handle is aborted (ClaimedTask dropped)
1141fn spawn_renewal_task(
1142    backend: Arc<dyn EngineBackend>,
1143    handle: Handle,
1144    execution_id: ExecutionId,
1145    lease_ttl_ms: u64,
1146    stop_signal: Arc<Notify>,
1147    failure_counter: Arc<AtomicU32>,
1148) -> JoinHandle<()> {
1149    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
1150    // panics if a caller (or a misconfigured test) passes a
1151    // lease_ttl_ms < 3. The SDK config validator already enforces
1152    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
1153    // is a cheap belt-and-suspenders (Copilot review finding on
1154    // PR #119).
1155    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));
1156
1157    tokio::spawn(async move {
1158        let mut tick = tokio::time::interval(interval);
1159        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1160        // Skip the first immediate tick — the lease was just acquired.
1161        tick.tick().await;
1162
1163        loop {
1164            tokio::select! {
1165                _ = stop_signal.notified() => {
1166                    tracing::debug!(
1167                        execution_id = %execution_id,
1168                        "lease renewal stopped by signal"
1169                    );
1170                    return;
1171                }
1172                _ = tick.tick() => {
1173                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
1174                        Ok(_renewal) => {
1175                            failure_counter.store(0, Ordering::Relaxed);
1176                            tracing::trace!(
1177                                execution_id = %execution_id,
1178                                "lease renewed"
1179                            );
1180                        }
1181                        Err(e) if is_terminal_renewal_error(&e) => {
1182                            failure_counter.fetch_add(1, Ordering::Relaxed);
1183                            tracing::warn!(
1184                                execution_id = %execution_id,
1185                                error = %e,
1186                                "lease renewal failed with terminal error, stopping renewal"
1187                            );
1188                            return;
1189                        }
1190                        Err(e) => {
1191                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
1192                            tracing::warn!(
1193                                execution_id = %execution_id,
1194                                error = %e,
1195                                consecutive_failures = count,
1196                                "lease renewal failed (will retry next interval)"
1197                            );
1198                        }
1199                    }
1200                }
1201            }
1202        }
1203    })
1204}
1205
1206/// Check if an engine error means renewal should stop permanently.
1207#[allow(dead_code)]
1208fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
1209    use crate::{ContentionKind, EngineError, StateKind};
1210    matches!(
1211        err,
1212        EngineError::State(
1213            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
1214        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
1215            | EngineError::NotFound { entity: "execution" }
1216    )
1217}
1218
1219// ── FCALL result parsing ──
1220
/// Decode the wire reply of the `ff_report_usage_and_check` Lua function
/// into a typed [`ReportUsageResult`].
///
/// Success shapes:
///
/// - `{1, "OK"}`
/// - `{1, "ALREADY_APPLIED"}`
/// - `{1, "SOFT_BREACH", dim, current, limit}`
/// - `{1, "HARD_BREACH", dim, current, limit}`
///
/// Any status code other than `1` is mapped to a [`ScriptError`] through
/// [`ScriptError::from_code_with_detail`]. If that code is unknown, the
/// reply is reported as a parse error instead.
///
/// This is `pub` so downstream SDKs that speak the same wire format —
/// notably cairn-fabric's `budget_service::parse_spend_result` — can reuse
/// it rather than re-implement the parse. Keeping a single parser paired
/// with its producer (`ff_report_usage_and_check`, registered at
/// `lua/budget.lua:99`) guards against silent format drift between the two.
#[cfg(feature = "valkey-default")]
pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
    // Every wire-shape violation is reported under this function's name.
    let parse_error = |message: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_report_usage_result".into(),
            execution_id: None,
            message,
        })
    };

    let arr = match raw {
        Value::Array(items) => items,
        _ => {
            return Err(parse_error(
                "ff_report_usage_and_check: expected Array".to_owned(),
            ));
        }
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(code))) => *code,
        _ => {
            return Err(parse_error(
                "ff_report_usage_and_check: expected Int status code".to_owned(),
            ));
        }
    };

    // Failure status: slot 1 holds the error code, slot 2 its detail.
    if status_code != 1 {
        let error_code = usage_field_str(arr, 1);
        let detail = usage_field_str(arr, 2);
        return Err(
            match ScriptError::from_code_with_detail(&error_code, &detail) {
                Some(known) => SdkError::from(known),
                None => parse_error(format!("ff_report_usage_and_check: {error_code}")),
            },
        );
    }

    // Success status: slot 1 is the sub-status. Breaches add
    // dimension / current usage / limit in slots 2..=4.
    let sub_status = usage_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        "SOFT_BREACH" => Ok(ReportUsageResult::SoftBreach {
            dimension: usage_field_str(arr, 2),
            current_usage: parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?,
            soft_limit: parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?,
        }),
        "HARD_BREACH" => Ok(ReportUsageResult::HardBreach {
            dimension: usage_field_str(arr, 2),
            current_usage: parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?,
            hard_limit: parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?,
        }),
        other => Err(parse_error(format!(
            "ff_report_usage_and_check: unknown sub-status: {other}"
        ))),
    }
}
1299
/// Render slot `index` of an FCALL reply array as a `String`.
///
/// - Bulk strings are decoded as UTF-8, lossily.
/// - Simple strings are copied.
/// - Integers are formatted in decimal.
/// - A missing slot, an error entry or any other value type yields an
///   empty string.
#[cfg(feature = "valkey-default")]
fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
    match arr.get(index).and_then(|entry| entry.as_ref().ok()) {
        Some(Value::BulkString(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
        Some(Value::SimpleString(text)) => text.clone(),
        Some(Value::Int(number)) => number.to_string(),
        _ => String::new(),
    }
}
1309
/// Read the required `u64` usage field at `index` of the wire array.
///
/// The slot may be a non-negative integer or a decimal string (bulk or
/// simple). A missing slot, a negative integer, a non-numeric string, or any
/// other wire type is reported as `ScriptError::Parse`.
///
/// Why strict: the Lua producer (`ff_report_usage_and_check`,
/// `lua/budget.lua:99`) always emits `tostring(current_usage)` /
/// `tostring(soft_or_hard_limit)` for SOFT_BREACH/HARD_BREACH and never
/// leaves the slot empty. A missing or non-numeric value therefore means the
/// Lua and Rust sides have drifted. Coercing it to `0` would surface that
/// drift as a "zero-usage breach" — arithmetically fine, semantically
/// nonsense — so it fails loudly at the first call site instead.
#[cfg(feature = "valkey-default")]
fn parse_usage_u64(
    arr: &[Result<Value, ferriskey::Error>],
    index: usize,
    sub_status: &str,
    field_name: &str,
) -> Result<u64, SdkError> {
    // Every failure message shares the prefix
    // "ff_report_usage_and_check {sub_status}: {field_name} (index {index}) "
    // followed by the specific reason.
    let fail = |reason: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_usage_u64".into(),
            execution_id: None,
            message: format!(
                "ff_report_usage_and_check {sub_status}: {field_name} (index {index}) {reason}"
            ),
        })
    };
    // Decimal-string slots (bulk or simple) must parse verbatim as u64.
    let parse_text = |text: &str| -> Result<u64, SdkError> {
        text.parse::<u64>()
            .map_err(|_| fail(format!("not a u64 string: {text:?}")))
    };

    match arr.get(index) {
        Some(Ok(Value::Int(n))) => {
            u64::try_from(*n).map_err(|_| fail(format!("negative int {n} cannot be u64")))
        }
        Some(Ok(Value::BulkString(bytes))) => {
            let text = String::from_utf8_lossy(bytes);
            parse_text(&*text)
        }
        Some(Ok(Value::SimpleString(text))) => parse_text(text.as_str()),
        Some(_) => Err(fail(
            "wrong wire type (expected Int or String)".to_owned(),
        )),
        None => Err(fail("missing from response".to_owned())),
    }
}
1384
1385/// Pure helper: decide whether `suspension:current` represents a
1386/// signal-driven resume for the currently-claimed attempt, and extract
1387/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1388/// (no record, stale prior-attempt, non-resumed close). Returns an error
1389/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1390/// bug rather than a missing-data case.
1391///
1392/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1393/// `EngineBackend::observe_signals`, which re-implements this invariant
1394/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1395/// tests so the parsing contract stays exercised at the SDK layer —
1396/// Stage 1d will consolidate (either promote the helper into ff-core
1397/// or drop these tests once the backend-side tests cover equivalent
1398/// ground).
1399#[allow(dead_code)]
1400fn resume_waitpoint_id_from_suspension(
1401    susp: &HashMap<String, String>,
1402    claimed_attempt: AttemptIndex,
1403) -> Result<Option<WaitpointId>, SdkError> {
1404    if susp.is_empty() {
1405        return Ok(None);
1406    }
1407    let susp_att: u32 = susp
1408        .get("attempt_index")
1409        .and_then(|s| s.parse().ok())
1410        .unwrap_or(u32::MAX);
1411    if susp_att != claimed_attempt.0 {
1412        return Ok(None);
1413    }
1414    let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1415    if close_reason != "resumed" {
1416        return Ok(None);
1417    }
1418    let wp_id_str = susp
1419        .get("waitpoint_id")
1420        .map(String::as_str)
1421        .unwrap_or_default();
1422    if wp_id_str.is_empty() {
1423        return Ok(None);
1424    }
1425    let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1426        SdkError::from(ScriptError::Parse {
1427            fcall: "resume_waitpoint_id_from_suspension".into(),
1428            execution_id: None,
1429            message: format!(
1430            "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1431        ),
1432        })
1433    })?;
1434    Ok(Some(waitpoint_id))
1435}
1436
// v0.12 PR-5: the `claim_next` scanner no longer parses replies inline —
// it routes through `EngineBackend::issue_claim_grant` + `block_route`,
// whose Valkey impls own the parse. This helper is kept only for the
// regression tests in this module (terminal-op replay detail extraction)
// until they move to ff-backend-valkey; a future PR may delete the helper
// and its tests together.
/// Interpret an FCALL reply whose only success signal is status `1` at
/// index 0.
///
/// Any other status reads index 1 as the error code (`"unknown"` when the
/// slot is absent, empty, or not a string) and passes every slot from
/// index 2 on as a detail to `ScriptError::from_code_with_details`. An
/// unrecognised code, a non-array reply, an empty array, or a non-Int
/// status all become `ScriptError::Parse`. `function_name` is only used to
/// prefix error messages.
#[cfg(feature = "valkey-default")]
#[allow(dead_code)]
pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
    // Every wire-shape failure detected here is reported the same way.
    let parse_err = |message: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_success_result".into(),
            execution_id: None,
            message,
        })
    };

    let items = match raw {
        Value::Array(items) => items,
        _ => return Err(parse_err(format!("{function_name}: expected Array, got non-array"))),
    };
    if items.is_empty() {
        return Err(parse_err(format!("{function_name}: empty result array")));
    }
    let status = match items.first() {
        Some(Ok(Value::Int(code))) => *code,
        _ => return Err(parse_err(format!("{function_name}: expected Int at index 0"))),
    };
    if status == 1 {
        return Ok(());
    }

    // Failure reply: index 1 carries the error code, indices >= 2 carry
    // optional details (e.g. `capability_mismatch` ships missing tokens;
    // ExecutionNotActive reads detail slots 0..=3 — terminal_outcome,
    // lease_epoch, lifecycle_phase, attempt_id — for terminal-op replay
    // reconciliation after a network drop).
    let text_at = |idx: usize| -> String {
        match items.get(idx) {
            Some(Ok(Value::BulkString(bytes))) => String::from_utf8_lossy(bytes).into_owned(),
            Some(Ok(Value::SimpleString(text))) => text.clone(),
            _ => String::new(),
        }
    };
    let error_code = match text_at(1) {
        code if code.is_empty() => "unknown".to_owned(),
        code => code,
    };
    let detail_values: Vec<String> = (2..items.len()).map(text_at).collect();
    let detail_slices: Vec<&str> = detail_values.iter().map(String::as_str).collect();

    let script_err = match ScriptError::from_code_with_details(&error_code, &detail_slices) {
        Some(mapped) => mapped,
        None => ScriptError::Parse {
            fcall: "parse_success_result".into(),
            execution_id: None,
            message: format!("{function_name}: unknown error: {error_code}"),
        },
    };
    Err(SdkError::from(script_err))
}
1521
1522// RFC-013 Stage 1d — `parse_suspend_result` removed. The backend impl
1523// in `ff_backend_valkey` now parses the Lua return and produces a typed
1524// `SuspendOutcome`; the SDK forwarder consumes that directly.
1525
/// Parse the `ff_deliver_signal` FCALL reply into a [`SignalOutcome`].
///
/// Success shapes (status `1` at index 0):
///   `{1, "OK", signal_id, effect}`        → `TriggeredResume` when
///                                            `effect == "resume_condition_satisfied"`,
///                                            otherwise `Accepted { effect }`
///   `{1, "DUPLICATE", existing_signal_id}` → `Duplicate`
///
/// # Errors
///
/// - non-array reply or non-Int status → `ScriptError::Parse`;
/// - status other than `1` → the `ScriptError` mapped from the code at
///   index 1 plus the detail at index 2, or `ScriptError::Parse` when the
///   code is unknown;
/// - a `signal_id` rejected by `SignalId::parse` → `ScriptError::Parse`.
#[cfg(feature = "valkey-default")]
pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
    // Shared constructor for wire-shape errors detected in this function.
    let parse_err = |message: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_signal_result".into(),
            execution_id: None,
            message,
        })
    };

    let items = match raw {
        Value::Array(items) => items,
        _ => return Err(parse_err("ff_deliver_signal: expected Array".into())),
    };

    // Text of a reply slot; absent or non-string slots read as "".
    let text_at = |idx: usize| -> String {
        match items.get(idx) {
            Some(Ok(Value::BulkString(bytes))) => String::from_utf8_lossy(bytes).into_owned(),
            Some(Ok(Value::SimpleString(text))) => text.clone(),
            _ => String::new(),
        }
    };

    let status = match items.first() {
        Some(Ok(Value::Int(code))) => *code,
        _ => return Err(parse_err("ff_deliver_signal: bad status code".into())),
    };

    if status != 1 {
        let error_code = match text_at(1) {
            code if code.is_empty() => "unknown".to_owned(),
            code => code,
        };
        let detail = text_at(2);
        let script_err = ScriptError::from_code_with_detail(&error_code, &detail)
            .unwrap_or_else(|| ScriptError::Parse {
                fcall: "parse_signal_result".into(),
                execution_id: None,
                message: format!("ff_deliver_signal: {error_code}"),
            });
        return Err(SdkError::from(script_err));
    }

    if text_at(1) == "DUPLICATE" {
        return Ok(SignalOutcome::Duplicate {
            existing_signal_id: text_at(2),
        });
    }

    // `{1, "OK", signal_id, effect}`
    let signal_id = SignalId::parse(&text_at(2)).map_err(|e| {
        parse_err(format!("ff_deliver_signal: invalid signal_id from Lua: {e}"))
    })?;
    let effect = text_at(3);

    Ok(if effect == "resume_condition_satisfied" {
        SignalOutcome::TriggeredResume { signal_id }
    } else {
        SignalOutcome::Accepted { signal_id, effect }
    })
}
1637
/// Parse the `ff_fail_execution` FCALL reply into a [`FailOutcome`].
///
/// Success replies carry status `1` at index 0, `"OK"` at index 1, and a
/// sub-status at index 2:
///   `ok("retry_scheduled", delay_until)` → `FailOutcome::RetryScheduled`
///   `ok("terminal_failed")`              → `FailOutcome::TerminalFailed`
///
/// For `retry_scheduled`, index 3 holds `delay_until` in milliseconds.
/// Lua sends it as `tostring(delay_until)`; a bulk string, simple string,
/// or Int slot is accepted.
///
/// # Errors
///
/// - non-array reply or non-Int status → `ScriptError::Parse`;
/// - status other than `1` → the `ScriptError` mapped from the error code
///   at index 1 and the detail slots from index 2 on, or
///   `ScriptError::Parse` when the code is unknown;
/// - `retry_scheduled` with a missing or non-numeric `delay_until` →
///   `ScriptError::Parse`. This previously coerced silently to `0`, which
///   would have surfaced wire-format drift as a retry scheduled at the
///   epoch;
/// - any other sub-status → `ScriptError::Parse`.
#[cfg(feature = "valkey-default")]
#[allow(dead_code)]
fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
    // Shared constructor for wire-shape errors detected in this function.
    let parse_err = |message: String| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_fail_result".into(),
            execution_id: None,
            message,
        })
    };

    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(parse_err("ff_fail_execution: expected Array".into())),
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(parse_err("ff_fail_execution: bad status code".into())),
    };

    // Text of a reply slot; absent or non-string slots read as "".
    let field_str = |idx: usize| -> String {
        match arr.get(idx) {
            Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
            Some(Ok(Value::SimpleString(s))) => s.clone(),
            _ => String::new(),
        }
    };

    if status_code != 1 {
        let error_code = {
            let s = field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
        let detail_refs: Vec<&str> = details.iter().map(String::as_str).collect();
        return Err(SdkError::from(
            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_fail_result".into(),
                    execution_id: None,
                    message: format!("ff_fail_execution: {error_code}"),
                }
            }),
        ));
    }

    // Sub-status is the first field after status + "OK".
    let sub_status = field_str(2);

    match sub_status.as_str() {
        "retry_scheduled" => {
            // A truncated or non-numeric delay is wire-format drift; fail
            // loudly instead of coercing to 0.
            let parsed_delay: Option<i64> = match arr.get(3) {
                Some(Ok(Value::Int(n))) => Some(*n),
                Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).parse::<i64>().ok(),
                Some(Ok(Value::SimpleString(s))) => s.parse::<i64>().ok(),
                _ => None,
            };
            let delay_until = parsed_delay.ok_or_else(|| {
                parse_err(
                    "ff_fail_execution: retry_scheduled reply has missing or \
                     non-numeric delay_until at index 3"
                        .into(),
                )
            })?;

            Ok(FailOutcome::RetryScheduled {
                delay_until: TimestampMs::from_millis(delay_until),
            })
        }
        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
        _ => Err(parse_err(format!(
            "ff_fail_execution: unexpected sub-status: {sub_status}"
        ))),
    }
}
1731
1732// ── Stream read / tail (consumer API, RFC-006 #2) ──
1733
/// Upper bound, in milliseconds, on the `block_ms` argument accepted by
/// [`tail_stream`]. Kept equal to the REST endpoint's ceiling so an SDK
/// caller cannot hold a connection in a blocking read longer than the
/// server itself would allow.
pub const MAX_TAIL_BLOCK_MS: u64 = 30 * 1_000;
1738
1739/// Maximum frames per read/tail call. Mirrors
1740/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
1741/// callers don't need to import ff-core just to read the bound.
1742pub use ff_core::contracts::STREAM_READ_HARD_CAP;
1743
1744/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
1745/// signal so polling consumers can exit cleanly.
1746///
1747/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
1748pub use ff_core::contracts::StreamFrames;
1749
1750/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
1751/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
1752/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
1753/// `StreamCursor::Start` / `StreamCursor::End` instead.
1754pub use ff_core::contracts::StreamCursor;
1755
1756/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
1757/// — XREAD does not accept the open markers. Pulled out as a bare
1758/// function so unit tests can exercise the guard without constructing a
1759/// live `ferriskey::Client`.
1760#[allow(dead_code)]
1761fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
1762    if !after.is_concrete() {
1763        return Err(SdkError::Config {
1764            context: "tail_stream".into(),
1765            field: Some("after".into()),
1766            message: "XREAD cursor must be a concrete entry id; pass \
1767                      StreamCursor::from_beginning() to start from the \
1768                      beginning"
1769                .into(),
1770        });
1771    }
1772    Ok(())
1773}
1774
1775#[allow(dead_code)]
1776fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
1777    if count_limit == 0 {
1778        return Err(SdkError::Config {
1779            context: "read_stream_frames".into(),
1780            field: Some("count_limit".into()),
1781            message: "count_limit must be >= 1".into(),
1782        });
1783    }
1784    if count_limit > STREAM_READ_HARD_CAP {
1785        return Err(SdkError::Config {
1786            context: "read_stream_frames".into(),
1787            field: Some("count_limit".into()),
1788            message: format!(
1789                "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
1790            ),
1791        });
1792    }
1793    Ok(())
1794}
1795
/// Read frames from a completed or in-flight attempt's stream.
///
/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
/// `StreamCursor::At("<id>")` reads from a concrete entry id.
///
/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
/// consumers know when the producer has finalized the stream. A
/// never-written attempt and an in-progress stream are indistinguishable
/// here — both present as `frames=[]`, `closed_at=None`.
///
/// Intended for consumers (audit, checkpoint replay) that are not the
/// lease-holding worker — this function performs no lease check.
///
/// # Errors
///
/// - [`SdkError::Config`] when `count_limit` is outside
///   `1..=STREAM_READ_HARD_CAP`; this is checked before `backend` is
///   called;
/// - otherwise, any error returned by `EngineBackend::read_stream`,
///   converted into [`SdkError`].
///
/// # Head-of-line note
///
/// A max-limit XRANGE reply (`STREAM_READ_HARD_CAP` frames × ~64 KB each)
/// is a multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
/// reading through a `backend` whose connection also serves FCALLs stalls
/// those FCALLs behind the reply. The REST server isolates reads on its
/// `tail_client`; direct SDK callers should either read through a
/// backend built on a dedicated client OR paginate through smaller
/// `count_limit` slices.
#[cfg(feature = "valkey-default")]
pub async fn read_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    // Reject out-of-range limits locally with a typed `Config` error.
    validate_stream_read_count(count_limit)?;
    let frames = backend
        .read_stream(execution_id, attempt_index, from, to, count_limit)
        .await?;
    Ok(frames)
}
1835
/// Tail a live attempt's stream without a visibility filter.
///
/// Equivalent to [`tail_stream_with_visibility`] with
/// `TailVisibility::All`; call that function directly to apply an
/// RFC-015 visibility filter.
///
/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
/// with id strictly greater than `after`. Pass
/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
/// REJECTED at this boundary because XREAD does not accept `-` / `+`
/// as cursors.
///
/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
/// many ms.
///
/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
/// polling consumers should loop until `result.is_closed()` is true, then
/// drain and exit. Timeout with no new frames presents as
/// `frames=[], closed_at=None`.
///
/// # Errors
///
/// [`SdkError::Config`], raised before `backend` is called, when
/// - `block_ms > MAX_TAIL_BLOCK_MS`,
/// - `count_limit` is outside `1..=STREAM_READ_HARD_CAP`, or
/// - `after` is `StreamCursor::Start` / `StreamCursor::End`.
///
/// These bounds keep SDK and REST ceilings aligned. Any other error comes
/// from `EngineBackend::tail_stream`, converted into [`SdkError`].
///
/// # Head-of-line warning — use a dedicated client
///
/// When `backend` is the Valkey implementation, tails go out over its
/// `ferriskey::Client`: a pipelined multiplexed connection on which Valkey
/// processes commands FIFO. `XREAD BLOCK block_ms` does not yield the
/// read side until a frame arrives or the block elapses. If the client
/// behind `backend` is ALSO used for claims, completes, fails, appends,
/// or any other FCALL, a 30-second tail will stall all those calls for up
/// to 30 seconds.
///
/// **Strongly recommended**: tail through a `backend` built on a separate
/// `ferriskey::Client` — mirrors the `Server::tail_client` split that the
/// REST server uses internally (see `crates/ff-server/src/server.rs` and
/// RFC-006 Impl Notes §"Dedicated stream-op connection").
///
/// # Tail parallelism caveat (same mux)
///
/// Even a dedicated tail client is still one multiplexed TCP connection.
/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
/// ferriskey's per-call `request_timeout` starts at future-poll — so
/// two concurrent tails through the same client can time out spuriously:
/// the second call's BLOCK budget elapses while it waits for the first
/// BLOCK to return. The REST server handles this internally with a
/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
/// each call its full `block_ms` budget at the server.
///
/// **Direct SDK callers that need concurrent tails**: either
///   (1) use ONE client-backed `backend` per concurrent tail call (a small
///       pool of clients, rotated by the caller), OR
///   (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
///       only one BLOCK is in flight per client at a time.
///
/// If you need the REST-side backpressure (429 on contention) and the
/// built-in serializer, go through the
/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
/// than calling this directly.
///
/// This SDK does not enforce either pattern — the mutex belongs at the
/// application layer, and the connection pool belongs at the SDK
/// caller's DI layer; neither has a structured place inside this
/// helper.
///
/// # Timeout handling
///
/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
/// 500ms)`, overriding the client's default per-call. A tail with
/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
/// the client was built with a shorter `request_timeout`. No custom client
/// configuration is required for timeout reasons — only for head-of-line
/// isolation above.
#[cfg(feature = "valkey-default")]
pub async fn tail_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    // Unfiltered tail: all validation lives in the visibility-aware variant.
    tail_stream_with_visibility(
        backend,
        execution_id,
        attempt_index,
        after,
        block_ms,
        count_limit,
        ff_core::backend::TailVisibility::All,
    )
    .await
}
1925
/// Tail a live attempt's stream with an explicit RFC-015
/// [`TailVisibility`](ff_core::backend::TailVisibility) filter.
///
/// Arguments are validated in this order before `backend` is called, so
/// the first violation wins:
/// 1. `block_ms` must not exceed [`MAX_TAIL_BLOCK_MS`];
/// 2. `count_limit` must be in `1..=STREAM_READ_HARD_CAP`;
/// 3. `after` must be a concrete cursor (XREAD rejects `-` / `+`).
///
/// # Errors
///
/// [`SdkError::Config`] for any validation failure above; otherwise
/// whatever `EngineBackend::tail_stream` returns, converted into
/// [`SdkError`].
#[cfg(feature = "valkey-default")]
pub async fn tail_stream_with_visibility(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: ff_core::backend::TailVisibility,
) -> Result<StreamFrames, SdkError> {
    let block_within_ceiling = block_ms <= MAX_TAIL_BLOCK_MS;
    if !block_within_ceiling {
        return Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("block_ms".into()),
            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
        });
    }
    validate_stream_read_count(count_limit)?;
    // Open `Start`/`End` markers are refused here with a typed `Config`
    // error; forwarded to the backend they would only surface as an
    // opaque `EngineError::Transport`.
    validate_tail_cursor(&after)?;

    let frames = backend
        .tail_stream(execution_id, attempt_index, after, block_ms, count_limit, visibility)
        .await?;
    Ok(frames)
}
1963
#[cfg(all(test, feature = "valkey-default"))]
mod tail_stream_boundary_tests {
    use super::*;

    // These call `validate_tail_cursor` directly: `StreamCursor::Start`
    // and `StreamCursor::End` must be refused before `tail_stream` ever
    // reaches the backend, just like the `count_limit` guard. The REST
    // layer's matching full-path rejection is tested in `ff-server::api`.

    #[test]
    fn rejects_start_cursor() {
        let err = validate_tail_cursor(&StreamCursor::Start).expect_err("Start must be rejected");
        if let SdkError::Config { field, context, .. } = &err {
            assert_eq!(field.as_deref(), Some("after"));
            assert_eq!(*context, "tail_stream");
        } else {
            panic!("expected SdkError::Config, got {err:?}");
        }
    }

    #[test]
    fn rejects_end_cursor() {
        let rejected = validate_tail_cursor(&StreamCursor::End).expect_err("End must be rejected");
        assert!(matches!(rejected, SdkError::Config { .. }));
    }

    #[test]
    fn accepts_at_cursor() {
        let cases = [
            (StreamCursor::At("0-0".into()), "At cursor must be accepted"),
            (StreamCursor::from_beginning(), "from_beginning() must be accepted"),
            (StreamCursor::At("123-0".into()), "concrete id must be accepted"),
        ];
        for (cursor, why) in &cases {
            validate_tail_cursor(cursor).expect(why);
        }
    }
}
2004
#[cfg(all(test, feature = "valkey-default"))]
mod parse_report_usage_result_tests {
    use super::*;

    /// One slot of an FCALL array reply.
    type Slot = Result<Value, ferriskey::Error>;

    /// Slot holding a `Value::SimpleString`. Per `usage_field_str`, the
    /// usage parser treats `BulkString` and `SimpleString` slots the same
    /// way; `SimpleString` can be built from a `&str` without adding a
    /// dev-dependency on `bytes` just for tests.
    fn text(v: &str) -> Slot {
        Ok(Value::SimpleString(v.to_owned()))
    }

    /// Slot holding a `Value::Int`.
    fn num(n: i64) -> Slot {
        Ok(Value::Int(n))
    }

    /// Wrap slots into an array reply.
    fn array(items: Vec<Slot>) -> Value {
        Value::Array(items)
    }

    /// Parse `raw`, require a failure, and return the rendered error.
    fn rendered_err(raw: &Value) -> String {
        parse_report_usage_result(raw).unwrap_err().to_string()
    }

    #[test]
    fn ok_status() {
        let parsed = parse_report_usage_result(&array(vec![num(1), text("OK")])).unwrap();
        assert_eq!(parsed, ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let parsed =
            parse_report_usage_result(&array(vec![num(1), text("ALREADY_APPLIED")])).unwrap();
        assert_eq!(parsed, ReportUsageResult::AlreadyApplied);
    }

    #[test]
    fn soft_breach_status() {
        let raw = array(vec![
            num(1),
            text("SOFT_BREACH"),
            text("tokens"),
            text("150"),
            text("100"),
        ]);
        let parsed = parse_report_usage_result(&raw).unwrap();
        if let ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } = &parsed {
            assert_eq!(*dimension, "tokens");
            assert_eq!(*current_usage, 150);
            assert_eq!(*soft_limit, 100);
        } else {
            panic!("expected SoftBreach, got {parsed:?}");
        }
    }

    #[test]
    fn hard_breach_status() {
        let raw = array(vec![
            num(1),
            text("HARD_BREACH"),
            text("requests"),
            text("10001"),
            text("10000"),
        ]);
        let parsed = parse_report_usage_result(&raw).unwrap();
        if let ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } = &parsed {
            assert_eq!(*dimension, "requests");
            assert_eq!(*current_usage, 10001);
            assert_eq!(*hard_limit, 10000);
        } else {
            panic!("expected HardBreach, got {parsed:?}");
        }
    }

    /// Negative case: non-Array input. Guards against a future Lua
    /// refactor that accidentally returns a bare string/int — the parser
    /// must fail loudly rather than silently succeed or panic.
    #[test]
    fn non_array_input_is_parse_error() {
        let msg = rendered_err(&Value::SimpleString("OK".to_owned()));
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    /// Negative case: Array whose first element isn't an Int status code.
    /// The Lua function's first return slot is always `status_code` (1 on
    /// success, an error code otherwise); a non-Int there is a wire-format
    /// break that must surface as a parse error.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let msg = rendered_err(&array(vec![text("not_an_int"), text("OK")]));
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
    /// field. Guards against the silent-coercion defect cross-review
    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
    /// which would have surfaced Lua-side wire-format drift as a
    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
    /// but semantically wrong (a "breach with zero usage" is nonsense
    /// and masks the real error).
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let msg = rendered_err(&array(vec![
            num(1),
            text("SOFT_BREACH"),
            text("tokens"),
            text("not_a_number"), // current_usage — must fail, not coerce to 0
            text("100"),
        ]));
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    /// Negative case: HARD_BREACH with the limit slot missing entirely.
    /// Same defence as the non-numeric test above: a truncated response
    /// must fail loudly rather than coerce to 0.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let msg = rendered_err(&array(vec![
            num(1),
            text("HARD_BREACH"),
            text("requests"),
            text("10001"),
            // no index 4 — hard_limit missing
        ]));
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}
2148
2149/// RFC-014 helper: pick the first waitpoint_key out of a composite
2150/// tree so `try_suspend_inner` can rebind a Fresh waitpoint to the
2151/// user-supplied key. Single-waitpoint scoping: all
2152/// Single.waitpoint_key / Count.waitpoints[i] in the tree must be
2153/// equal; this function returns the first one it encounters.
2154fn composite_first_waitpoint_key(body: &CompositeBody) -> Option<String> {
2155    match body {
2156        CompositeBody::AllOf { members } => members.iter().find_map(|m| match m {
2157            ResumeCondition::Single { waitpoint_key, .. } => Some(waitpoint_key.clone()),
2158            ResumeCondition::Composite(inner) => composite_first_waitpoint_key(inner),
2159            _ => None,
2160        }),
2161        CompositeBody::Count { waitpoints, .. } => waitpoints.first().cloned(),
2162        _ => None,
2163    }
2164}
2165
2166
#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    /// Build an owned field map from borrowed `(field, value)` pairs.
    /// If a field appears twice, the later pair wins.
    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut fields = HashMap::new();
        for &(key, value) in pairs {
            fields.insert(key.to_owned(), value.to_owned());
        }
        fields
    }

    /// Suspension record carrying `attempt_index`, `close_reason` and
    /// `waitpoint_id`, the shape most tests below exercise.
    fn suspension(
        attempt: &str,
        close_reason: &str,
        waitpoint_id: &str,
    ) -> HashMap<String, String> {
        m(&[
            ("attempt_index", attempt),
            ("close_reason", close_reason),
            ("waitpoint_id", waitpoint_id),
        ])
    }

    #[test]
    fn empty_suspension_returns_none() {
        let fields = m(&[]);
        let resolved =
            resume_waitpoint_id_from_suspension(&fields, AttemptIndex::new(0)).unwrap();
        assert!(resolved.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        // The record belongs to attempt 0 while the claim is attempt 1,
        // so the suspension is stale.
        let wp = WaitpointId::new().to_string();
        let fields = suspension("0", "resumed", &wp);
        let resolved =
            resume_waitpoint_id_from_suspension(&fields, AttemptIndex::new(1)).unwrap();
        assert!(resolved.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new().to_string();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let fields = suspension("0", reason, &wp);
            let resolved =
                resume_waitpoint_id_from_suspension(&fields, AttemptIndex::new(0)).unwrap();
            assert!(resolved.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let fields = suspension("2", "resumed", &wp.to_string());
        let resolved =
            resume_waitpoint_id_from_suspension(&fields, AttemptIndex::new(2)).unwrap();
        assert_eq!(resolved, Some(wp));
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let fields = suspension("0", "resumed", "not-a-uuid");
        let err =
            resume_waitpoint_id_from_suspension(&fields, AttemptIndex::new(0)).unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: resumed records should never carry an empty
        // waitpoint_id, but a partial write might. Expect None, not an error.
        let fields = suspension("0", "resumed", "");
        let resolved =
            resume_waitpoint_id_from_suspension(&fields, AttemptIndex::new(0)).unwrap();
        assert!(resolved.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was
    // removed when the production path switched from unbounded
    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
    // feedback on unbounded condition-hash reply size). That loop
    // is exercised by the integration tests in `ff-test`.
}
2254
#[cfg(all(test, feature = "valkey-default"))]
mod terminal_replay_parsing_tests {
    //! Unit tests for how the SDK parses the enriched
    //! `execution_not_active` error returned when a terminal op is
    //! replayed. The integration test in ff-test/tests/e2e_lifecycle.rs
    //! proves the Lua side emits the 4-slot detail. These tests prove the
    //! Rust parsers carry all 4 slots into the `ExecutionNotActive`
    //! variant, so the reconciler in complete()/fail()/cancel() can
    //! match on them.

    use super::*;
    use ferriskey::Value;

    /// Wrap `s` as a `Value::SimpleString`.
    ///
    /// SimpleString is used instead of BulkString so the test harness does
    /// not depend on `bytes::Bytes`; parse_success_result and
    /// parse_fail_result handle both forms.
    fn text(s: &str) -> Value {
        Value::SimpleString(String::from(s))
    }

    /// Build a reply array: a leading `Value::Int(0)` followed by one
    /// string element per entry of `fields`, in order.
    fn error_reply(fields: &[&str]) -> Value {
        let mut items = vec![Ok(Value::Int(0))];
        for &field in fields {
            items.push(Ok(text(field)));
        }
        Value::Array(items)
    }

    /// Assert that `err` is `SdkError::Engine` wrapping
    /// `EngineError::Contention(ContentionKind::ExecutionNotActive { .. })`
    /// and that its detail slots equal `expected`. The slots are given in the
    /// order `[terminal_outcome, lease_epoch, lifecycle_phase, attempt_id]`.
    #[track_caller]
    fn assert_not_active_details(err: SdkError, expected: [&'static str; 4]) {
        let engine_err = match err {
            SdkError::Engine(boxed) => *boxed,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        let [want_outcome, want_epoch, want_phase, want_attempt] = expected;
        match engine_err {
            crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                terminal_outcome,
                lease_epoch,
                lifecycle_phase,
                attempt_id,
            }) => {
                assert_eq!(terminal_outcome, want_outcome);
                assert_eq!(lease_epoch, want_epoch);
                assert_eq!(lifecycle_phase, want_phase);
                assert_eq!(attempt_id, want_attempt);
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        let raw = error_reply(&[
            "execution_not_active",
            "success",
            "42",
            "terminal",
            "11111111-1111-1111-1111-111111111111",
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        assert_not_active_details(
            err,
            ["success", "42", "terminal", "11111111-1111-1111-1111-111111111111"],
        );
    }

    /// parse_fail_result must extract every detail slot as well, so the
    /// reconciler in fail() can match lifecycle_phase = "runnable" for
    /// retry-scheduled replays.
    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let raw = error_reply(&[
            "execution_not_active",
            "none",
            "7",
            "runnable",
            "22222222-2222-2222-2222-222222222222",
        ]);
        let err = parse_fail_result(&raw).unwrap_err();
        assert_not_active_details(
            err,
            ["none", "7", "runnable", "22222222-2222-2222-2222-222222222222"],
        );
    }

    /// Missing detail slots must default to "" instead of panicking. That
    /// way, replies from older Lua producers or malformed replies degrade to
    /// an unreconcilable variant rather than a Parse error.
    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        let raw = error_reply(&["execution_not_active"]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        assert_not_active_details(err, ["", "", "", ""]);
    }
}
2371            }
2372            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2373        }
2374    }
2375}