// ff_sdk/task.rs — `ClaimedTask` and the suspend/signal surface of the
// FlowFabric SDK.
1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use ferriskey::{Client, Value};
7use ff_core::backend::{Handle, HandleKind, PendingWaitpoint};
8use ff_core::contracts::ReportUsageResult;
9use ff_core::engine_backend::EngineBackend;
10use ff_core::engine_error::StateKind;
11use ff_script::error::ScriptError;
12use ff_core::partition::PartitionConfig;
13use ff_core::types::*;
14use tokio::sync::{Notify, OwnedSemaphorePermit};
15use tokio::task::JoinHandle;
16
17use crate::SdkError;
18
// ── Phase 3: Suspend/Signal types ──

// RFC-013 Stage 1d — `TimeoutBehavior`, `SuspendOutcome`,
// `ResumeCondition`, `SignalMatcher`, `ResumePolicy`,
// `SuspensionReasonCode`, `SuspensionRequester`, `WaitpointBinding`,
// `SuspendArgs`, `SuspendOutcomeDetails`, `IdempotencyKey`,
// `CompositeBody`, `CountKind`, `ResumeTarget` all live in
// `ff_core::contracts`. The `SuspendOutcome` re-export preserves the
// `ff_sdk::SuspendOutcome` path; the other paths are new.
pub use ff_core::contracts::{
    CompositeBody, CountKind, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget,
    SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
    SuspensionRequester, TimeoutBehavior, WaitpointBinding,
};
33
/// RFC-013 Stage 1d — cookie returned by the strict `suspend` wrapper.
///
/// The returned `handle` is a `HandleKind::Suspended` cookie the caller
/// uses for `observe_signals` and the eventual re-claim via
/// `claim_resumed_execution`. Consuming `ClaimedTask::suspend`
/// invalidates the pre-suspend `ClaimedTask` (already moved by the
/// `self: ClaimedTask` receiver); this handle is the only live cookie
/// left on the caller side.
#[derive(Debug)]
pub struct SuspendedHandle {
    /// Suspended-execution cookie for `observe_signals` / re-claim.
    pub handle: Handle,
    /// Server-reported details of the suspend outcome (waitpoint
    /// binding, resume condition state, etc.).
    pub details: SuspendOutcomeDetails,
}
47
/// RFC-013 Stage 1d — outcome of the fallible `try_suspend` wrapper.
///
/// Unlike the strict wrapper, `AlreadySatisfied` hands the `ClaimedTask`
/// back to the caller so work continues on the retained lease.
// `ClaimedTask` is large relative to `SuspendedHandle`; boxing it would
// change the caller-facing shape, so the clippy lint is allowed instead.
#[allow(clippy::large_enum_variant)]
pub enum TrySuspendOutcome {
    /// Suspension committed; lease released, cookie returned.
    Suspended(SuspendedHandle),
    /// Resume condition was already satisfied — no suspension happened;
    /// the task (and its lease) is returned to the caller.
    AlreadySatisfied {
        task: ClaimedTask,
        details: SuspendOutcomeDetails,
    },
}
60
61impl std::fmt::Debug for TrySuspendOutcome {
62    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63        match self {
64            Self::Suspended(h) => f.debug_tuple("Suspended").field(h).finish(),
65            Self::AlreadySatisfied { details, .. } => f
66                .debug_struct("AlreadySatisfied")
67                .field("details", details)
68                .finish_non_exhaustive(),
69        }
70    }
71}
72
/// A signal to deliver to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    /// Name the waitpoint's matcher is keyed on.
    pub signal_name: String,
    /// Category used for matcher/category filtering.
    pub signal_category: String,
    /// Optional opaque payload delivered to the resumed worker.
    pub payload: Option<Vec<u8>>,
    /// Free-form description of the sender's type (for audit).
    pub source_type: String,
    /// Free-form identity of the sender (for audit).
    pub source_identity: String,
    /// Optional dedup key — a matching key yields `SignalOutcome::Duplicate`.
    pub idempotency_key: Option<String>,
    /// HMAC token issued when the waitpoint was created. Required for
    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
    pub waitpoint_token: WaitpointToken,
}
86
/// Outcome of `deliver_signal()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal triggered resume — execution is now runnable.
    TriggeredResume { signal_id: SignalId },
    /// Duplicate signal (idempotency key matched).
    Duplicate { existing_signal_id: String },
}
97
impl SignalOutcome {
    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
    ///
    /// Consuming packages that call `ff_deliver_signal` directly can use this
    /// to interpret the Lua return value without depending on SDK internals.
    /// Thin public wrapper over the crate-internal `parse_signal_result`.
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}
107
/// A signal that triggered a resume, readable by a worker after re-claim.
///
/// **RFC-012 Stage 0:** the canonical definition has moved to
/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
/// scheduled for removal in 0.5.0.
pub use ff_core::backend::ResumeSignal;

/// Outcome of `append_frame()`.
///
/// **RFC-012 §R7.2.1:** canonical definition moved to
/// [`ff_core::backend::AppendFrameOutcome`]; this `pub use` shim
/// preserves the `ff_sdk::task::AppendFrameOutcome` path through the
/// 0.4.x window. Existing consumers construct + match exhaustively
/// without change. The derive set widened from `Clone, Debug` to
/// `Clone, Debug, PartialEq, Eq` matching the `FailOutcome` precedent.
pub use ff_core::backend::AppendFrameOutcome;

/// Outcome of a `fail()` call.
///
/// **RFC-012 Stage 1a:** canonical definition moved to
/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
/// re-export in `lib.rs`) through the 0.4.x window. Existing
/// consumers construct + match exhaustively without change.
pub use ff_core::backend::FailOutcome;
135
/// A claimed execution with an active lease. The worker processes this task
/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
///
/// The lease is automatically renewed in the background at `lease_ttl / 3`
/// intervals. Renewal stops when the task is consumed or dropped.
///
/// `complete`, `fail`, and `cancel` consume `self` — this prevents
/// double-complete bugs at the type level.
pub struct ClaimedTask {
    /// Shared Valkey client.
    #[allow(dead_code)]
    client: Client,
    /// `EngineBackend` the trait-migrated ops forward through.
    ///
    /// **RFC-012 Stage 1b + Round-7.** Today this is always a
    /// `ValkeyBackend` wrapping `client`. Most per-task ops
    /// (`complete`/`fail`/`cancel`/`delay_execution`/
    /// `move_to_waiting_children`/`update_progress`/`resume_signals`/
    /// `create_pending_waitpoint`/`append_frame`/`report_usage`) route
    /// through `backend.*()`. `suspend` still uses
    /// `client.fcall("ff_suspend_execution", ...)` directly — this is
    /// the deferred suspend per RFC-012 §R7.6.1, pending Stage 1d
    /// input-shape work. Lease renewal goes through
    /// `backend.renew(&handle)`. Round-7 (#135/#145) closed the four
    /// trait-shape gaps tracked by #117.
    ///
    /// Stage 1c migrates the FlowFabricWorker hot paths (claim,
    /// deliver_signal) through the same trait surface; Stage 1d
    /// refactors this struct to carry a single `Handle` rather than
    /// synthesising one per op via `synth_handle`.
    backend: Arc<dyn EngineBackend>,
    /// Partition config for key construction.
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    /// Execution identity.
    execution_id: ExecutionId,
    /// Current attempt.
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    /// Lease identity.
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    /// Lease timing; also drives the background renewal cadence
    /// (`lease_ttl_ms / 3`).
    lease_ttl_ms: u64,
    /// Lane used at claim time.
    lane_id: LaneId,
    /// Worker instance that holds this lease (for index cleanup keys).
    worker_instance_id: WorkerInstanceId,
    /// Execution data, pre-read at claim time and exposed via getters.
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    /// Background renewal task handle.
    renewal_handle: JoinHandle<()>,
    /// Signal to stop renewal (used before consuming self).
    renewal_stop: Arc<Notify>,
    /// Consecutive lease renewal failures. Shared with the background renewal
    /// task. Reset to 0 on each successful renewal. Workers should check
    /// `is_lease_healthy()` before committing expensive side effects.
    renewal_failures: Arc<AtomicU32>,
    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
    /// response is received, just before `self` is consumed into `Drop`.
    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
    /// suppress the false-positive "dropped without terminal operation"
    /// warning: after `notify_one`, the renewal task has not yet been
    /// polled by the runtime, so `is_finished()` is still `false` on the
    /// happy path when self is being consumed.
    ///
    /// Note: the flag is set for any terminal-op path that reaches
    /// `stop_renewal()`, which includes Lua-level script errors (the
    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
    /// the caller already receives the `Err` via the op's return value,
    /// so an additional `Drop` warning would be noise. The warning is
    /// reserved for genuine drop-without-terminal-op cases (panic, early
    /// return, transport failure before stop_renewal ran).
    terminal_op_called: AtomicBool,
    /// Concurrency permit from the worker's semaphore. Held for the lifetime
    /// of the task; released on complete/fail/cancel/drop.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
216
217impl ClaimedTask {
218    /// Construct a `ClaimedTask` from the results of a successful
219    /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
220    ///
221    /// # Arguments
222    ///
223    /// * `client` — shared Valkey client used for subsequent lease
224    ///   renewals, signal delivery, and the final
225    ///   complete/fail/cancel FCALL.
226    /// * `partition_config` — partition topology snapshot read at
227    ///   `FlowFabricWorker::connect`. Used for key construction on
228    ///   the lifetime of this task.
229    /// * `execution_id` — the claimed execution's UUID.
230    /// * `attempt_index` / `attempt_id` — current attempt identity.
231    ///   `attempt_index` is 0 on a fresh claim, preserved on a
232    ///   resumed claim.
233    /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
234    ///   is returned by the Lua FCALL and bumped on each resumed
235    ///   claim.
236    /// * `lease_ttl_ms` — lease TTL used to schedule the background
237    ///   renewal task (renews at `lease_ttl_ms / 3`).
238    /// * `lane_id` — the lane the task was claimed on. Used for
239    ///   index key construction (`lane_active`, `lease_expiry`,
240    ///   etc.).
241    /// * `worker_instance_id` — this worker's identity. Used to
242    ///   resolve `worker_leases` index entries during renewal and
243    ///   completion.
244    /// * `input_payload` / `execution_kind` / `tags` — pre-read
245    ///   execution metadata, exposed via getters so the worker
246    ///   doesn't round-trip to Valkey to inspect what it just
247    ///   claimed.
248    ///
249    /// # Invariant: constructor is `pub(crate)` on purpose
250    ///
251    /// **External callers cannot construct a `ClaimedTask`** — only
252    /// the in-crate claim entry points (`claim_next`,
253    /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
254    /// load-bearing: those entry points are the ONLY sites that
255    /// acquire a permit from the worker's concurrency semaphore
256    /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
257    /// it through `ClaimedTask::set_concurrency_permit` before
258    /// returning the task. Promoting `new` to `pub` would let
259    /// consumers build tasks that bypass the concurrency contract
260    /// the worker's `max_concurrent_tasks` config advertises — the
261    /// returned task would run alongside other in-flight work
262    /// without debiting the permit bank, and the
263    /// complete/fail/cancel/drop path would have nothing to
264    /// release.
265    ///
266    /// If an external callsite genuinely needs to rehydrate a
267    /// task from saved FCALL results, the right answer is a new
268    /// `FlowFabricWorker` entry point that wraps `new` + acquires a
269    /// permit — not promoting this constructor.
270    #[allow(clippy::too_many_arguments)]
271    pub(crate) fn new(
272        client: Client,
273        backend: Arc<dyn EngineBackend>,
274        partition_config: PartitionConfig,
275        execution_id: ExecutionId,
276        attempt_index: AttemptIndex,
277        attempt_id: AttemptId,
278        lease_id: LeaseId,
279        lease_epoch: LeaseEpoch,
280        lease_ttl_ms: u64,
281        lane_id: LaneId,
282        worker_instance_id: WorkerInstanceId,
283        input_payload: Vec<u8>,
284        execution_kind: String,
285        tags: HashMap<String, String>,
286    ) -> Self {
287        let renewal_stop = Arc::new(Notify::new());
288        let renewal_failures = Arc::new(AtomicU32::new(0));
289
290        // Stage 1b: the renewal task forwards through `backend.renew(&handle)`.
291        // Build the handle once at construction time and hand both Arc clones
292        // into the spawned task.
293        let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
294            execution_id.clone(),
295            attempt_index,
296            attempt_id.clone(),
297            lease_id.clone(),
298            lease_epoch,
299            lease_ttl_ms,
300            lane_id.clone(),
301            worker_instance_id.clone(),
302            HandleKind::Fresh,
303        );
304        let renewal_handle = spawn_renewal_task(
305            backend.clone(),
306            renewal_handle_cookie,
307            execution_id.clone(),
308            lease_ttl_ms,
309            renewal_stop.clone(),
310            renewal_failures.clone(),
311        );
312
313        Self {
314            client,
315            backend,
316            partition_config,
317            execution_id,
318            attempt_index,
319            attempt_id,
320            lease_id,
321            lease_epoch,
322            lease_ttl_ms,
323            lane_id,
324            worker_instance_id,
325            input_payload,
326            execution_kind,
327            tags,
328            renewal_handle,
329            renewal_stop,
330            renewal_failures,
331            terminal_op_called: AtomicBool::new(false),
332            _concurrency_permit: None,
333        }
334    }
335
336    /// Attach a concurrency permit from the worker's semaphore.
337    /// The permit is held for the task's lifetime and released on drop.
338    #[allow(dead_code)]
339    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
340        self._concurrency_permit = Some(permit);
341    }
342
    // ── Accessors ──
    // Read-only views of the claim-time snapshot; none round-trip to Valkey.

    /// The claimed execution's UUID.
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    /// Current attempt index (0 on a fresh claim).
    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }

    /// Current attempt's identity.
    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }

    /// Identity of the lease this task holds.
    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }

    /// Lease epoch (bumped on each resumed claim).
    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }

    /// The execution's input payload, pre-read at claim time.
    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }

    /// The execution's kind string, pre-read at claim time.
    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }

    /// The execution's tags, pre-read at claim time.
    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }

    /// Lane the task was claimed on.
    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }
380
381    /// Read frames from this task's attempt stream.
382    ///
383    /// Forwards through the task's `Arc<dyn EngineBackend>` — consumers
384    /// no longer need the `ferriskey::Client` leak (#87). See
385    /// [`EngineBackend::read_stream`](ff_core::engine_backend::EngineBackend::read_stream)
386    /// for the backend-level contract.
387    ///
388    /// Validation (count_limit bounds) runs at this boundary — out-of-range
389    /// input surfaces as [`SdkError::Config`] before reaching the backend.
390    pub async fn read_stream(
391        &self,
392        from: StreamCursor,
393        to: StreamCursor,
394        count_limit: u64,
395    ) -> Result<StreamFrames, SdkError> {
396        validate_stream_read_count(count_limit)?;
397        Ok(self
398            .backend
399            .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
400            .await?)
401    }
402
403    /// Tail this task's attempt stream for new frames.
404    ///
405    /// See [`EngineBackend::tail_stream`](ff_core::engine_backend::EngineBackend::tail_stream)
406    /// for the head-of-line warning — the task's backend is shared
407    /// with claim/complete/fail hot paths. Consumers that need
408    /// isolation should build a dedicated `EngineBackend` for tail
409    /// reads.
410    pub async fn tail_stream(
411        &self,
412        after: StreamCursor,
413        block_ms: u64,
414        count_limit: u64,
415    ) -> Result<StreamFrames, SdkError> {
416        self.tail_stream_with_visibility(
417            after,
418            block_ms,
419            count_limit,
420            ff_core::backend::TailVisibility::All,
421        )
422        .await
423    }
424
425    /// Tail with an explicit [`TailVisibility`](ff_core::backend::TailVisibility)
426    /// filter (RFC-015 §6). Use [`TailVisibility::ExcludeBestEffort`]
427    /// (ff_core::backend::TailVisibility::ExcludeBestEffort) to drop
428    /// [`StreamMode::BestEffortLive`](ff_core::backend::StreamMode::BestEffortLive)
429    /// frames server-side.
430    pub async fn tail_stream_with_visibility(
431        &self,
432        after: StreamCursor,
433        block_ms: u64,
434        count_limit: u64,
435        visibility: ff_core::backend::TailVisibility,
436    ) -> Result<StreamFrames, SdkError> {
437        if block_ms > MAX_TAIL_BLOCK_MS {
438            return Err(SdkError::Config {
439                context: "tail_stream".into(),
440                field: Some("block_ms".into()),
441                message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
442            });
443        }
444        validate_stream_read_count(count_limit)?;
445        validate_tail_cursor(&after)?;
446        Ok(self
447            .backend
448            .tail_stream(
449                &self.execution_id,
450                self.attempt_index,
451                after,
452                block_ms,
453                count_limit,
454                visibility,
455            )
456            .await?)
457    }
458
    /// Synthesise a Valkey-backend `Handle` encoding this task's
    /// attempt-cookie fields. Private to the SDK — the trait
    /// forwarders call this per op to produce the `&Handle` argument
    /// the `EngineBackend` methods take.
    ///
    /// **RFC-012 Stage 1b option A.** Refactoring `ClaimedTask` to
    /// carry one cached `Handle` instead of synthesising per op is
    /// deferred to Stage 1d (issue #89 follow-up). The per-op cost is
    /// a single allocation of a small byte buffer — negligible
    /// compared to the FCALL round-trip that follows.
    ///
    /// Kind is always `HandleKind::Fresh` because today the 9
    /// Stage-1b ops all treat the backend tag + opaque bytes as
    /// authoritative; they do not dispatch on kind. Stage 1d routes
    /// resumed-claim callers through a `Resumed` synth.
    fn synth_handle(&self) -> Handle {
        ff_backend_valkey::ValkeyBackend::encode_handle(
            self.execution_id.clone(),
            self.attempt_index,
            self.attempt_id.clone(),
            self.lease_id.clone(),
            self.lease_epoch,
            self.lease_ttl_ms,
            self.lane_id.clone(),
            self.worker_instance_id.clone(),
            HandleKind::Fresh,
        )
    }
487
488    /// Check if the lease is likely still valid based on renewal success.
489    ///
490    /// Returns `false` if 3 or more consecutive renewal attempts have failed.
491    /// Workers should check this before committing expensive or irreversible
492    /// side effects. A `false` return means Valkey may have already expired
493    /// the lease and another worker could be processing this execution.
494    pub fn is_lease_healthy(&self) -> bool {
495        self.renewal_failures.load(Ordering::Relaxed) < 3
496    }
497
498    /// Number of consecutive lease renewal failures since the last success.
499    ///
500    /// Returns 0 when renewals are working normally. Useful for observability
501    /// and custom health policies beyond the default threshold of 3.
502    pub fn consecutive_renewal_failures(&self) -> u32 {
503        self.renewal_failures.load(Ordering::Relaxed)
504    }
505
506    // ── Terminal operations (consume self) ──
507
508    /// Delay the execution until `delay_until`.
509    ///
510    /// Releases the lease. The execution moves to `delayed` state.
511    /// Consumes self — the task cannot be used after delay.
512    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
513        // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
514        // the pre-migration `stop_renewal` contract: called only when
515        // the FCALL round-tripped (Ok or a typed Lua/engine error);
516        // raw transport errors bubble up without stopping renewal so
517        // the `Drop` warning still fires if the caller later drops
518        // `self` without retrying.
519        let handle = self.synth_handle();
520        let out = self.backend.delay(&handle, delay_until).await;
521        if fcall_landed(&out) {
522            self.stop_renewal();
523        }
524        out.map_err(SdkError::from)
525    }
526
527    /// Move execution to waiting_children state.
528    ///
529    /// Releases the lease. The execution waits for child dependencies to complete.
530    /// Consumes self.
531    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
532        // RFC-012 Stage 1b: forwards through `backend.wait_children`.
533        // See `delay_execution` for the stop_renewal contract.
534        let handle = self.synth_handle();
535        let out = self.backend.wait_children(&handle).await;
536        if fcall_landed(&out) {
537            self.stop_renewal();
538        }
539        out.map_err(SdkError::from)
540    }
541
542    /// Complete the execution successfully.
543    ///
544    /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
545    /// Renewal continues during the FCALL to prevent lease expiry under
546    /// network latency — the Lua fences on lease_id+epoch atomically.
547    /// Consumes self — the task cannot be used after completion.
548    ///
549    /// # Connection errors and replay
550    ///
551    /// If the Valkey connection drops after the Lua commit but before the
552    /// client reads the response, retrying `complete()` with the same
553    /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
554    /// reconciles the "already terminal with matching outcome" response
555    /// into `Ok(())`. A retry after a different terminal op has raced in
556    /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
557    /// populated `terminal_outcome` so the caller can see what actually
558    /// happened.
559    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
560        // RFC-012 Stage 1b: forwards through `backend.complete`.
561        // The FCALL body + the `ExecutionNotActive` replay reconciliation
562        // (match same epoch + attempt_id + outcome=="success" → Ok)
563        // moved to `ff_backend_valkey::complete_impl`.
564        let handle = self.synth_handle();
565        let out = self.backend.complete(&handle, result_payload).await;
566        if fcall_landed(&out) {
567            self.stop_renewal();
568        }
569        out.map_err(SdkError::from)
570    }
571
572    /// Fail the execution with a reason and error category.
573    ///
574    /// If the execution policy allows retries, the engine schedules a retry
575    /// (delayed backoff). Otherwise, the execution becomes terminal failed.
576    /// Returns [`FailOutcome`] so the caller knows what happened.
577    ///
578    /// # Connection errors and replay
579    ///
580    /// `fail()` is replay-safe under the same conditions as `complete()`:
581    /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
582    /// reconciled into the outcome the server actually committed
583    /// (`TerminalFailed` if no retries left; `RetryScheduled` with
584    /// `delay_until = 0` if a retry was scheduled — the exact delay is
585    /// not recovered on the replay path).
586    pub async fn fail(
587        self,
588        reason: &str,
589        error_category: &str,
590    ) -> Result<FailOutcome, SdkError> {
591        // RFC-012 Stage 1b: forwards through `backend.fail`.
592        // The FCALL body + retry-policy read + the two-shape replay
593        // reconciliation (terminal/failed → TerminalFailed, runnable
594        // → RetryScheduled{0}) moved to
595        // `ff_backend_valkey::fail_impl`.
596        //
597        // **Preserving the caller's raw category string.** The
598        // pre-Stage-1b code passed `error_category` straight to the
599        // Lua `failure_category` ARGV. Real callers use arbitrary
600        // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
601        // `"inference_error"`, …) that do not correspond to the 5
602        // named `FailureClass` variants. A lossy enum conversion
603        // would rewrite every non-canonical category to
604        // `Transient` — a real behavior change for
605        // ff-readiness-tests + the examples crates.
606        //
607        // Instead: pack the raw category bytes into
608        // `FailureReason.detail`; `fail_impl` reads them back and
609        // threads them to Lua verbatim. The `classification` enum
610        // is derived from the string for the few consumers who
611        // match on the trait return today (none in-workspace).
612        // Stage 1d (or issue #117) widens `FailureClass` with a
613        // `Custom(String)` arm; this stash carrier retires then.
614        let handle = self.synth_handle();
615        let failure_reason = ff_core::backend::FailureReason::with_detail(
616            reason.to_owned(),
617            error_category.as_bytes().to_vec(),
618        );
619        let classification = error_category_to_class(error_category);
620        let out = self.backend.fail(&handle, failure_reason, classification).await;
621        if fcall_landed(&out) {
622            self.stop_renewal();
623        }
624        out.map_err(SdkError::from)
625    }
626
627    /// Cancel the execution.
628    ///
629    /// Stops lease renewal, then calls `ff_cancel_execution` via FCALL.
630    /// Consumes self.
631    ///
632    /// # Connection errors and replay
633    ///
634    /// `cancel()` is replay-safe under the same conditions as `complete()`:
635    /// a retry by the same caller matching `lease_epoch` + `attempt_id`
636    /// returns `Ok(())` if the server's stored `terminal_outcome` is
637    /// `cancelled`. A retry that finds a different outcome (because a
638    /// concurrent `complete()` or `fail()` won the race) surfaces as
639    /// `ExecutionNotActive` with the populated `terminal_outcome` so the
640    /// caller can see that the cancel intent was NOT honored.
641    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
642        // RFC-012 Stage 1b: forwards through `backend.cancel`.
643        // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
644        // and the `ExecutionNotActive` → outcome=="cancelled" replay
645        // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
646        let handle = self.synth_handle();
647        let out = self.backend.cancel(&handle, reason).await;
648        if fcall_landed(&out) {
649            self.stop_renewal();
650        }
651        out.map_err(SdkError::from)
652    }
653
654    // ── Non-terminal operations ──
655
656    /// Manually renew the lease.
657    ///
658    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
659    /// trait. The background renewal task calls
660    /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
661    /// this method is the public entry point for workers that want
662    /// to force a renew out-of-band.
663    pub async fn renew_lease(&self) -> Result<(), SdkError> {
664        let handle = self.synth_handle();
665        self.backend
666            .renew(&handle)
667            .await
668            .map(|_renewal| ())
669            .map_err(SdkError::from)
670    }
671
672    /// Update progress (pct 0-100 and optional message).
673    ///
674    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
675    /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
676    ///
677    /// # Not for stream frames
678    ///
679    /// `update_progress` writes the `progress_percent` / `progress_message`
680    /// fields on `exec_core` (the execution's state hash). It is a
681    /// scalar heartbeat — each call overwrites the previous value and
682    /// nothing is appended to the output stream. Producers emitting
683    /// stream frames (arbitrary `frame_type` + payload, consumed via
684    /// `ClaimedTask::read_stream` / `tail_stream` or the HTTP
685    /// stream-tail routes) MUST use [`Self::append_frame`] instead;
686    /// `update_progress` is invisible to stream consumers.
687    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
688        let handle = self.synth_handle();
689        self.backend
690            .progress(&handle, Some(pct), Some(message.to_owned()))
691            .await
692            .map_err(SdkError::from)
693    }
694
695    /// Report usage against a budget and check limits.
696    ///
697    /// Non-consuming — the worker can report usage multiple times.
698    /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
699    /// `dedup_key` prevents double-counting on retries (auto-prefixed with budget hash tag).
700    ///
701    /// **RFC-012 §R7.2.3:** forwards through
702    /// [`EngineBackend::report_usage`](ff_core::engine_backend::EngineBackend::report_usage).
703    /// The trait's `UsageDimensions.dedup_key` carries the raw key;
704    /// the backend impl applies the `usage_dedup_key(hash_tag, k)`
705    /// wrap so the dedup state co-locates with the budget partition
706    /// (PR #108).
707    pub async fn report_usage(
708        &self,
709        budget_id: &BudgetId,
710        dimensions: &[(&str, u64)],
711        dedup_key: Option<&str>,
712    ) -> Result<ReportUsageResult, SdkError> {
713        let handle = self.synth_handle();
714        let mut dims = ff_core::backend::UsageDimensions::default();
715        for (name, delta) in dimensions {
716            dims.custom.insert((*name).to_owned(), *delta);
717        }
718        dims.dedup_key = dedup_key
719            .filter(|k| !k.is_empty())
720            .map(|k| k.to_owned());
721        self.backend
722            .report_usage(&handle, budget_id, dims)
723            .await
724            .map_err(SdkError::from)
725    }
726
727    /// Create a pending waitpoint for future signal delivery.
728    ///
729    /// Non-consuming — the worker keeps the lease. Signals delivered to the
730    /// waitpoint are buffered. When the worker later calls `suspend()` with
731    /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
732    /// resume condition.
733    ///
734    /// Returns both the waitpoint_id AND the HMAC token required by external
735    /// callers to buffer signals against this pending waitpoint
736    /// (RFC-004 §Waitpoint Security).
737    ///
738    /// **RFC-012 §R7.2.2:** forwards through
739    /// [`EngineBackend::create_waitpoint`](ff_core::engine_backend::EngineBackend::create_waitpoint).
740    /// The trait returns
741    /// [`PendingWaitpoint`](ff_core::backend::PendingWaitpoint) whose
742    /// `hmac_token` is the same wire HMAC this method has always
743    /// produced; the SDK unwraps it back to the historical
744    /// `(WaitpointId, WaitpointToken)` tuple for caller-shape parity.
745    pub async fn create_pending_waitpoint(
746        &self,
747        waitpoint_key: &str,
748        expires_in_ms: u64,
749    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
750        let handle = self.synth_handle();
751        let expires_in = std::time::Duration::from_millis(expires_in_ms);
752        let pending = self
753            .backend
754            .create_waitpoint(&handle, waitpoint_key, expires_in)
755            .await
756            .map_err(SdkError::from)?;
757        // `WaitpointHmac` wraps the canonical `WaitpointToken`; the
758        // `.token()` accessor borrows the underlying
759        // `WaitpointToken`, which is the caller's historical return
760        // shape.
761        Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
762    }
763
764    // ── Phase 4: Streaming ──
765
766    /// Append a frame to the current attempt's output stream.
767    ///
768    /// Non-consuming — the worker can append many frames during execution.
769    /// The stream is created lazily on the first append.
770    ///
771    /// **RFC-012 §R7.2.1 / PR #146:** forwards through the
772    /// `EngineBackend` trait. The free-form `frame_type` tag and
773    /// optional `metadata` (wire `correlation_id`) travel on the
774    /// extended [`ff_core::backend::Frame`] shape (`frame_type:
775    /// String`, `correlation_id: Option<String>`), giving byte-for-byte
776    /// wire parity with the pre-migration direct-FCALL path.
777    ///
778    /// # `append_frame` vs [`Self::update_progress`]
779    ///
780    /// Stream-frame producers (arbitrary `frame_type` + payload
781    /// consumed via `ClaimedTask::read_stream` / `tail_stream` or the
782    /// HTTP stream-tail routes) MUST use `append_frame`.
783    /// `update_progress` writes scalar `progress_percent` /
784    /// `progress_message` fields to `exec_core` and is invisible to
785    /// stream consumers.
786    pub async fn append_frame(
787        &self,
788        frame_type: &str,
789        payload: &[u8],
790        metadata: Option<&str>,
791    ) -> Result<AppendFrameOutcome, SdkError> {
792        self.append_frame_with_mode(
793            frame_type,
794            payload,
795            metadata,
796            ff_core::backend::StreamMode::Durable,
797        )
798        .await
799    }
800
801    /// Append a frame under an explicit RFC-015
802    /// [`StreamMode`](ff_core::backend::StreamMode).
803    /// Defaults via [`Self::append_frame`] preserve pre-015 behaviour
804    /// ([`StreamMode::Durable`](ff_core::backend::StreamMode::Durable)).
805    pub async fn append_frame_with_mode(
806        &self,
807        frame_type: &str,
808        payload: &[u8],
809        metadata: Option<&str>,
810        mode: ff_core::backend::StreamMode,
811    ) -> Result<AppendFrameOutcome, SdkError> {
812        let handle = self.synth_handle();
813        let mut frame = ff_core::backend::Frame::new(
814            payload.to_vec(),
815            ff_core::backend::FrameKind::Event,
816        )
817        .with_frame_type(frame_type)
818        .with_mode(mode);
819        if let Some(cid) = metadata {
820            frame = frame.with_correlation_id(cid);
821        }
822        self.backend
823            .append_frame(&handle, frame)
824            .await
825            .map_err(SdkError::from)
826    }
827
828    // ── RFC-013 Stage 1d: Suspend ──
829
830    /// **Strict suspend.** Consumes `self` and yields a
831    /// [`SuspendedHandle`] or errors. On the early-satisfied path
832    /// (buffered signals already matched the condition), returns
833    /// `EngineError::State(AlreadySatisfied)` — use `try_suspend` when
834    /// that branch is part of the flow's happy path.
835    ///
836    /// RFC-013 §5.1 — this is the classic Rust `foo` / `try_foo`
837    /// split; `suspend` is the unconditional form. Defaults
838    /// `WaitpointBinding::fresh()` for the waitpoint. For pending
839    /// waitpoints previously issued via `create_pending_waitpoint`,
840    /// use `try_suspend_on_pending`.
841    pub async fn suspend(
842        self,
843        reason_code: SuspensionReasonCode,
844        resume_condition: ResumeCondition,
845        timeout: Option<(TimestampMs, TimeoutBehavior)>,
846        resume_policy: ResumePolicy,
847    ) -> Result<SuspendedHandle, SdkError> {
848        let outcome = self
849            .try_suspend_inner(WaitpointBinding::fresh(), reason_code, resume_condition, timeout, resume_policy)
850            .await?;
851        match outcome {
852            TrySuspendOutcome::Suspended(h) => Ok(h),
853            TrySuspendOutcome::AlreadySatisfied { .. } => Err(SdkError::from(
854                crate::EngineError::State(StateKind::AlreadySatisfied),
855            )),
856        }
857    }
858
859    /// **Fallible suspend.** On the early-satisfied path, the
860    /// `ClaimedTask` is handed back unchanged so the worker can
861    /// continue running against the retained lease.
862    ///
863    /// RFC-013 §5.1 — use this when consuming a pending waitpoint whose
864    /// buffered signals may already match the condition.
865    pub async fn try_suspend(
866        self,
867        reason_code: SuspensionReasonCode,
868        resume_condition: ResumeCondition,
869        timeout: Option<(TimestampMs, TimeoutBehavior)>,
870        resume_policy: ResumePolicy,
871    ) -> Result<TrySuspendOutcome, SdkError> {
872        self.try_suspend_inner(WaitpointBinding::fresh(), reason_code, resume_condition, timeout, resume_policy)
873            .await
874    }
875
876    /// Convenience: `try_suspend` against a pending waitpoint previously
877    /// issued via `create_pending_waitpoint`.
878    pub async fn try_suspend_on_pending(
879        self,
880        pending: &PendingWaitpoint,
881        reason_code: SuspensionReasonCode,
882        resume_condition: ResumeCondition,
883        timeout: Option<(TimestampMs, TimeoutBehavior)>,
884        resume_policy: ResumePolicy,
885    ) -> Result<TrySuspendOutcome, SdkError> {
886        self.try_suspend_inner(
887            WaitpointBinding::use_pending(pending),
888            reason_code,
889            resume_condition,
890            timeout,
891            resume_policy,
892        )
893        .await
894    }
895
    /// Shared implementation behind `suspend` / `try_suspend` /
    /// `try_suspend_on_pending`: normalizes the waitpoint binding,
    /// assembles `SuspendArgs`, forwards through `backend.suspend`, and
    /// maps the backend outcome into [`TrySuspendOutcome`].
    ///
    /// Consumes `self`. On the `AlreadySatisfied` branch the task is
    /// returned to the caller inside the outcome so the retained lease
    /// stays usable; on the `Suspended` branch renewal is stopped and
    /// the suspended-kind handle is the only live cookie left.
    async fn try_suspend_inner(
        self,
        waitpoint: WaitpointBinding,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<TrySuspendOutcome, SdkError> {
        let handle = self.synth_handle();
        // With no timeout, `timeout_at` stays None and `with_timeout` is
        // never called below — `TimeoutBehavior::Fail` is only a filler
        // for the behavior slot of the tuple.
        let (timeout_at, timeout_behavior) = match timeout {
            Some((at, b)) => (Some(at), b),
            None => (None, TimeoutBehavior::Fail),
        };
        // If the caller passed `ResumeCondition::Single { waitpoint_key }`
        // alongside the default `WaitpointBinding::fresh()` (which mints a
        // random internal key), rebind the Fresh binding to use the
        // condition's key so RFC-013 §2.4 "waitpoint_key cross-field
        // invariant" is honored. UsePending retains its own binding.
        // RFC-014: for `Composite`, rebind to the first waitpoint_key
        // observed in the composite tree (Single.waitpoint_key or
        // Count.waitpoints[0]); consumers using single-waitpoint
        // composites must keep all Single/Count waitpoint_keys equal
        // (see RFC-014 single-waitpoint scoping).
        let waitpoint = match (&waitpoint, &resume_condition) {
            (
                WaitpointBinding::Fresh { waitpoint_id, .. },
                ResumeCondition::Single { waitpoint_key, .. },
            ) => WaitpointBinding::Fresh {
                waitpoint_id: waitpoint_id.clone(),
                waitpoint_key: waitpoint_key.clone(),
            },
            (
                WaitpointBinding::Fresh { waitpoint_id, .. },
                ResumeCondition::Composite(body),
            ) => {
                if let Some(key) = composite_first_waitpoint_key(body) {
                    WaitpointBinding::Fresh {
                        waitpoint_id: waitpoint_id.clone(),
                        waitpoint_key: key,
                    }
                } else {
                    // Composite with no extractable key: keep the binding
                    // the caller handed in.
                    waitpoint
                }
            }
            _ => waitpoint,
        };
        let mut args = SuspendArgs::new(
            SuspensionId::new(),
            waitpoint,
            resume_condition,
            resume_policy,
            reason_code,
            TimestampMs::now(),
        )
        .with_requester(SuspensionRequester::Worker);
        if let Some(at) = timeout_at {
            args = args.with_timeout(at, timeout_behavior);
        }

        let outcome = self
            .backend
            .suspend(&handle, args)
            .await
            .map_err(SdkError::from)?;
        match outcome {
            SuspendOutcome::Suspended { details, handle: new_handle } => {
                // Renewal is stopped only AFTER a successful Suspended
                // outcome. The suspended-kind handle is what the caller
                // will later hand to `observe_signals` / `claim_from_reclaim`.
                self.stop_renewal();
                Ok(TrySuspendOutcome::Suspended(SuspendedHandle {
                    handle: new_handle,
                    details,
                }))
            }
            SuspendOutcome::AlreadySatisfied { details } => {
                // Lease is retained Lua-side; keep the ClaimedTask alive
                // so the worker continues against the existing lease.
                Ok(TrySuspendOutcome::AlreadySatisfied {
                    task: self,
                    details,
                })
            }
            // Any other variant means the backend and SDK drifted; fail
            // loudly rather than guess.
            _ => Err(SdkError::from(ScriptError::Parse {
                fcall: "try_suspend_inner".into(),
                execution_id: None,
                message: "unexpected SuspendOutcome variant".into(),
            })),
        }
    }
986
987    /// Read the signals that satisfied the waitpoint and triggered this
988    /// resume.
989    ///
990    /// Non-consuming. Intended to be called immediately after re-claim via
991    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
992    /// subsequent `suspend()` (which replaces `suspension:current`).
993    ///
994    /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
995    ///
996    /// - No prior suspension on this execution.
997    /// - The prior suspension belonged to an earlier attempt (e.g. the
998    ///   attempt was cancelled/failed and a retry is now claiming).
999    /// - The prior suspension was closed by timeout / cancel / operator
1000    ///   override rather than by a matched signal.
1001    ///
1002    /// Reads `suspension:current` once, filters by `attempt_index` to
1003    /// guard against stale prior-attempt records, then fetches the matched
1004    /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
1005    /// fields and reads each signal's metadata + payload directly.
1006    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
1007        // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
1008        // Pre-migration body (HGETALL suspension_current + HMGET
1009        // matchers + pipelined HGETALL signal_hash / GET
1010        // signal_payload) lives in
1011        // `ff_backend_valkey::observe_signals_impl`.
1012        let handle = self.synth_handle();
1013        self.backend
1014            .observe_signals(&handle)
1015            .await
1016            .map_err(SdkError::from)
1017    }
1018
    /// Signal the renewal task to stop. Called by every terminal op
    /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
    /// `move_to_waiting_children`) after the FCALL returns. Also marks
    /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
    /// consumption from a genuine drop-without-terminal-op.
    fn stop_renewal(&self) {
        // Release store pairs with the Acquire load in `Drop`, so the flag
        // is visible by the time the task is dropped and the warning stays
        // quiet on the happy path. Store before notify: the notified
        // renewal task exits without needing to re-check the flag.
        self.terminal_op_called.store(true, Ordering::Release);
        self.renewal_stop.notify_one();
    }
1028
1029}
1030
1031/// True iff the backend's FCALL result represents a round-trip that
1032/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
1033/// contention, conflict, state, bug) all count as "landed" — the
1034/// server either committed or rejected with a typed response. Only
1035/// raw `Transport` errors (connection drops, request timeouts, parse
1036/// failures) count as "did not land", which matches the pre-Stage-1b
1037/// SDK's `fcall(...).await.map_err(SdkError::from)?` short-circuit
1038/// — those errors returned before `stop_renewal()` ran, preserving
1039/// the `Drop` warning for genuine "lease will leak" cases.
1040///
1041/// Stage 1b terminal-op forwarders use this predicate to decide
1042/// whether to call `stop_renewal()`: yes for landed responses, no
1043/// for transport errors so the caller's retry path still sees a
1044/// running renewal task.
1045fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
1046    match r {
1047        Ok(_) => true,
1048        Err(crate::EngineError::Transport { .. }) => false,
1049        Err(_) => true,
1050    }
1051}
1052
1053/// Map the SDK's free-form `error_category: &str` to the typed
1054/// `FailureClass` the trait's `fail` method takes. Unknown categories
1055/// fall through to `Transient` — the Lua side already tolerated
1056/// arbitrary strings, so the worst a category drift does under the
1057/// Stage 1b forwarder is reclassify a novel category as transient.
1058/// Stage 1d (or issue #117) widens `FailureClass` with a
1059/// `Custom(String)` arm for exact round-trip.
1060fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
1061    use ff_core::backend::FailureClass;
1062    match s {
1063        "transient" => FailureClass::Transient,
1064        "permanent" => FailureClass::Permanent,
1065        "infra_crash" => FailureClass::InfraCrash,
1066        "timeout" => FailureClass::Timeout,
1067        "cancelled" => FailureClass::Cancelled,
1068        _ => FailureClass::Transient,
1069    }
1070}
1071
1072impl Drop for ClaimedTask {
1073    fn drop(&mut self) {
1074        // Abort the background renewal task on drop.
1075        // This is a safety net — complete/fail/cancel already stop renewal
1076        // via notify before consuming self. But if the task is dropped
1077        // without being consumed (e.g., panic), abort prevents leaked renewals.
1078        //
1079        // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
1080        // on the happy path, `stop_renewal()` fires `notify_one` synchronously
1081        // and then self is consumed into Drop immediately. The renewal task
1082        // has not yet been polled by the runtime, so `is_finished()` is still
1083        // `false` here — which previously fired the warning on every
1084        // complete/fail/cancel/suspend call. `terminal_op_called` is the
1085        // authoritative signal that a terminal-op path ran to the point of
1086        // stopping renewal; it does not by itself certify the Lua side
1087        // succeeded (see the field doc). The caller surfaces any error via
1088        // the op's return value, so a `Drop` warning is unneeded there.
1089        if !self.terminal_op_called.load(Ordering::Acquire) {
1090            tracing::warn!(
1091                execution_id = %self.execution_id,
1092                "ClaimedTask dropped without terminal operation — lease will expire"
1093            );
1094        }
1095        self.renewal_handle.abort();
1096    }
1097}
1098
1099// ── Lease renewal ──
1100
1101/// Per-tick renewal: single `backend.renew(&handle)` call wrapped in
1102/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
1103/// hooks still see one span per renewal (restores PR #119 Cursor
1104/// Bugbot finding — the top-level `spawn_renewal_task` fires once at
1105/// construction time, not per-tick).
1106///
1107/// See `benches/harness/src/bin/long_running.rs` for the bench
1108/// consumer that depends on this span naming.
1109#[tracing::instrument(
1110    name = "renew_lease",
1111    skip_all,
1112    fields(execution_id = %execution_id)
1113)]
1114async fn renew_once(
1115    backend: &dyn EngineBackend,
1116    handle: &Handle,
1117    execution_id: &ExecutionId,
1118) -> Result<(), crate::EngineError> {
1119    backend.renew(handle).await.map(|_| ())
1120}
1121
/// Spawn a background tokio task that renews the lease at `ttl / 3`
/// intervals.
///
/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
/// + the encoded `Handle` instead. Per-tick tracing lives on
/// [`renew_once`]; this function itself is sync + one-shot.
///
/// Stops when:
/// - `stop_signal` is notified (complete/fail/cancel called)
/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
/// - The task handle is aborted (ClaimedTask dropped)
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
    // panics if a caller (or a misconfigured test) passes a
    // lease_ttl_ms < 3. The SDK config validator already enforces
    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
    // is a cheap belt-and-suspenders (Copilot review finding on
    // PR #119).
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        // Skip (rather than burst) missed ticks — after a stall there is
        // no benefit in firing several renewals back-to-back.
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Skip the first immediate tick — the lease was just acquired.
        tick.tick().await;

        loop {
            tokio::select! {
                // A terminal op ran (complete/fail/cancel/suspend/...):
                // exit quietly.
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        Ok(_renewal) => {
                            // Success resets the consecutive-failure count.
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        // Lease is gone for good (stale/expired/revoked,
                        // execution inactive or missing) — retrying is
                        // pointless; stop the loop.
                        Err(e) if is_terminal_renewal_error(&e) => {
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        // Transient failure — keep the loop alive and
                        // retry on the next tick.
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}
1200
1201/// Check if an engine error means renewal should stop permanently.
1202#[allow(dead_code)]
1203fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
1204    use crate::{ContentionKind, EngineError, StateKind};
1205    matches!(
1206        err,
1207        EngineError::State(
1208            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
1209        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
1210            | EngineError::NotFound { entity: "execution" }
1211    )
1212}
1213
1214// ── FCALL result parsing ──
1215
1216/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
1217/// function into a typed [`ReportUsageResult`].
1218///
1219/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
1220///                  `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
1221/// Status code `!= 1` is parsed as a [`ScriptError`] via
1222/// [`ScriptError::from_code_with_detail`].
1223///
1224/// Exposed as `pub` so downstream SDKs that speak the same wire format
1225/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
1226/// call this directly instead of re-implementing the parse. Keeping one
1227/// parser paired with the producer (the Lua function registered at
1228/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
1229/// against silent format drift between producer and consumer.
1230pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1231    let arr = match raw {
1232        Value::Array(arr) => arr,
1233        _ => {
1234            return Err(SdkError::from(ScriptError::Parse {
1235                fcall: "parse_report_usage_result".into(),
1236                execution_id: None,
1237                message: "ff_report_usage_and_check: expected Array".into(),
1238            }));
1239        }
1240    };
1241    let status_code = match arr.first() {
1242        Some(Ok(Value::Int(n))) => *n,
1243        _ => {
1244            return Err(SdkError::from(ScriptError::Parse {
1245                fcall: "parse_report_usage_result".into(),
1246                execution_id: None,
1247                message: "ff_report_usage_and_check: expected Int status code".into(),
1248            }));
1249        }
1250    };
1251    if status_code != 1 {
1252        let error_code = usage_field_str(arr, 1);
1253        let detail = usage_field_str(arr, 2);
1254        return Err(SdkError::from(
1255            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1256                ScriptError::Parse {
1257                    fcall: "parse_report_usage_result".into(),
1258                    execution_id: None,
1259                    message: format!("ff_report_usage_and_check: {error_code}"),
1260                }
1261            }),
1262        ));
1263    }
1264    let sub_status = usage_field_str(arr, 1);
1265    match sub_status.as_str() {
1266        "OK" => Ok(ReportUsageResult::Ok),
1267        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1268        "SOFT_BREACH" => {
1269            let dim = usage_field_str(arr, 2);
1270            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1271            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1272            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1273        }
1274        "HARD_BREACH" => {
1275            let dim = usage_field_str(arr, 2);
1276            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1277            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1278            Ok(ReportUsageResult::HardBreach {
1279                dimension: dim,
1280                current_usage: current,
1281                hard_limit: limit,
1282            })
1283        }
1284        _ => Err(SdkError::from(ScriptError::Parse {
1285            fcall: "parse_report_usage_result".into(),
1286            execution_id: None,
1287            message: format!(
1288            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1289        ),
1290        })),
1291    }
1292}
1293
1294fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1295    match arr.get(index) {
1296        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1297        Some(Ok(Value::SimpleString(s))) => s.clone(),
1298        Some(Ok(Value::Int(n))) => n.to_string(),
1299        _ => String::new(),
1300    }
1301}
1302
1303/// Parse a required numeric usage field (u64) from the wire array at
1304/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
1305/// holds a non-string/non-int value, or contains a string that does
1306/// not parse as u64.
1307///
1308/// Rationale: the Lua producer (`lua/budget.lua:99`,
1309/// `ff_report_usage_and_check`) always emits
1310/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1311/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1312/// non-numeric value here means the Lua and Rust sides drifted;
1313/// silently coercing to `0` would surface drift as "zero-usage breach"
1314/// — arithmetically correct but semantically nonsense. Fail loudly
1315/// instead so drift shows up as a parse error at the first call site.
1316fn parse_usage_u64(
1317    arr: &[Result<Value, ferriskey::Error>],
1318    index: usize,
1319    sub_status: &str,
1320    field_name: &str,
1321) -> Result<u64, SdkError> {
1322    match arr.get(index) {
1323        Some(Ok(Value::Int(n))) => {
1324            u64::try_from(*n).map_err(|_| {
1325                SdkError::from(ScriptError::Parse {
1326                    fcall: "parse_usage_u64".into(),
1327                    execution_id: None,
1328                    message: format!(
1329                    "ff_report_usage_and_check {sub_status}: {field_name} \
1330                     (index {index}) negative int {n} cannot be u64"
1331                ),
1332                })
1333            })
1334        }
1335        Some(Ok(Value::BulkString(b))) => {
1336            let s = String::from_utf8_lossy(b);
1337            s.parse::<u64>().map_err(|_| {
1338                SdkError::from(ScriptError::Parse {
1339                    fcall: "parse_usage_u64".into(),
1340                    execution_id: None,
1341                    message: format!(
1342                    "ff_report_usage_and_check {sub_status}: {field_name} \
1343                     (index {index}) not a u64 string: {s:?}"
1344                ),
1345                })
1346            })
1347        }
1348        Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1349            SdkError::from(ScriptError::Parse {
1350                fcall: "parse_usage_u64".into(),
1351                execution_id: None,
1352                message: format!(
1353                "ff_report_usage_and_check {sub_status}: {field_name} \
1354                 (index {index}) not a u64 string: {s:?}"
1355            ),
1356            })
1357        }),
1358        Some(_) => Err(SdkError::from(ScriptError::Parse {
1359            fcall: "parse_usage_u64".into(),
1360            execution_id: None,
1361            message: format!(
1362            "ff_report_usage_and_check {sub_status}: {field_name} \
1363             (index {index}) wrong wire type (expected Int or String)"
1364        ),
1365        })),
1366        None => Err(SdkError::from(ScriptError::Parse {
1367            fcall: "parse_usage_u64".into(),
1368            execution_id: None,
1369            message: format!(
1370            "ff_report_usage_and_check {sub_status}: {field_name} \
1371             (index {index}) missing from response"
1372        ),
1373        })),
1374    }
1375}
1376
1377/// Pure helper: decide whether `suspension:current` represents a
1378/// signal-driven resume for the currently-claimed attempt, and extract
1379/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1380/// (no record, stale prior-attempt, non-resumed close). Returns an error
1381/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1382/// bug rather than a missing-data case.
1383///
1384/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1385/// `EngineBackend::observe_signals`, which re-implements this invariant
1386/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1387/// tests so the parsing contract stays exercised at the SDK layer —
1388/// Stage 1d will consolidate (either promote the helper into ff-core
1389/// or drop these tests once the backend-side tests cover equivalent
1390/// ground).
1391#[allow(dead_code)]
1392fn resume_waitpoint_id_from_suspension(
1393    susp: &HashMap<String, String>,
1394    claimed_attempt: AttemptIndex,
1395) -> Result<Option<WaitpointId>, SdkError> {
1396    if susp.is_empty() {
1397        return Ok(None);
1398    }
1399    let susp_att: u32 = susp
1400        .get("attempt_index")
1401        .and_then(|s| s.parse().ok())
1402        .unwrap_or(u32::MAX);
1403    if susp_att != claimed_attempt.0 {
1404        return Ok(None);
1405    }
1406    let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1407    if close_reason != "resumed" {
1408        return Ok(None);
1409    }
1410    let wp_id_str = susp
1411        .get("waitpoint_id")
1412        .map(String::as_str)
1413        .unwrap_or_default();
1414    if wp_id_str.is_empty() {
1415        return Ok(None);
1416    }
1417    let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1418        SdkError::from(ScriptError::Parse {
1419            fcall: "resume_waitpoint_id_from_suspension".into(),
1420            execution_id: None,
1421            message: format!(
1422            "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1423        ),
1424        })
1425    })?;
1426    Ok(Some(waitpoint_id))
1427}
1428
1429#[cfg_attr(not(feature = "direct-valkey-claim"), allow(dead_code))]
1430pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
1431    let arr = match raw {
1432        Value::Array(arr) => arr,
1433        _ => {
1434            return Err(SdkError::from(ScriptError::Parse {
1435                fcall: "parse_success_result".into(),
1436                execution_id: None,
1437                message: format!(
1438                "{function_name}: expected Array, got non-array"
1439            ),
1440            }));
1441        }
1442    };
1443
1444    if arr.is_empty() {
1445        return Err(SdkError::from(ScriptError::Parse {
1446            fcall: "parse_success_result".into(),
1447            execution_id: None,
1448            message: format!(
1449            "{function_name}: empty result array"
1450        ),
1451        }));
1452    }
1453
1454    let status_code = match arr.first() {
1455        Some(Ok(Value::Int(n))) => *n,
1456        _ => {
1457            return Err(SdkError::from(ScriptError::Parse {
1458                fcall: "parse_success_result".into(),
1459                execution_id: None,
1460                message: format!(
1461                "{function_name}: expected Int at index 0"
1462            ),
1463            }));
1464        }
1465    };
1466
1467    if status_code == 1 {
1468        Ok(())
1469    } else {
1470        // Extract error code from index 1 and optional detail from index 2
1471        // (e.g. `capability_mismatch` ships missing tokens there). Variants
1472        // that carry a String payload pick the detail up via
1473        // `from_code_with_detail`; other variants ignore it.
1474        let field_str = |idx: usize| -> String {
1475            arr.get(idx)
1476                .and_then(|v| match v {
1477                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1478                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1479                    _ => None,
1480                })
1481                .unwrap_or_default()
1482        };
1483        let error_code = {
1484            let s = field_str(1);
1485            if s.is_empty() { "unknown".to_owned() } else { s }
1486        };
1487        // Collect all detail slots (idx >= 2). Most variants only read
1488        // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
1489        // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
1490        // reconciliation after a network drop.
1491        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
1492        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1493
1494        let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
1495            .unwrap_or_else(|| {
1496                ScriptError::Parse {
1497                    fcall: "parse_success_result".into(),
1498                    execution_id: None,
1499                    message: format!("{function_name}: unknown error: {error_code}"),
1500                }
1501            });
1502
1503        Err(SdkError::from(script_err))
1504    }
1505}
1506
1507// RFC-013 Stage 1d — `parse_suspend_result` removed. The backend impl
1508// in `ff_backend_valkey` now parses the Lua return and produces a typed
1509// `SuspendOutcome`; the SDK forwarder consumes that directly.
1510
1511/// Parse ff_deliver_signal result:
1512///   ok(signal_id, effect)
1513///   ok_duplicate(existing_signal_id)
1514pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
1515    let arr = match raw {
1516        Value::Array(arr) => arr,
1517        _ => {
1518            return Err(SdkError::from(ScriptError::Parse {
1519                fcall: "parse_signal_result".into(),
1520                execution_id: None,
1521                message: "ff_deliver_signal: expected Array".into(),
1522            }));
1523        }
1524    };
1525
1526    let status_code = match arr.first() {
1527        Some(Ok(Value::Int(n))) => *n,
1528        _ => {
1529            return Err(SdkError::from(ScriptError::Parse {
1530                fcall: "parse_signal_result".into(),
1531                execution_id: None,
1532                message: "ff_deliver_signal: bad status code".into(),
1533            }));
1534        }
1535    };
1536
1537    if status_code != 1 {
1538        let err_field_str = |idx: usize| -> String {
1539            arr.get(idx)
1540                .and_then(|v| match v {
1541                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1542                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1543                    _ => None,
1544                })
1545                .unwrap_or_default()
1546        };
1547        let error_code = {
1548            let s = err_field_str(1);
1549            if s.is_empty() { "unknown".to_owned() } else { s }
1550        };
1551        let detail = err_field_str(2);
1552        return Err(SdkError::from(
1553            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1554                ScriptError::Parse {
1555                    fcall: "parse_signal_result".into(),
1556                    execution_id: None,
1557                    message: format!("ff_deliver_signal: {error_code}"),
1558                }
1559            }),
1560        ));
1561    }
1562
1563    let sub_status = arr
1564        .get(1)
1565        .and_then(|v| match v {
1566            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1567            Ok(Value::SimpleString(s)) => Some(s.clone()),
1568            _ => None,
1569        })
1570        .unwrap_or_default();
1571
1572    if sub_status == "DUPLICATE" {
1573        let existing_id = arr
1574            .get(2)
1575            .and_then(|v| match v {
1576                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1577                Ok(Value::SimpleString(s)) => Some(s.clone()),
1578                _ => None,
1579            })
1580            .unwrap_or_default();
1581        return Ok(SignalOutcome::Duplicate {
1582            existing_signal_id: existing_id,
1583        });
1584    }
1585
1586    // Parse: {1, "OK", signal_id, effect}
1587    let signal_id_str = arr
1588        .get(2)
1589        .and_then(|v| match v {
1590            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1591            Ok(Value::SimpleString(s)) => Some(s.clone()),
1592            _ => None,
1593        })
1594        .unwrap_or_default();
1595
1596    let effect = arr
1597        .get(3)
1598        .and_then(|v| match v {
1599            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1600            Ok(Value::SimpleString(s)) => Some(s.clone()),
1601            _ => None,
1602        })
1603        .unwrap_or_default();
1604
1605    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
1606        SdkError::from(ScriptError::Parse {
1607            fcall: "parse_signal_result".into(),
1608            execution_id: None,
1609            message: format!(
1610            "ff_deliver_signal: invalid signal_id from Lua: {e}"
1611        ),
1612        })
1613    })?;
1614
1615    if effect == "resume_condition_satisfied" {
1616        Ok(SignalOutcome::TriggeredResume { signal_id })
1617    } else {
1618        Ok(SignalOutcome::Accepted { signal_id, effect })
1619    }
1620}
1621
1622/// Parse ff_fail_execution result:
1623///   ok("retry_scheduled", delay_until)
1624///   ok("terminal_failed")
1625#[allow(dead_code)]
1626fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
1627    let arr = match raw {
1628        Value::Array(arr) => arr,
1629        _ => {
1630            return Err(SdkError::from(ScriptError::Parse {
1631                fcall: "parse_fail_result".into(),
1632                execution_id: None,
1633                message: "ff_fail_execution: expected Array".into(),
1634            }));
1635        }
1636    };
1637
1638    let status_code = match arr.first() {
1639        Some(Ok(Value::Int(n))) => *n,
1640        _ => {
1641            return Err(SdkError::from(ScriptError::Parse {
1642                fcall: "parse_fail_result".into(),
1643                execution_id: None,
1644                message: "ff_fail_execution: bad status code".into(),
1645            }));
1646        }
1647    };
1648
1649    if status_code != 1 {
1650        let err_field_str = |idx: usize| -> String {
1651            arr.get(idx)
1652                .and_then(|v| match v {
1653                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1654                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1655                    _ => None,
1656                })
1657                .unwrap_or_default()
1658        };
1659        let error_code = {
1660            let s = err_field_str(1);
1661            if s.is_empty() { "unknown".to_owned() } else { s }
1662        };
1663        let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
1664        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1665        return Err(SdkError::from(
1666            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
1667                ScriptError::Parse {
1668                    fcall: "parse_fail_result".into(),
1669                    execution_id: None,
1670                    message: format!("ff_fail_execution: {error_code}"),
1671                }
1672            }),
1673        ));
1674    }
1675
1676    // Parse sub-status from field[2] (index 2 = first field after status+OK)
1677    let sub_status = arr
1678        .get(2)
1679        .and_then(|v| match v {
1680            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1681            Ok(Value::SimpleString(s)) => Some(s.clone()),
1682            _ => None,
1683        })
1684        .unwrap_or_default();
1685
1686    match sub_status.as_str() {
1687        "retry_scheduled" => {
1688            // Lua returns: ok("retry_scheduled", tostring(delay_until))
1689            // arr[3] = delay_until
1690            let delay_str = arr
1691                .get(3)
1692                .and_then(|v| match v {
1693                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1694                    Ok(Value::Int(n)) => Some(n.to_string()),
1695                    _ => None,
1696                })
1697                .unwrap_or_default();
1698            let delay_until = delay_str.parse::<i64>().unwrap_or(0);
1699
1700            Ok(FailOutcome::RetryScheduled {
1701                delay_until: TimestampMs::from_millis(delay_until),
1702            })
1703        }
1704        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
1705        _ => Err(SdkError::from(ScriptError::Parse {
1706            fcall: "parse_fail_result".into(),
1707            execution_id: None,
1708            message: format!(
1709            "ff_fail_execution: unexpected sub-status: {sub_status}"
1710        ),
1711        })),
1712    }
1713}
1714
1715// ── Stream read / tail (consumer API, RFC-006 #2) ──
1716
1717/// Maximum tail block duration accepted by [`tail_stream`]. Mirrors the REST
1718/// endpoint ceiling so SDK callers can't wedge a connection longer than the
1719/// server would accept.
1720pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1721
1722/// Maximum frames per read/tail call. Mirrors
1723/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
1724/// callers don't need to import ff-core just to read the bound.
1725pub use ff_core::contracts::STREAM_READ_HARD_CAP;
1726
1727/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
1728/// signal so polling consumers can exit cleanly.
1729///
1730/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
1731pub use ff_core::contracts::StreamFrames;
1732
1733/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
1734/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
1735/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
1736/// `StreamCursor::Start` / `StreamCursor::End` instead.
1737pub use ff_core::contracts::StreamCursor;
1738
1739/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
1740/// — XREAD does not accept the open markers. Pulled out as a bare
1741/// function so unit tests can exercise the guard without constructing a
1742/// live `ferriskey::Client`.
1743fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
1744    if !after.is_concrete() {
1745        return Err(SdkError::Config {
1746            context: "tail_stream".into(),
1747            field: Some("after".into()),
1748            message: "XREAD cursor must be a concrete entry id; pass \
1749                      StreamCursor::from_beginning() to start from the \
1750                      beginning"
1751                .into(),
1752        });
1753    }
1754    Ok(())
1755}
1756
1757fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
1758    if count_limit == 0 {
1759        return Err(SdkError::Config {
1760            context: "read_stream_frames".into(),
1761            field: Some("count_limit".into()),
1762            message: "count_limit must be >= 1".into(),
1763        });
1764    }
1765    if count_limit > STREAM_READ_HARD_CAP {
1766        return Err(SdkError::Config {
1767            context: "read_stream_frames".into(),
1768            field: Some("count_limit".into()),
1769            message: format!(
1770                "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
1771            ),
1772        });
1773    }
1774    Ok(())
1775}
1776
1777/// Read frames from a completed or in-flight attempt's stream.
1778///
1779/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
1780/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1781/// `StreamCursor::At("<id>")` reads from a concrete entry id.
1782/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
1783/// `0` returns [`SdkError::Config`].
1784///
1785/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
1786/// consumers know when the producer has finalized the stream. A
1787/// never-written attempt and an in-progress stream are indistinguishable
1788/// here — both present as `frames=[]`, `closed_at=None`.
1789///
1790/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
1791/// client but are not the lease-holding worker — no lease check is
1792/// performed.
1793///
1794/// # Head-of-line note
1795///
1796/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
1797/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
1798/// calling this on a `client` that is also serving FCALLs stalls those
1799/// FCALLs behind the reply. The REST server isolates reads on its
1800/// `tail_client`; direct SDK callers should either use a dedicated
1801/// client OR paginate through smaller `count_limit` slices.
1802pub async fn read_stream(
1803    backend: &dyn EngineBackend,
1804    execution_id: &ExecutionId,
1805    attempt_index: AttemptIndex,
1806    from: StreamCursor,
1807    to: StreamCursor,
1808    count_limit: u64,
1809) -> Result<StreamFrames, SdkError> {
1810    validate_stream_read_count(count_limit)?;
1811    Ok(backend
1812        .read_stream(execution_id, attempt_index, from, to, count_limit)
1813        .await?)
1814}
1815
1816/// Tail a live attempt's stream.
1817///
1818/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
1819/// with id strictly greater than `after`. Pass
1820/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
1821/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
1822/// REJECTED at this boundary because XREAD does not accept `-` / `+`
1823/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
1824///
1825/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
1826/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
1827/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
1828/// SDK and REST ceilings aligned.
1829///
1830/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
1831/// polling consumers should loop until `result.is_closed()` is true, then
1832/// drain and exit. Timeout with no new frames presents as
1833/// `frames=[], closed_at=None`.
1834///
1835/// # Head-of-line warning — use a dedicated client
1836///
1837/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
1838/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
1839/// the read side until a frame arrives or the block elapses. If the
1840/// `client` you pass here is ALSO used for claims, completes, fails,
1841/// appends, or any other FCALL, a 30-second tail will stall all those
1842/// calls for up to 30 seconds.
1843///
1844/// **Strongly recommended**: build a separate `ferriskey::Client` for
1845/// tail callers — mirrors the `Server::tail_client` split that the REST
1846/// server uses internally (see `crates/ff-server/src/server.rs` and
1847/// RFC-006 Impl Notes §"Dedicated stream-op connection").
1848///
1849/// # Tail parallelism caveat (same mux)
1850///
1851/// Even a dedicated tail client is still one multiplexed TCP connection.
1852/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
1853/// ferriskey's per-call `request_timeout` starts at future-poll — so
1854/// two concurrent tails against the same client can time out spuriously:
1855/// the second call's BLOCK budget elapses while it waits for the first
1856/// BLOCK to return. The REST server handles this internally with a
1857/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
1858/// each call its full `block_ms` budget at the server.
1859///
1860/// **Direct SDK callers that need concurrent tails**: either
1861///   (1) build ONE `ferriskey::Client` per concurrent tail call (a small
1862///       pool of clients, rotated by the caller), OR
1863///   (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
1864///       only one BLOCK is in flight per client at a time.
1865/// If you need the REST-side backpressure (429 on contention) and the
1866/// built-in serializer, go through the
1867/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
1868/// than calling this directly.
1869///
1870/// This SDK does not enforce either pattern — the mutex belongs at the
1871/// application layer, and the connection pool belongs at the SDK
1872/// caller's DI layer; neither has a structured place inside this
1873/// helper.
1874///
1875/// # Timeout handling
1876///
1877/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
1878/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
1879/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
1880/// 500ms)`, overriding the client's default per-call. A tail with
1881/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
1882/// the client was built with a shorter `request_timeout`. No custom client
1883/// configuration is required for timeout reasons — only for head-of-line
1884/// isolation above.
1885pub async fn tail_stream(
1886    backend: &dyn EngineBackend,
1887    execution_id: &ExecutionId,
1888    attempt_index: AttemptIndex,
1889    after: StreamCursor,
1890    block_ms: u64,
1891    count_limit: u64,
1892) -> Result<StreamFrames, SdkError> {
1893    tail_stream_with_visibility(
1894        backend,
1895        execution_id,
1896        attempt_index,
1897        after,
1898        block_ms,
1899        count_limit,
1900        ff_core::backend::TailVisibility::All,
1901    )
1902    .await
1903}
1904
1905/// Tail helper with an explicit RFC-015
1906/// [`TailVisibility`](ff_core::backend::TailVisibility) filter.
1907pub async fn tail_stream_with_visibility(
1908    backend: &dyn EngineBackend,
1909    execution_id: &ExecutionId,
1910    attempt_index: AttemptIndex,
1911    after: StreamCursor,
1912    block_ms: u64,
1913    count_limit: u64,
1914    visibility: ff_core::backend::TailVisibility,
1915) -> Result<StreamFrames, SdkError> {
1916    if block_ms > MAX_TAIL_BLOCK_MS {
1917        return Err(SdkError::Config {
1918            context: "tail_stream".into(),
1919            field: Some("block_ms".into()),
1920            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
1921        });
1922    }
1923    validate_stream_read_count(count_limit)?;
1924    // XREAD does not accept `-` / `+` markers as cursors — reject at
1925    // the SDK boundary with `SdkError::Config` rather than forwarding
1926    // an invalid `-`/`+` into the backend (which would surface as an
1927    // opaque `EngineError::Transport`).
1928    validate_tail_cursor(&after)?;
1929
1930    Ok(backend
1931        .tail_stream(
1932            execution_id,
1933            attempt_index,
1934            after,
1935            block_ms,
1936            count_limit,
1937            visibility,
1938        )
1939        .await?)
1940}
1941
#[cfg(test)]
mod tail_stream_boundary_tests {
    use super::*;

    // `validate_tail_cursor` must reject the open `Start`/`End` markers
    // before `tail_stream` ever touches the client — mirroring the
    // `count_limit` guard. The full-path rejection at the REST layer is
    // covered by `ff-server::api`.

    #[test]
    fn rejects_start_cursor() {
        match validate_tail_cursor(&StreamCursor::Start) {
            Err(SdkError::Config { field, context, .. }) => {
                assert_eq!(context, "tail_stream");
                assert_eq!(field.as_deref(), Some("after"));
            }
            other => panic!("expected SdkError::Config, got {other:?}"),
        }
    }

    #[test]
    fn rejects_end_cursor() {
        assert!(matches!(
            validate_tail_cursor(&StreamCursor::End),
            Err(SdkError::Config { .. })
        ));
    }

    #[test]
    fn accepts_at_cursor() {
        // Concrete ids — including the from_beginning() sentinel — all pass.
        for cursor in [
            StreamCursor::At("0-0".into()),
            StreamCursor::from_beginning(),
            StreamCursor::At("123-0".into()),
        ] {
            validate_tail_cursor(&cursor).expect("concrete cursor must be accepted");
        }
    }
}
1982
#[cfg(test)]
mod parse_report_usage_result_tests {
    use super::*;

    /// Build a `Value::SimpleString` slot from a `&str`.
    /// `usage_field_str` treats BulkString and SimpleString uniformly
    /// (`BulkString(b)` → `String::from_utf8_lossy`, `SimpleString(s)`
    /// → clone), so SimpleString keeps the harness free of a
    /// dev-dependency on `bytes` purely for test construction.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }

    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }

    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }

    #[test]
    fn ok_status() {
        let parsed = parse_report_usage_result(&arr(vec![int(1), s("OK")])).unwrap();
        assert_eq!(parsed, ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let parsed = parse_report_usage_result(&arr(vec![int(1), s("ALREADY_APPLIED")])).unwrap();
        assert_eq!(parsed, ReportUsageResult::AlreadyApplied);
    }

    #[test]
    fn soft_breach_status() {
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(
                    (dimension.as_str(), current_usage, soft_limit),
                    ("tokens", 150, 100)
                );
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }

    #[test]
    fn hard_breach_status() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(
                    (dimension.as_str(), current_usage, hard_limit),
                    ("requests", 10001, 10000)
                );
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    /// Negative case: non-Array input. Guards against a future Lua
    /// refactor that accidentally returns a bare string/int — the parser
    /// must fail loudly rather than silently succeed or panic.
    #[test]
    fn non_array_input_is_parse_error() {
        let err =
            parse_report_usage_result(&Value::SimpleString("OK".to_owned())).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    /// Negative case: Array whose first element isn't an Int status
    /// code. The Lua function's first return slot is always
    /// `status_code` (1 on success, an error code otherwise); anything
    /// else there is a wire-format break that must surface as a parse
    /// error.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let err = parse_report_usage_result(&arr(vec![s("not_an_int"), s("OK")])).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
    /// field. Guards against the silent-coercion defect cross-review
    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
    /// which would have surfaced Lua-side wire-format drift as a
    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
    /// but semantically wrong (a "breach with zero usage" is nonsense
    /// and masks the real error).
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"), // current_usage — must fail, not coerce to 0
            s("100"),
        ]);
        let msg = parse_report_usage_result(&raw).unwrap_err().to_string();
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    /// Negative case: HARD_BREACH with the limit slot missing
    /// entirely. Same defence as the non-numeric test above: a
    /// truncated response must fail loudly rather than coerce to 0.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
            // no index 4 — hard_limit missing
        ]);
        let msg = parse_report_usage_result(&raw).unwrap_err().to_string();
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}
2126
2127/// RFC-014 helper: pick the first waitpoint_key out of a composite
2128/// tree so `try_suspend_inner` can rebind a Fresh waitpoint to the
2129/// user-supplied key. Single-waitpoint scoping: all
2130/// Single.waitpoint_key / Count.waitpoints[i] in the tree must be
2131/// equal; this function returns the first one it encounters.
2132fn composite_first_waitpoint_key(body: &CompositeBody) -> Option<String> {
2133    match body {
2134        CompositeBody::AllOf { members } => members.iter().find_map(|m| match m {
2135            ResumeCondition::Single { waitpoint_key, .. } => Some(waitpoint_key.clone()),
2136            ResumeCondition::Composite(inner) => composite_first_waitpoint_key(inner),
2137            _ => None,
2138        }),
2139        CompositeBody::Count { waitpoints, .. } => waitpoints.first().cloned(),
2140        _ => None,
2141    }
2142}
2143
2144
#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    /// Build a suspension-record map from literal pairs.
    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut map = HashMap::new();
        for (k, v) in pairs {
            map.insert((*k).to_owned(), (*v).to_owned());
        }
        map
    }

    #[test]
    fn empty_suspension_returns_none() {
        let out =
            resume_waitpoint_id_from_suspension(&m(&[]), AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        // Claimed attempt is 1; suspension belongs to 0 → stale.
        let wp = WaitpointId::new().to_string();
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp),
        ]);
        assert!(
            resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1))
                .unwrap()
                .is_none(),
            "attempt_index mismatch → None"
        );
    }

    #[test]
    fn non_resumed_close_returns_none() {
        // Every non-"resumed" close reason (including empty) is ignored.
        let wp = WaitpointId::new().to_string();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = m(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp),
            ]);
            let out =
                resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let id_str = wp.to_string();
        let susp = m(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &id_str),
        ]);
        assert_eq!(
            resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap(),
            Some(wp)
        );
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err =
            resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("not a valid UUID"),
            "error should mention invalid UUID, got: {msg}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: an empty waitpoint_id field (shouldn't happen on
        // resumed records, but guard against partial writes) is None, not an error.
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out =
            resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was
    // removed when the production path switched from unbounded
    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
    // feedback on unbounded condition-hash reply size). That loop
    // is exercised by the integration tests in `ff-test`.
}
2232
#[cfg(test)]
mod terminal_replay_parsing_tests {
    //! Unit tests for the SDK's parse path of the enriched
    //! `execution_not_active` error returned on a terminal-op replay.
    //! The integration test in ff-test/tests/e2e_lifecycle.rs proves
    //! the Lua side emits the 4-slot detail; these tests prove the
    //! Rust parser threads all 4 slots into the `ExecutionNotActive`
    //! variant so the reconciler in complete()/fail()/cancel() can
    //! match on them.

    use super::*;
    use ferriskey::Value;

    // Use SimpleString rather than BulkString to avoid depending on bytes::Bytes
    // in the test harness. parse_success_result + parse_fail_result handle both.
    fn bulk(s: &str) -> Value {
        Value::SimpleString(s.to_owned())
    }

    /// Unwraps `err` down to the `ExecutionNotActive` struct variant and
    /// asserts its detail slots equal `expected` in the order
    /// `[terminal_outcome, lease_epoch, lifecycle_phase, attempt_id]`.
    ///
    /// Panics with a descriptive message when `err` is any other
    /// `SdkError` or `EngineError` shape. Shared by all three tests so
    /// the extraction boilerplate lives in one place.
    fn assert_execution_not_active(err: SdkError, expected: [&str; 4]) {
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, expected[0]);
                assert_eq!(lease_epoch, expected[1]);
                assert_eq!(lifecycle_phase, expected[2]);
                assert_eq!(attempt_id, expected[3]);
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("success")),
            Ok(bulk("42")),
            Ok(bulk("terminal")),
            Ok(bulk("11111111-1111-1111-1111-111111111111")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        assert_execution_not_active(
            err,
            ["success", "42", "terminal", "11111111-1111-1111-1111-111111111111"],
        );
    }

    /// parse_fail_result must extract all detail slots too so the
    /// reconciler in fail() can match lifecycle_phase = "runnable" for
    /// retry-scheduled replays.
    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("none")),
            Ok(bulk("7")),
            Ok(bulk("runnable")),
            Ok(bulk("22222222-2222-2222-2222-222222222222")),
        ]);
        let err = parse_fail_result(&raw).unwrap_err();
        assert_execution_not_active(
            err,
            ["none", "7", "runnable", "22222222-2222-2222-2222-222222222222"],
        );
    }

    /// Empty detail slots must default to "" (not panic) so older-Lua
    /// producers or malformed replies degrade to an unreconcilable
    /// variant rather than a Parse error.
    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        assert_execution_not_active(err, ["", "", "", ""]);
    }
}