//! `ff_sdk/task.rs` — claimed-task lifecycle for the FlowFabric SDK.
1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use ferriskey::{Client, Value};
7use ff_core::backend::{Handle, HandleKind};
8use ff_core::contracts::ReportUsageResult;
9use ff_core::engine_backend::EngineBackend;
10use ff_script::error::ScriptError;
11use ff_core::keys::{ExecKeyContext, IndexKeys};
12use ff_core::partition::{execution_partition, PartitionConfig};
13use ff_core::types::*;
14use tokio::sync::{Notify, OwnedSemaphorePermit};
15use tokio::task::JoinHandle;
16
17use crate::SdkError;
18
19// ── Phase 3: Suspend/Signal types ──
20
/// Timeout behavior when a suspension deadline is reached.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutBehavior {
    Fail,
    Cancel,
    Expire,
    AutoResume,
    Escalate,
}

impl TimeoutBehavior {
    /// Every variant, in declaration order. `FromStr` walks this table so
    /// the wire strings live in exactly one place (`as_str`).
    const ALL: [TimeoutBehavior; 5] = [
        TimeoutBehavior::Fail,
        TimeoutBehavior::Cancel,
        TimeoutBehavior::Expire,
        TimeoutBehavior::AutoResume,
        TimeoutBehavior::Escalate,
    ];

    /// Canonical wire string for this behavior.
    pub fn as_str(&self) -> &str {
        match self {
            TimeoutBehavior::Fail => "fail",
            TimeoutBehavior::Cancel => "cancel",
            TimeoutBehavior::Expire => "expire",
            TimeoutBehavior::AutoResume => "auto_resume_with_timeout_signal",
            TimeoutBehavior::Escalate => "escalate",
        }
    }
}

impl std::fmt::Display for TimeoutBehavior {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

impl std::str::FromStr for TimeoutBehavior {
    type Err = String;

    /// Accepts each variant's canonical string, plus the short alias
    /// `"auto_resume"` for `AutoResume`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Short alias kept for caller convenience.
        if s == "auto_resume" {
            return Ok(Self::AutoResume);
        }
        Self::ALL
            .iter()
            .find(|behavior| behavior.as_str() == s)
            .cloned()
            .ok_or_else(|| format!("unknown timeout behavior: {s}"))
    }
}
63
/// A condition matcher for the resume condition.
///
/// Passed to `suspend()` to describe which delivered signals satisfy the
/// waitpoint's resume condition (see `suspend()`'s wildcard note on what an
/// empty matcher list means).
#[derive(Clone, Debug)]
pub struct ConditionMatcher {
    /// Signal name to match (empty = wildcard).
    pub signal_name: String,
}
70
/// Outcome of a `suspend()` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SuspendOutcome {
    /// Execution is now suspended, waitpoint active.
    ///
    /// The worker has released the lease; the returned identifiers let
    /// external callers target the waitpoint with signals.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        /// Key the waitpoint was created under.
        waitpoint_key: String,
        /// HMAC token that signal deliverers must supply to this waitpoint.
        waitpoint_token: WaitpointToken,
    },
    /// Buffered signals already satisfied the condition — suspension skipped.
    /// The caller still holds the lease.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        /// Key the waitpoint was created under.
        waitpoint_key: String,
        /// Same HMAC token shape as the `Suspended` arm.
        waitpoint_token: WaitpointToken,
    },
}
91
/// A signal to deliver to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    /// Name matched against the waitpoint's resume condition
    /// (see [`ConditionMatcher`]).
    pub signal_name: String,
    /// Category tag carried alongside the name.
    pub signal_category: String,
    /// Optional opaque payload.
    pub payload: Option<Vec<u8>>,
    /// Sender attribution: type of the source.
    pub source_type: String,
    /// Sender attribution: identity of the source.
    pub source_identity: String,
    /// When set, a redelivery with the same key surfaces as
    /// [`SignalOutcome::Duplicate`] instead of being stored twice.
    pub idempotency_key: Option<String>,
    /// HMAC token issued when the waitpoint was created. Required for
    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
    pub waitpoint_token: WaitpointToken,
}
105
/// Outcome of `deliver_signal()`.
///
/// Raw `ff_deliver_signal` FCALL results parse into this shape via
/// [`SignalOutcome::from_fcall_value`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
    /// `effect` carries the effect string as parsed from the FCALL result.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal triggered resume — execution is now runnable.
    TriggeredResume { signal_id: SignalId },
    /// Duplicate signal (idempotency key matched). `existing_signal_id` is
    /// the id of the signal originally stored under that key.
    Duplicate { existing_signal_id: String },
}
116
impl SignalOutcome {
    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
    ///
    /// Consuming packages that call `ff_deliver_signal` directly can use this
    /// to interpret the Lua return value without depending on SDK internals.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] when `raw` does not match the shape
    /// `parse_signal_result` expects (malformed FCALL payload).
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}
126
/// A signal that triggered a resume, readable by a worker after re-claim.
///
/// **RFC-012 Stage 0:** the canonical definition has moved to
/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
/// scheduled for removal in 0.5.0.
pub use ff_core::backend::ResumeSignal;

/// Outcome of `append_frame()`.
///
/// **RFC-012 §R7.2.1:** canonical definition moved to
/// [`ff_core::backend::AppendFrameOutcome`]; this `pub use` shim
/// (same 0.4.x compatibility policy as `ResumeSignal` above)
/// preserves the `ff_sdk::task::AppendFrameOutcome` path through the
/// 0.4.x window. Existing consumers construct + match exhaustively
/// without change. The derive set widened from `Clone, Debug` to
/// `Clone, Debug, PartialEq, Eq` matching the `FailOutcome` precedent.
pub use ff_core::backend::AppendFrameOutcome;

/// Outcome of a `fail()` call.
///
/// **RFC-012 Stage 1a:** canonical definition moved to
/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
/// re-export in `lib.rs`) through the 0.4.x window. Existing
/// consumers construct + match exhaustively without change.
pub use ff_core::backend::FailOutcome;
154
/// A claimed execution with an active lease. The worker processes this task
/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
///
/// The lease is automatically renewed in the background at `lease_ttl / 3`
/// intervals. Renewal stops when the task is consumed or dropped.
///
/// `complete`, `fail`, and `cancel` consume `self` — this prevents
/// double-complete bugs at the type level.
pub struct ClaimedTask {
    /// Shared Valkey client.
    client: Client,
    /// `EngineBackend` the trait-migrated ops forward through.
    ///
    /// **RFC-012 Stage 1b + Round-7.** Today this is always a
    /// `ValkeyBackend` wrapping `client`. Most per-task ops
    /// (`complete`/`fail`/`cancel`/`delay_execution`/
    /// `move_to_waiting_children`/`update_progress`/`resume_signals`/
    /// `create_pending_waitpoint`/`append_frame`/`report_usage`) route
    /// through `backend.*()`. `suspend` still uses
    /// `client.fcall("ff_suspend_execution", ...)` directly — this is
    /// the deferred suspend per RFC-012 §R7.6.1, pending Stage 1d
    /// input-shape work. Lease renewal goes through
    /// `backend.renew(&handle)`. Round-7 (#135/#145) closed the four
    /// trait-shape gaps tracked by #117.
    ///
    /// Stage 1c migrates the FlowFabricWorker hot paths (claim,
    /// deliver_signal) through the same trait surface; Stage 1d
    /// refactors this struct to carry a single `Handle` rather than
    /// synthesising one per op via `synth_handle`.
    backend: Arc<dyn EngineBackend>,
    /// Partition config for key construction.
    partition_config: PartitionConfig,
    /// Execution identity.
    execution_id: ExecutionId,
    /// Current attempt.
    attempt_index: AttemptIndex,
    /// Unique id of the current attempt (distinct from the 0-based index).
    attempt_id: AttemptId,
    /// Lease identity.
    lease_id: LeaseId,
    /// Lease fencing epoch; bumped by the engine on each resumed claim.
    lease_epoch: LeaseEpoch,
    /// Lease timing.
    lease_ttl_ms: u64,
    /// Lane used at claim time.
    lane_id: LaneId,
    /// Worker instance that holds this lease (for index cleanup keys).
    worker_instance_id: WorkerInstanceId,
    /// Execution data.
    /// Input payload pre-read at claim time, exposed via `input_payload()`.
    input_payload: Vec<u8>,
    /// Execution kind string pre-read at claim time.
    execution_kind: String,
    /// Execution tags pre-read at claim time.
    tags: HashMap<String, String>,
    /// Background renewal task handle.
    renewal_handle: JoinHandle<()>,
    /// Signal to stop renewal (used before consuming self).
    renewal_stop: Arc<Notify>,
    /// Consecutive lease renewal failures. Shared with the background renewal
    /// task. Reset to 0 on each successful renewal. Workers should check
    /// `is_lease_healthy()` before committing expensive side effects.
    renewal_failures: Arc<AtomicU32>,
    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
    /// response is received, just before `self` is consumed into `Drop`.
    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
    /// suppress the false-positive "dropped without terminal operation"
    /// warning: after `notify_one`, the renewal task has not yet been
    /// polled by the runtime, so `is_finished()` is still `false` on the
    /// happy path when self is being consumed.
    ///
    /// Note: the flag is set for any terminal-op path that reaches
    /// `stop_renewal()`, which includes Lua-level script errors (the
    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
    /// the caller already receives the `Err` via the op's return value,
    /// so an additional `Drop` warning would be noise. The warning is
    /// reserved for genuine drop-without-terminal-op cases (panic, early
    /// return, transport failure before stop_renewal ran).
    terminal_op_called: AtomicBool,
    /// Concurrency permit from the worker's semaphore. Held for the lifetime
    /// of the task; released on complete/fail/cancel/drop.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
233
234impl ClaimedTask {
235    /// Construct a `ClaimedTask` from the results of a successful
236    /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
237    ///
238    /// # Arguments
239    ///
240    /// * `client` — shared Valkey client used for subsequent lease
241    ///   renewals, signal delivery, and the final
242    ///   complete/fail/cancel FCALL.
243    /// * `partition_config` — partition topology snapshot read at
244    ///   `FlowFabricWorker::connect`. Used for key construction on
245    ///   the lifetime of this task.
246    /// * `execution_id` — the claimed execution's UUID.
247    /// * `attempt_index` / `attempt_id` — current attempt identity.
248    ///   `attempt_index` is 0 on a fresh claim, preserved on a
249    ///   resumed claim.
250    /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
251    ///   is returned by the Lua FCALL and bumped on each resumed
252    ///   claim.
253    /// * `lease_ttl_ms` — lease TTL used to schedule the background
254    ///   renewal task (renews at `lease_ttl_ms / 3`).
255    /// * `lane_id` — the lane the task was claimed on. Used for
256    ///   index key construction (`lane_active`, `lease_expiry`,
257    ///   etc.).
258    /// * `worker_instance_id` — this worker's identity. Used to
259    ///   resolve `worker_leases` index entries during renewal and
260    ///   completion.
261    /// * `input_payload` / `execution_kind` / `tags` — pre-read
262    ///   execution metadata, exposed via getters so the worker
263    ///   doesn't round-trip to Valkey to inspect what it just
264    ///   claimed.
265    ///
266    /// # Invariant: constructor is `pub(crate)` on purpose
267    ///
268    /// **External callers cannot construct a `ClaimedTask`** — only
269    /// the in-crate claim entry points (`claim_next`,
270    /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
271    /// load-bearing: those entry points are the ONLY sites that
272    /// acquire a permit from the worker's concurrency semaphore
273    /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
274    /// it through `ClaimedTask::set_concurrency_permit` before
275    /// returning the task. Promoting `new` to `pub` would let
276    /// consumers build tasks that bypass the concurrency contract
277    /// the worker's `max_concurrent_tasks` config advertises — the
278    /// returned task would run alongside other in-flight work
279    /// without debiting the permit bank, and the
280    /// complete/fail/cancel/drop path would have nothing to
281    /// release.
282    ///
283    /// If an external callsite genuinely needs to rehydrate a
284    /// task from saved FCALL results, the right answer is a new
285    /// `FlowFabricWorker` entry point that wraps `new` + acquires a
286    /// permit — not promoting this constructor.
287    #[allow(clippy::too_many_arguments)]
288    pub(crate) fn new(
289        client: Client,
290        backend: Arc<dyn EngineBackend>,
291        partition_config: PartitionConfig,
292        execution_id: ExecutionId,
293        attempt_index: AttemptIndex,
294        attempt_id: AttemptId,
295        lease_id: LeaseId,
296        lease_epoch: LeaseEpoch,
297        lease_ttl_ms: u64,
298        lane_id: LaneId,
299        worker_instance_id: WorkerInstanceId,
300        input_payload: Vec<u8>,
301        execution_kind: String,
302        tags: HashMap<String, String>,
303    ) -> Self {
304        let renewal_stop = Arc::new(Notify::new());
305        let renewal_failures = Arc::new(AtomicU32::new(0));
306
307        // Stage 1b: the renewal task forwards through `backend.renew(&handle)`.
308        // Build the handle once at construction time and hand both Arc clones
309        // into the spawned task.
310        let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
311            execution_id.clone(),
312            attempt_index,
313            attempt_id.clone(),
314            lease_id.clone(),
315            lease_epoch,
316            lease_ttl_ms,
317            lane_id.clone(),
318            worker_instance_id.clone(),
319            HandleKind::Fresh,
320        );
321        let renewal_handle = spawn_renewal_task(
322            backend.clone(),
323            renewal_handle_cookie,
324            execution_id.clone(),
325            lease_ttl_ms,
326            renewal_stop.clone(),
327            renewal_failures.clone(),
328        );
329
330        Self {
331            client,
332            backend,
333            partition_config,
334            execution_id,
335            attempt_index,
336            attempt_id,
337            lease_id,
338            lease_epoch,
339            lease_ttl_ms,
340            lane_id,
341            worker_instance_id,
342            input_payload,
343            execution_kind,
344            tags,
345            renewal_handle,
346            renewal_stop,
347            renewal_failures,
348            terminal_op_called: AtomicBool::new(false),
349            _concurrency_permit: None,
350        }
351    }
352
353    /// Attach a concurrency permit from the worker's semaphore.
354    /// The permit is held for the task's lifetime and released on drop.
355    #[allow(dead_code)]
356    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
357        self._concurrency_permit = Some(permit);
358    }
359
    // ── Accessors ──

    /// Id of the claimed execution.
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    /// 0-based index of the current attempt (0 on a fresh claim).
    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }

    /// Unique id of the current attempt.
    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }

    /// Id of the lease this worker holds on the execution.
    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }

    /// Lease fencing epoch (bumped by the engine on each resumed claim).
    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }

    /// Input payload pre-read at claim time — no Valkey round-trip needed.
    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }

    /// Execution kind string pre-read at claim time.
    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }

    /// Execution tags pre-read at claim time.
    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }

    /// Lane the task was claimed on.
    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }
397
398    /// Read frames from this task's attempt stream.
399    ///
400    /// Forwards through the task's `Arc<dyn EngineBackend>` — consumers
401    /// no longer need the `ferriskey::Client` leak (#87). See
402    /// [`EngineBackend::read_stream`](ff_core::engine_backend::EngineBackend::read_stream)
403    /// for the backend-level contract.
404    ///
405    /// Validation (count_limit bounds) runs at this boundary — out-of-range
406    /// input surfaces as [`SdkError::Config`] before reaching the backend.
407    pub async fn read_stream(
408        &self,
409        from: StreamCursor,
410        to: StreamCursor,
411        count_limit: u64,
412    ) -> Result<StreamFrames, SdkError> {
413        validate_stream_read_count(count_limit)?;
414        Ok(self
415            .backend
416            .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
417            .await?)
418    }
419
420    /// Tail this task's attempt stream for new frames.
421    ///
422    /// See [`EngineBackend::tail_stream`](ff_core::engine_backend::EngineBackend::tail_stream)
423    /// for the head-of-line warning — the task's backend is shared
424    /// with claim/complete/fail hot paths. Consumers that need
425    /// isolation should build a dedicated `EngineBackend` for tail
426    /// reads.
427    pub async fn tail_stream(
428        &self,
429        after: StreamCursor,
430        block_ms: u64,
431        count_limit: u64,
432    ) -> Result<StreamFrames, SdkError> {
433        if block_ms > MAX_TAIL_BLOCK_MS {
434            return Err(SdkError::Config {
435                context: "tail_stream".into(),
436                field: Some("block_ms".into()),
437                message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
438            });
439        }
440        validate_stream_read_count(count_limit)?;
441        validate_tail_cursor(&after)?;
442        Ok(self
443            .backend
444            .tail_stream(
445                &self.execution_id,
446                self.attempt_index,
447                after,
448                block_ms,
449                count_limit,
450            )
451            .await?)
452    }
453
    /// Synthesise a Valkey-backend `Handle` encoding this task's
    /// attempt-cookie fields. Private to the SDK — the trait
    /// forwarders call this per op to produce the `&Handle` argument
    /// the `EngineBackend` methods take.
    ///
    /// **RFC-012 Stage 1b option A.** Refactoring `ClaimedTask` to
    /// carry one cached `Handle` instead of synthesising per op is
    /// deferred to Stage 1d (issue #89 follow-up). The per-op cost is
    /// a single allocation of a small byte buffer — negligible
    /// compared to the FCALL round-trip that follows.
    ///
    /// Kind is always `HandleKind::Fresh` because today the 9
    /// Stage-1b ops all treat the backend tag + opaque bytes as
    /// authoritative; they do not dispatch on kind. Stage 1d routes
    /// resumed-claim callers through a `Resumed` synth.
    ///
    /// NOTE: `new` builds the renewal task's cookie with the same
    /// `encode_handle` call — keep the two sites in sync if the
    /// handle shape changes.
    fn synth_handle(&self) -> Handle {
        ff_backend_valkey::ValkeyBackend::encode_handle(
            self.execution_id.clone(),
            self.attempt_index,
            self.attempt_id.clone(),
            self.lease_id.clone(),
            self.lease_epoch,
            self.lease_ttl_ms,
            self.lane_id.clone(),
            self.worker_instance_id.clone(),
            HandleKind::Fresh,
        )
    }
482
483    /// Check if the lease is likely still valid based on renewal success.
484    ///
485    /// Returns `false` if 3 or more consecutive renewal attempts have failed.
486    /// Workers should check this before committing expensive or irreversible
487    /// side effects. A `false` return means Valkey may have already expired
488    /// the lease and another worker could be processing this execution.
489    pub fn is_lease_healthy(&self) -> bool {
490        self.renewal_failures.load(Ordering::Relaxed) < 3
491    }
492
493    /// Number of consecutive lease renewal failures since the last success.
494    ///
495    /// Returns 0 when renewals are working normally. Useful for observability
496    /// and custom health policies beyond the default threshold of 3.
497    pub fn consecutive_renewal_failures(&self) -> u32 {
498        self.renewal_failures.load(Ordering::Relaxed)
499    }
500
501    // ── Terminal operations (consume self) ──
502
503    /// Delay the execution until `delay_until`.
504    ///
505    /// Releases the lease. The execution moves to `delayed` state.
506    /// Consumes self — the task cannot be used after delay.
507    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
508        // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
509        // the pre-migration `stop_renewal` contract: called only when
510        // the FCALL round-tripped (Ok or a typed Lua/engine error);
511        // raw transport errors bubble up without stopping renewal so
512        // the `Drop` warning still fires if the caller later drops
513        // `self` without retrying.
514        let handle = self.synth_handle();
515        let out = self.backend.delay(&handle, delay_until).await;
516        if fcall_landed(&out) {
517            self.stop_renewal();
518        }
519        out.map_err(SdkError::from)
520    }
521
522    /// Move execution to waiting_children state.
523    ///
524    /// Releases the lease. The execution waits for child dependencies to complete.
525    /// Consumes self.
526    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
527        // RFC-012 Stage 1b: forwards through `backend.wait_children`.
528        // See `delay_execution` for the stop_renewal contract.
529        let handle = self.synth_handle();
530        let out = self.backend.wait_children(&handle).await;
531        if fcall_landed(&out) {
532            self.stop_renewal();
533        }
534        out.map_err(SdkError::from)
535    }
536
537    /// Complete the execution successfully.
538    ///
539    /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
540    /// Renewal continues during the FCALL to prevent lease expiry under
541    /// network latency — the Lua fences on lease_id+epoch atomically.
542    /// Consumes self — the task cannot be used after completion.
543    ///
544    /// # Connection errors and replay
545    ///
546    /// If the Valkey connection drops after the Lua commit but before the
547    /// client reads the response, retrying `complete()` with the same
548    /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
549    /// reconciles the "already terminal with matching outcome" response
550    /// into `Ok(())`. A retry after a different terminal op has raced in
551    /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
552    /// populated `terminal_outcome` so the caller can see what actually
553    /// happened.
554    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
555        // RFC-012 Stage 1b: forwards through `backend.complete`.
556        // The FCALL body + the `ExecutionNotActive` replay reconciliation
557        // (match same epoch + attempt_id + outcome=="success" → Ok)
558        // moved to `ff_backend_valkey::complete_impl`.
559        let handle = self.synth_handle();
560        let out = self.backend.complete(&handle, result_payload).await;
561        if fcall_landed(&out) {
562            self.stop_renewal();
563        }
564        out.map_err(SdkError::from)
565    }
566
567    /// Fail the execution with a reason and error category.
568    ///
569    /// If the execution policy allows retries, the engine schedules a retry
570    /// (delayed backoff). Otherwise, the execution becomes terminal failed.
571    /// Returns [`FailOutcome`] so the caller knows what happened.
572    ///
573    /// # Connection errors and replay
574    ///
575    /// `fail()` is replay-safe under the same conditions as `complete()`:
576    /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
577    /// reconciled into the outcome the server actually committed
578    /// (`TerminalFailed` if no retries left; `RetryScheduled` with
579    /// `delay_until = 0` if a retry was scheduled — the exact delay is
580    /// not recovered on the replay path).
581    pub async fn fail(
582        self,
583        reason: &str,
584        error_category: &str,
585    ) -> Result<FailOutcome, SdkError> {
586        // RFC-012 Stage 1b: forwards through `backend.fail`.
587        // The FCALL body + retry-policy read + the two-shape replay
588        // reconciliation (terminal/failed → TerminalFailed, runnable
589        // → RetryScheduled{0}) moved to
590        // `ff_backend_valkey::fail_impl`.
591        //
592        // **Preserving the caller's raw category string.** The
593        // pre-Stage-1b code passed `error_category` straight to the
594        // Lua `failure_category` ARGV. Real callers use arbitrary
595        // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
596        // `"inference_error"`, …) that do not correspond to the 5
597        // named `FailureClass` variants. A lossy enum conversion
598        // would rewrite every non-canonical category to
599        // `Transient` — a real behavior change for
600        // ff-readiness-tests + the examples crates.
601        //
602        // Instead: pack the raw category bytes into
603        // `FailureReason.detail`; `fail_impl` reads them back and
604        // threads them to Lua verbatim. The `classification` enum
605        // is derived from the string for the few consumers who
606        // match on the trait return today (none in-workspace).
607        // Stage 1d (or issue #117) widens `FailureClass` with a
608        // `Custom(String)` arm; this stash carrier retires then.
609        let handle = self.synth_handle();
610        let failure_reason = ff_core::backend::FailureReason::with_detail(
611            reason.to_owned(),
612            error_category.as_bytes().to_vec(),
613        );
614        let classification = error_category_to_class(error_category);
615        let out = self.backend.fail(&handle, failure_reason, classification).await;
616        if fcall_landed(&out) {
617            self.stop_renewal();
618        }
619        out.map_err(SdkError::from)
620    }
621
622    /// Cancel the execution.
623    ///
624    /// Stops lease renewal, then calls `ff_cancel_execution` via FCALL.
625    /// Consumes self.
626    ///
627    /// # Connection errors and replay
628    ///
629    /// `cancel()` is replay-safe under the same conditions as `complete()`:
630    /// a retry by the same caller matching `lease_epoch` + `attempt_id`
631    /// returns `Ok(())` if the server's stored `terminal_outcome` is
632    /// `cancelled`. A retry that finds a different outcome (because a
633    /// concurrent `complete()` or `fail()` won the race) surfaces as
634    /// `ExecutionNotActive` with the populated `terminal_outcome` so the
635    /// caller can see that the cancel intent was NOT honored.
636    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
637        // RFC-012 Stage 1b: forwards through `backend.cancel`.
638        // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
639        // and the `ExecutionNotActive` → outcome=="cancelled" replay
640        // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
641        let handle = self.synth_handle();
642        let out = self.backend.cancel(&handle, reason).await;
643        if fcall_landed(&out) {
644            self.stop_renewal();
645        }
646        out.map_err(SdkError::from)
647    }
648
649    // ── Non-terminal operations ──
650
651    /// Manually renew the lease.
652    ///
653    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
654    /// trait. The background renewal task calls
655    /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
656    /// this method is the public entry point for workers that want
657    /// to force a renew out-of-band.
658    pub async fn renew_lease(&self) -> Result<(), SdkError> {
659        let handle = self.synth_handle();
660        self.backend
661            .renew(&handle)
662            .await
663            .map(|_renewal| ())
664            .map_err(SdkError::from)
665    }
666
667    /// Update progress (pct 0-100 and optional message).
668    ///
669    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
670    /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
671    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
672        let handle = self.synth_handle();
673        self.backend
674            .progress(&handle, Some(pct), Some(message.to_owned()))
675            .await
676            .map_err(SdkError::from)
677    }
678
679    /// Report usage against a budget and check limits.
680    ///
681    /// Non-consuming — the worker can report usage multiple times.
682    /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
683    /// `dedup_key` prevents double-counting on retries (auto-prefixed with budget hash tag).
684    ///
685    /// **RFC-012 §R7.2.3:** forwards through
686    /// [`EngineBackend::report_usage`](ff_core::engine_backend::EngineBackend::report_usage).
687    /// The trait's `UsageDimensions.dedup_key` carries the raw key;
688    /// the backend impl applies the `usage_dedup_key(hash_tag, k)`
689    /// wrap so the dedup state co-locates with the budget partition
690    /// (PR #108).
691    pub async fn report_usage(
692        &self,
693        budget_id: &BudgetId,
694        dimensions: &[(&str, u64)],
695        dedup_key: Option<&str>,
696    ) -> Result<ReportUsageResult, SdkError> {
697        let handle = self.synth_handle();
698        let mut dims = ff_core::backend::UsageDimensions::default();
699        for (name, delta) in dimensions {
700            dims.custom.insert((*name).to_owned(), *delta);
701        }
702        dims.dedup_key = dedup_key
703            .filter(|k| !k.is_empty())
704            .map(|k| k.to_owned());
705        self.backend
706            .report_usage(&handle, budget_id, dims)
707            .await
708            .map_err(SdkError::from)
709    }
710
711    /// Create a pending waitpoint for future signal delivery.
712    ///
713    /// Non-consuming — the worker keeps the lease. Signals delivered to the
714    /// waitpoint are buffered. When the worker later calls `suspend()` with
715    /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
716    /// resume condition.
717    ///
718    /// Returns both the waitpoint_id AND the HMAC token required by external
719    /// callers to buffer signals against this pending waitpoint
720    /// (RFC-004 §Waitpoint Security).
721    ///
722    /// **RFC-012 §R7.2.2:** forwards through
723    /// [`EngineBackend::create_waitpoint`](ff_core::engine_backend::EngineBackend::create_waitpoint).
724    /// The trait returns
725    /// [`PendingWaitpoint`](ff_core::backend::PendingWaitpoint) whose
726    /// `hmac_token` is the same wire HMAC this method has always
727    /// produced; the SDK unwraps it back to the historical
728    /// `(WaitpointId, WaitpointToken)` tuple for caller-shape parity.
729    pub async fn create_pending_waitpoint(
730        &self,
731        waitpoint_key: &str,
732        expires_in_ms: u64,
733    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
734        let handle = self.synth_handle();
735        let expires_in = std::time::Duration::from_millis(expires_in_ms);
736        let pending = self
737            .backend
738            .create_waitpoint(&handle, waitpoint_key, expires_in)
739            .await
740            .map_err(SdkError::from)?;
741        // `WaitpointHmac` wraps the canonical `WaitpointToken`; the
742        // `.token()` accessor borrows the underlying
743        // `WaitpointToken`, which is the caller's historical return
744        // shape.
745        Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
746    }
747
748    // ── Phase 4: Streaming ──
749
750    /// Append a frame to the current attempt's output stream.
751    ///
752    /// Non-consuming — the worker can append many frames during execution.
753    /// The stream is created lazily on the first append.
754    ///
755    /// **RFC-012 §R7.2.1 / PR #146:** forwards through the
756    /// `EngineBackend` trait. The free-form `frame_type` tag and
757    /// optional `metadata` (wire `correlation_id`) travel on the
758    /// extended [`ff_core::backend::Frame`] shape (`frame_type:
759    /// String`, `correlation_id: Option<String>`), giving byte-for-byte
760    /// wire parity with the pre-migration direct-FCALL path.
761    pub async fn append_frame(
762        &self,
763        frame_type: &str,
764        payload: &[u8],
765        metadata: Option<&str>,
766    ) -> Result<AppendFrameOutcome, SdkError> {
767        let handle = self.synth_handle();
768        let mut frame = ff_core::backend::Frame::new(
769            payload.to_vec(),
770            ff_core::backend::FrameKind::Event,
771        )
772        .with_frame_type(frame_type);
773        if let Some(cid) = metadata {
774            frame = frame.with_correlation_id(cid);
775        }
776        self.backend
777            .append_frame(&handle, frame)
778            .await
779            .map_err(SdkError::from)
780    }
781
782    // ── Phase 3: Suspend ──
783
784    /// Suspend the execution, releasing the lease and creating a waitpoint.
785    ///
786    /// The execution transitions to `suspended` and the worker loses ownership.
787    /// An external signal matching the condition will resume the execution.
788    ///
789    /// If `condition_matchers` is empty, a wildcard matcher is created that
790    /// matches ANY signal name. To require an explicit operator resume with
791    /// no signal match, pass a sentinel name that no real signal will use.
792    ///
793    /// If buffered signals on a pending waitpoint already satisfy the condition,
794    /// returns `AlreadySatisfied` and the lease is NOT released.
795    ///
796    /// Consumes self — the task cannot be used after suspension.
797    // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. Trait
798    // method `suspend` returns `Handle` (a resumed-kind attempt
799    // cookie); this SDK method returns `SuspendOutcome { suspension_id,
800    // waitpoint_id, waitpoint_key, waitpoint_token }` — semantically
801    // distinct return shapes. Also: SDK takes `&[ConditionMatcher]` +
802    // `TimeoutBehavior` with no `WaitpointSpec` mapping. Tracked in #117.
803    pub async fn suspend(
804        self,
805        reason_code: &str,
806        condition_matchers: &[ConditionMatcher],
807        timeout_ms: Option<u64>,
808        timeout_behavior: TimeoutBehavior,
809    ) -> Result<SuspendOutcome, SdkError> {
810        let partition = execution_partition(&self.execution_id, &self.partition_config);
811        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
812        let idx = IndexKeys::new(&partition);
813
814        let suspension_id = SuspensionId::new();
815        let waitpoint_id = WaitpointId::new();
816        // For now, use a simple opaque key (real implementation would use HMAC token)
817        let waitpoint_key = format!("wpk:{}", waitpoint_id);
818
819        let timeout_at = timeout_ms.map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));
820
821        // Build resume condition JSON
822        let required_signal_names: Vec<&str> = condition_matchers
823            .iter()
824            .map(|m| m.signal_name.as_str())
825            .collect();
826        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
827        let resume_condition_json = serde_json::json!({
828            "condition_type": "signal_set",
829            "required_signal_names": required_signal_names,
830            "signal_match_mode": match_mode,
831            "minimum_signal_count": 1,
832            "timeout_behavior": timeout_behavior.as_str(),
833            "allow_operator_override": true,
834        }).to_string();
835
836        let resume_policy_json = serde_json::json!({
837            "resume_target": "runnable",
838            "close_waitpoint_on_resume": true,
839            "consume_matched_signals": true,
840            "retain_signal_buffer_until_closed": true,
841        }).to_string();
842
843        // KEYS (17): exec_core, attempt_record, lease_current, lease_history,
844        //            lease_expiry, worker_leases, suspension_current, waitpoint_hash,
845        //            waitpoint_signals, suspension_timeout, pending_wp_expiry,
846        //            active_index, suspended_index, waitpoint_history, wp_condition,
847        //            attempt_timeout, hmac_secrets
848        let keys: Vec<String> = vec![
849            ctx.core(),                                          // 1
850            ctx.attempt_hash(self.attempt_index),                // 2
851            ctx.lease_current(),                                 // 3
852            ctx.lease_history(),                                 // 4
853            idx.lease_expiry(),                                  // 5
854            idx.worker_leases(&self.worker_instance_id),         // 6
855            ctx.suspension_current(),                            // 7
856            ctx.waitpoint(&waitpoint_id),                        // 8
857            ctx.waitpoint_signals(&waitpoint_id),                // 9
858            idx.suspension_timeout(),                            // 10
859            idx.pending_waitpoint_expiry(),                      // 11
860            idx.lane_active(&self.lane_id),                      // 12
861            idx.lane_suspended(&self.lane_id),                   // 13
862            ctx.waitpoints(),                                    // 14
863            ctx.waitpoint_condition(&waitpoint_id),              // 15
864            idx.attempt_timeout(),                               // 16
865            idx.waitpoint_hmac_secrets(),                        // 17
866        ];
867
868        // ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
869        //            lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
870        //            reason_code, requested_by, timeout_at, resume_condition_json,
871        //            resume_policy_json, continuation_metadata_pointer,
872        //            use_pending_waitpoint, timeout_behavior, lease_history_maxlen
873        let args: Vec<String> = vec![
874            self.execution_id.to_string(),                          // 1
875            self.attempt_index.to_string(),                         // 2
876            self.attempt_id.to_string(),                            // 3
877            self.lease_id.to_string(),                              // 4
878            self.lease_epoch.to_string(),                           // 5
879            suspension_id.to_string(),                              // 6
880            waitpoint_id.to_string(),                               // 7
881            waitpoint_key.clone(),                                  // 8
882            reason_code.to_owned(),                                 // 9
883            "worker".to_owned(),                                    // 10
884            timeout_at.map_or(String::new(), |t| t.to_string()),   // 11
885            resume_condition_json,                                  // 12
886            resume_policy_json,                                     // 13
887            String::new(),                                          // 14 continuation_metadata_ptr
888            String::new(),                                          // 15 use_pending_waitpoint
889            timeout_behavior.as_str().to_owned(),                   // 16
890            "1000".to_owned(),                                      // 17 lease_history_maxlen
891        ];
892
893        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
894        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
895
896        let raw: Value = self
897            .client
898            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
899            .await
900            .map_err(SdkError::from)?;
901
902        self.stop_renewal();
903        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
904    }
905
906    /// Read the signals that satisfied the waitpoint and triggered this
907    /// resume.
908    ///
909    /// Non-consuming. Intended to be called immediately after re-claim via
910    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
911    /// subsequent `suspend()` (which replaces `suspension:current`).
912    ///
913    /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
914    ///
915    /// - No prior suspension on this execution.
916    /// - The prior suspension belonged to an earlier attempt (e.g. the
917    ///   attempt was cancelled/failed and a retry is now claiming).
918    /// - The prior suspension was closed by timeout / cancel / operator
919    ///   override rather than by a matched signal.
920    ///
921    /// Reads `suspension:current` once, filters by `attempt_index` to
922    /// guard against stale prior-attempt records, then fetches the matched
923    /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
924    /// fields and reads each signal's metadata + payload directly.
925    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
926        // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
927        // Pre-migration body (HGETALL suspension_current + HMGET
928        // matchers + pipelined HGETALL signal_hash / GET
929        // signal_payload) lives in
930        // `ff_backend_valkey::observe_signals_impl`.
931        let handle = self.synth_handle();
932        self.backend
933            .observe_signals(&handle)
934            .await
935            .map_err(SdkError::from)
936    }
937
    /// Signal the renewal task to stop. Called by every terminal op
    /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
    /// `move_to_waiting_children`) after the FCALL returns. Also marks
    /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
    /// consumption from a genuine drop-without-terminal-op.
    fn stop_renewal(&self) {
        // Release store first: `Drop` reads this flag with `Acquire`, so
        // the flag must be published no later than the wake-up below.
        self.terminal_op_called.store(true, Ordering::Release);
        self.renewal_stop.notify_one();
    }
947
948}
949
950/// True iff the backend's FCALL result represents a round-trip that
951/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
952/// contention, conflict, state, bug) all count as "landed" — the
953/// server either committed or rejected with a typed response. Only
954/// raw `Transport` errors (connection drops, request timeouts, parse
955/// failures) count as "did not land", which matches the pre-Stage-1b
956/// SDK's `fcall(...).await.map_err(SdkError::from)?` short-circuit
957/// — those errors returned before `stop_renewal()` ran, preserving
958/// the `Drop` warning for genuine "lease will leak" cases.
959///
960/// Stage 1b terminal-op forwarders use this predicate to decide
961/// whether to call `stop_renewal()`: yes for landed responses, no
962/// for transport errors so the caller's retry path still sees a
963/// running renewal task.
964fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
965    match r {
966        Ok(_) => true,
967        Err(crate::EngineError::Transport { .. }) => false,
968        Err(_) => true,
969    }
970}
971
972/// Map the SDK's free-form `error_category: &str` to the typed
973/// `FailureClass` the trait's `fail` method takes. Unknown categories
974/// fall through to `Transient` — the Lua side already tolerated
975/// arbitrary strings, so the worst a category drift does under the
976/// Stage 1b forwarder is reclassify a novel category as transient.
977/// Stage 1d (or issue #117) widens `FailureClass` with a
978/// `Custom(String)` arm for exact round-trip.
979fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
980    use ff_core::backend::FailureClass;
981    match s {
982        "transient" => FailureClass::Transient,
983        "permanent" => FailureClass::Permanent,
984        "infra_crash" => FailureClass::InfraCrash,
985        "timeout" => FailureClass::Timeout,
986        "cancelled" => FailureClass::Cancelled,
987        _ => FailureClass::Transient,
988    }
989}
990
991impl Drop for ClaimedTask {
992    fn drop(&mut self) {
993        // Abort the background renewal task on drop.
994        // This is a safety net — complete/fail/cancel already stop renewal
995        // via notify before consuming self. But if the task is dropped
996        // without being consumed (e.g., panic), abort prevents leaked renewals.
997        //
998        // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
999        // on the happy path, `stop_renewal()` fires `notify_one` synchronously
1000        // and then self is consumed into Drop immediately. The renewal task
1001        // has not yet been polled by the runtime, so `is_finished()` is still
1002        // `false` here — which previously fired the warning on every
1003        // complete/fail/cancel/suspend call. `terminal_op_called` is the
1004        // authoritative signal that a terminal-op path ran to the point of
1005        // stopping renewal; it does not by itself certify the Lua side
1006        // succeeded (see the field doc). The caller surfaces any error via
1007        // the op's return value, so a `Drop` warning is unneeded there.
1008        if !self.terminal_op_called.load(Ordering::Acquire) {
1009            tracing::warn!(
1010                execution_id = %self.execution_id,
1011                "ClaimedTask dropped without terminal operation — lease will expire"
1012            );
1013        }
1014        self.renewal_handle.abort();
1015    }
1016}
1017
1018// ── Lease renewal ──
1019
1020/// Per-tick renewal: single `backend.renew(&handle)` call wrapped in
1021/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
1022/// hooks still see one span per renewal (restores PR #119 Cursor
1023/// Bugbot finding — the top-level `spawn_renewal_task` fires once at
1024/// construction time, not per-tick).
1025///
1026/// See `benches/harness/src/bin/long_running.rs` for the bench
1027/// consumer that depends on this span naming.
1028#[tracing::instrument(
1029    name = "renew_lease",
1030    skip_all,
1031    fields(execution_id = %execution_id)
1032)]
1033async fn renew_once(
1034    backend: &dyn EngineBackend,
1035    handle: &Handle,
1036    execution_id: &ExecutionId,
1037) -> Result<(), crate::EngineError> {
1038    backend.renew(handle).await.map(|_| ())
1039}
1040
/// Spawn a background tokio task that renews the lease at `ttl / 3`
/// intervals.
///
/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
/// + the encoded `Handle` instead. Per-tick tracing lives on
///   [`renew_once`]; this function itself is sync + one-shot.
///
/// `failure_counter` is reset to 0 on every successful renewal and
/// incremented on every failed one, so observers can read a
/// consecutive-failure streak.
///
/// Stops when:
/// - `stop_signal` is notified (complete/fail/cancel called)
/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
/// - The task handle is aborted (ClaimedTask dropped)
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
    // panics if a caller (or a misconfigured test) passes a
    // lease_ttl_ms < 3. The SDK config validator already enforces
    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
    // is a cheap belt-and-suspenders (Copilot review finding on
    // PR #119).
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        // `Skip` drops ticks missed while the task was starved instead
        // of firing them in a burst — one late renewal beats a renewal
        // storm after a stall.
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Skip the first immediate tick — the lease was just acquired.
        tick.tick().await;

        loop {
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        Ok(_renewal) => {
                            // Success clears the consecutive-failure streak.
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        Err(e) if is_terminal_renewal_error(&e) => {
                            // Lease is gone for good — stop renewing; the
                            // final failure is still counted for observers.
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        Err(e) => {
                            // Transient failure: keep the loop alive and
                            // retry on the next tick.
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}
1119
1120/// Check if an engine error means renewal should stop permanently.
1121#[allow(dead_code)]
1122fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
1123    use crate::{ContentionKind, EngineError, StateKind};
1124    matches!(
1125        err,
1126        EngineError::State(
1127            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
1128        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
1129            | EngineError::NotFound { entity: "execution" }
1130    )
1131}
1132
1133// ── FCALL result parsing ──
1134
1135/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
1136/// function into a typed [`ReportUsageResult`].
1137///
1138/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
1139///                  `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
1140/// Status code `!= 1` is parsed as a [`ScriptError`] via
1141/// [`ScriptError::from_code_with_detail`].
1142///
1143/// Exposed as `pub` so downstream SDKs that speak the same wire format
1144/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
1145/// call this directly instead of re-implementing the parse. Keeping one
1146/// parser paired with the producer (the Lua function registered at
1147/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
1148/// against silent format drift between producer and consumer.
1149pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1150    let arr = match raw {
1151        Value::Array(arr) => arr,
1152        _ => {
1153            return Err(SdkError::from(ScriptError::Parse {
1154                fcall: "parse_report_usage_result".into(),
1155                execution_id: None,
1156                message: "ff_report_usage_and_check: expected Array".into(),
1157            }));
1158        }
1159    };
1160    let status_code = match arr.first() {
1161        Some(Ok(Value::Int(n))) => *n,
1162        _ => {
1163            return Err(SdkError::from(ScriptError::Parse {
1164                fcall: "parse_report_usage_result".into(),
1165                execution_id: None,
1166                message: "ff_report_usage_and_check: expected Int status code".into(),
1167            }));
1168        }
1169    };
1170    if status_code != 1 {
1171        let error_code = usage_field_str(arr, 1);
1172        let detail = usage_field_str(arr, 2);
1173        return Err(SdkError::from(
1174            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1175                ScriptError::Parse {
1176                    fcall: "parse_report_usage_result".into(),
1177                    execution_id: None,
1178                    message: format!("ff_report_usage_and_check: {error_code}"),
1179                }
1180            }),
1181        ));
1182    }
1183    let sub_status = usage_field_str(arr, 1);
1184    match sub_status.as_str() {
1185        "OK" => Ok(ReportUsageResult::Ok),
1186        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1187        "SOFT_BREACH" => {
1188            let dim = usage_field_str(arr, 2);
1189            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1190            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1191            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1192        }
1193        "HARD_BREACH" => {
1194            let dim = usage_field_str(arr, 2);
1195            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1196            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1197            Ok(ReportUsageResult::HardBreach {
1198                dimension: dim,
1199                current_usage: current,
1200                hard_limit: limit,
1201            })
1202        }
1203        _ => Err(SdkError::from(ScriptError::Parse {
1204            fcall: "parse_report_usage_result".into(),
1205            execution_id: None,
1206            message: format!(
1207            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1208        ),
1209        })),
1210    }
1211}
1212
1213fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1214    match arr.get(index) {
1215        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1216        Some(Ok(Value::SimpleString(s))) => s.clone(),
1217        Some(Ok(Value::Int(n))) => n.to_string(),
1218        _ => String::new(),
1219    }
1220}
1221
1222/// Parse a required numeric usage field (u64) from the wire array at
1223/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
1224/// holds a non-string/non-int value, or contains a string that does
1225/// not parse as u64.
1226///
1227/// Rationale: the Lua producer (`lua/budget.lua:99`,
1228/// `ff_report_usage_and_check`) always emits
1229/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1230/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1231/// non-numeric value here means the Lua and Rust sides drifted;
1232/// silently coercing to `0` would surface drift as "zero-usage breach"
1233/// — arithmetically correct but semantically nonsense. Fail loudly
1234/// instead so drift shows up as a parse error at the first call site.
1235fn parse_usage_u64(
1236    arr: &[Result<Value, ferriskey::Error>],
1237    index: usize,
1238    sub_status: &str,
1239    field_name: &str,
1240) -> Result<u64, SdkError> {
1241    match arr.get(index) {
1242        Some(Ok(Value::Int(n))) => {
1243            u64::try_from(*n).map_err(|_| {
1244                SdkError::from(ScriptError::Parse {
1245                    fcall: "parse_usage_u64".into(),
1246                    execution_id: None,
1247                    message: format!(
1248                    "ff_report_usage_and_check {sub_status}: {field_name} \
1249                     (index {index}) negative int {n} cannot be u64"
1250                ),
1251                })
1252            })
1253        }
1254        Some(Ok(Value::BulkString(b))) => {
1255            let s = String::from_utf8_lossy(b);
1256            s.parse::<u64>().map_err(|_| {
1257                SdkError::from(ScriptError::Parse {
1258                    fcall: "parse_usage_u64".into(),
1259                    execution_id: None,
1260                    message: format!(
1261                    "ff_report_usage_and_check {sub_status}: {field_name} \
1262                     (index {index}) not a u64 string: {s:?}"
1263                ),
1264                })
1265            })
1266        }
1267        Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1268            SdkError::from(ScriptError::Parse {
1269                fcall: "parse_usage_u64".into(),
1270                execution_id: None,
1271                message: format!(
1272                "ff_report_usage_and_check {sub_status}: {field_name} \
1273                 (index {index}) not a u64 string: {s:?}"
1274            ),
1275            })
1276        }),
1277        Some(_) => Err(SdkError::from(ScriptError::Parse {
1278            fcall: "parse_usage_u64".into(),
1279            execution_id: None,
1280            message: format!(
1281            "ff_report_usage_and_check {sub_status}: {field_name} \
1282             (index {index}) wrong wire type (expected Int or String)"
1283        ),
1284        })),
1285        None => Err(SdkError::from(ScriptError::Parse {
1286            fcall: "parse_usage_u64".into(),
1287            execution_id: None,
1288            message: format!(
1289            "ff_report_usage_and_check {sub_status}: {field_name} \
1290             (index {index}) missing from response"
1291        ),
1292        })),
1293    }
1294}
1295
1296/// Pure helper: decide whether `suspension:current` represents a
1297/// signal-driven resume for the currently-claimed attempt, and extract
1298/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1299/// (no record, stale prior-attempt, non-resumed close). Returns an error
1300/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1301/// bug rather than a missing-data case.
1302///
1303/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1304/// `EngineBackend::observe_signals`, which re-implements this invariant
1305/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1306/// tests so the parsing contract stays exercised at the SDK layer —
1307/// Stage 1d will consolidate (either promote the helper into ff-core
1308/// or drop these tests once the backend-side tests cover equivalent
1309/// ground).
1310#[allow(dead_code)]
1311fn resume_waitpoint_id_from_suspension(
1312    susp: &HashMap<String, String>,
1313    claimed_attempt: AttemptIndex,
1314) -> Result<Option<WaitpointId>, SdkError> {
1315    if susp.is_empty() {
1316        return Ok(None);
1317    }
1318    let susp_att: u32 = susp
1319        .get("attempt_index")
1320        .and_then(|s| s.parse().ok())
1321        .unwrap_or(u32::MAX);
1322    if susp_att != claimed_attempt.0 {
1323        return Ok(None);
1324    }
1325    let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1326    if close_reason != "resumed" {
1327        return Ok(None);
1328    }
1329    let wp_id_str = susp
1330        .get("waitpoint_id")
1331        .map(String::as_str)
1332        .unwrap_or_default();
1333    if wp_id_str.is_empty() {
1334        return Ok(None);
1335    }
1336    let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1337        SdkError::from(ScriptError::Parse {
1338            fcall: "resume_waitpoint_id_from_suspension".into(),
1339            execution_id: None,
1340            message: format!(
1341            "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1342        ),
1343        })
1344    })?;
1345    Ok(Some(waitpoint_id))
1346}
1347
1348pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
1349    let arr = match raw {
1350        Value::Array(arr) => arr,
1351        _ => {
1352            return Err(SdkError::from(ScriptError::Parse {
1353                fcall: "parse_success_result".into(),
1354                execution_id: None,
1355                message: format!(
1356                "{function_name}: expected Array, got non-array"
1357            ),
1358            }));
1359        }
1360    };
1361
1362    if arr.is_empty() {
1363        return Err(SdkError::from(ScriptError::Parse {
1364            fcall: "parse_success_result".into(),
1365            execution_id: None,
1366            message: format!(
1367            "{function_name}: empty result array"
1368        ),
1369        }));
1370    }
1371
1372    let status_code = match arr.first() {
1373        Some(Ok(Value::Int(n))) => *n,
1374        _ => {
1375            return Err(SdkError::from(ScriptError::Parse {
1376                fcall: "parse_success_result".into(),
1377                execution_id: None,
1378                message: format!(
1379                "{function_name}: expected Int at index 0"
1380            ),
1381            }));
1382        }
1383    };
1384
1385    if status_code == 1 {
1386        Ok(())
1387    } else {
1388        // Extract error code from index 1 and optional detail from index 2
1389        // (e.g. `capability_mismatch` ships missing tokens there). Variants
1390        // that carry a String payload pick the detail up via
1391        // `from_code_with_detail`; other variants ignore it.
1392        let field_str = |idx: usize| -> String {
1393            arr.get(idx)
1394                .and_then(|v| match v {
1395                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1396                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1397                    _ => None,
1398                })
1399                .unwrap_or_default()
1400        };
1401        let error_code = {
1402            let s = field_str(1);
1403            if s.is_empty() { "unknown".to_owned() } else { s }
1404        };
1405        // Collect all detail slots (idx >= 2). Most variants only read
1406        // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
1407        // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
1408        // reconciliation after a network drop.
1409        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
1410        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1411
1412        let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
1413            .unwrap_or_else(|| {
1414                ScriptError::Parse {
1415                    fcall: "parse_success_result".into(),
1416                    execution_id: None,
1417                    message: format!("{function_name}: unknown error: {error_code}"),
1418                }
1419            });
1420
1421        Err(SdkError::from(script_err))
1422    }
1423}
1424
1425/// Parse ff_suspend_execution result:
1426///   ok(suspension_id, waitpoint_id, waitpoint_key)
1427///   ok_already_satisfied(suspension_id, waitpoint_id, waitpoint_key)
1428fn parse_suspend_result(
1429    raw: &Value,
1430    suspension_id: SuspensionId,
1431    waitpoint_id: WaitpointId,
1432    waitpoint_key: String,
1433) -> Result<SuspendOutcome, SdkError> {
1434    let arr = match raw {
1435        Value::Array(arr) => arr,
1436        _ => {
1437            return Err(SdkError::from(ScriptError::Parse {
1438                fcall: "parse_suspend_result".into(),
1439                execution_id: None,
1440                message: "ff_suspend_execution: expected Array".into(),
1441            }));
1442        }
1443    };
1444
1445    let status_code = match arr.first() {
1446        Some(Ok(Value::Int(n))) => *n,
1447        _ => {
1448            return Err(SdkError::from(ScriptError::Parse {
1449                fcall: "parse_suspend_result".into(),
1450                execution_id: None,
1451                message: "ff_suspend_execution: bad status code".into(),
1452            }));
1453        }
1454    };
1455
1456    if status_code != 1 {
1457        let err_field_str = |idx: usize| -> String {
1458            arr.get(idx)
1459                .and_then(|v| match v {
1460                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1461                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1462                    _ => None,
1463                })
1464                .unwrap_or_default()
1465        };
1466        let error_code = {
1467            let s = err_field_str(1);
1468            if s.is_empty() { "unknown".to_owned() } else { s }
1469        };
1470        let detail = err_field_str(2);
1471        return Err(SdkError::from(
1472            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1473                ScriptError::Parse {
1474                    fcall: "parse_suspend_result".into(),
1475                    execution_id: None,
1476                    message: format!("ff_suspend_execution: {error_code}"),
1477                }
1478            }),
1479        ));
1480    }
1481
1482    // Check sub-status at index 1
1483    let sub_status = arr
1484        .get(1)
1485        .and_then(|v| match v {
1486            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1487            Ok(Value::SimpleString(s)) => Some(s.clone()),
1488            _ => None,
1489        })
1490        .unwrap_or_default();
1491
1492    // Lua returns: {1, status, suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
1493    // The suspension_id/waitpoint_id/waitpoint_key values the worker passed in are
1494    // authoritative (Lua echoes them); waitpoint_token however is MINTED by Lua and
1495    // must be read from the response.
1496    let waitpoint_token = arr
1497        .get(5)
1498        .and_then(|v| match v {
1499            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1500            Ok(Value::SimpleString(s)) => Some(s.clone()),
1501            _ => None,
1502        })
1503        .map(WaitpointToken::new)
1504        .ok_or_else(|| {
1505            SdkError::from(ScriptError::Parse {
1506                fcall: "parse_suspend_result".into(),
1507                execution_id: None,
1508                message: "ff_suspend_execution: missing waitpoint_token in response".into(),
1509            })
1510        })?;
1511
1512    if sub_status == "ALREADY_SATISFIED" {
1513        Ok(SuspendOutcome::AlreadySatisfied {
1514            suspension_id,
1515            waitpoint_id,
1516            waitpoint_key,
1517            waitpoint_token,
1518        })
1519    } else {
1520        Ok(SuspendOutcome::Suspended {
1521            suspension_id,
1522            waitpoint_id,
1523            waitpoint_key,
1524            waitpoint_token,
1525        })
1526    }
1527}
1528
1529/// Parse ff_deliver_signal result:
1530///   ok(signal_id, effect)
1531///   ok_duplicate(existing_signal_id)
1532pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
1533    let arr = match raw {
1534        Value::Array(arr) => arr,
1535        _ => {
1536            return Err(SdkError::from(ScriptError::Parse {
1537                fcall: "parse_signal_result".into(),
1538                execution_id: None,
1539                message: "ff_deliver_signal: expected Array".into(),
1540            }));
1541        }
1542    };
1543
1544    let status_code = match arr.first() {
1545        Some(Ok(Value::Int(n))) => *n,
1546        _ => {
1547            return Err(SdkError::from(ScriptError::Parse {
1548                fcall: "parse_signal_result".into(),
1549                execution_id: None,
1550                message: "ff_deliver_signal: bad status code".into(),
1551            }));
1552        }
1553    };
1554
1555    if status_code != 1 {
1556        let err_field_str = |idx: usize| -> String {
1557            arr.get(idx)
1558                .and_then(|v| match v {
1559                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1560                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1561                    _ => None,
1562                })
1563                .unwrap_or_default()
1564        };
1565        let error_code = {
1566            let s = err_field_str(1);
1567            if s.is_empty() { "unknown".to_owned() } else { s }
1568        };
1569        let detail = err_field_str(2);
1570        return Err(SdkError::from(
1571            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1572                ScriptError::Parse {
1573                    fcall: "parse_signal_result".into(),
1574                    execution_id: None,
1575                    message: format!("ff_deliver_signal: {error_code}"),
1576                }
1577            }),
1578        ));
1579    }
1580
1581    let sub_status = arr
1582        .get(1)
1583        .and_then(|v| match v {
1584            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1585            Ok(Value::SimpleString(s)) => Some(s.clone()),
1586            _ => None,
1587        })
1588        .unwrap_or_default();
1589
1590    if sub_status == "DUPLICATE" {
1591        let existing_id = arr
1592            .get(2)
1593            .and_then(|v| match v {
1594                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1595                Ok(Value::SimpleString(s)) => Some(s.clone()),
1596                _ => None,
1597            })
1598            .unwrap_or_default();
1599        return Ok(SignalOutcome::Duplicate {
1600            existing_signal_id: existing_id,
1601        });
1602    }
1603
1604    // Parse: {1, "OK", signal_id, effect}
1605    let signal_id_str = arr
1606        .get(2)
1607        .and_then(|v| match v {
1608            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1609            Ok(Value::SimpleString(s)) => Some(s.clone()),
1610            _ => None,
1611        })
1612        .unwrap_or_default();
1613
1614    let effect = arr
1615        .get(3)
1616        .and_then(|v| match v {
1617            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1618            Ok(Value::SimpleString(s)) => Some(s.clone()),
1619            _ => None,
1620        })
1621        .unwrap_or_default();
1622
1623    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
1624        SdkError::from(ScriptError::Parse {
1625            fcall: "parse_signal_result".into(),
1626            execution_id: None,
1627            message: format!(
1628            "ff_deliver_signal: invalid signal_id from Lua: {e}"
1629        ),
1630        })
1631    })?;
1632
1633    if effect == "resume_condition_satisfied" {
1634        Ok(SignalOutcome::TriggeredResume { signal_id })
1635    } else {
1636        Ok(SignalOutcome::Accepted { signal_id, effect })
1637    }
1638}
1639
1640/// Parse ff_fail_execution result:
1641///   ok("retry_scheduled", delay_until)
1642///   ok("terminal_failed")
1643#[allow(dead_code)]
1644fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
1645    let arr = match raw {
1646        Value::Array(arr) => arr,
1647        _ => {
1648            return Err(SdkError::from(ScriptError::Parse {
1649                fcall: "parse_fail_result".into(),
1650                execution_id: None,
1651                message: "ff_fail_execution: expected Array".into(),
1652            }));
1653        }
1654    };
1655
1656    let status_code = match arr.first() {
1657        Some(Ok(Value::Int(n))) => *n,
1658        _ => {
1659            return Err(SdkError::from(ScriptError::Parse {
1660                fcall: "parse_fail_result".into(),
1661                execution_id: None,
1662                message: "ff_fail_execution: bad status code".into(),
1663            }));
1664        }
1665    };
1666
1667    if status_code != 1 {
1668        let err_field_str = |idx: usize| -> String {
1669            arr.get(idx)
1670                .and_then(|v| match v {
1671                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1672                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1673                    _ => None,
1674                })
1675                .unwrap_or_default()
1676        };
1677        let error_code = {
1678            let s = err_field_str(1);
1679            if s.is_empty() { "unknown".to_owned() } else { s }
1680        };
1681        let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
1682        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1683        return Err(SdkError::from(
1684            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
1685                ScriptError::Parse {
1686                    fcall: "parse_fail_result".into(),
1687                    execution_id: None,
1688                    message: format!("ff_fail_execution: {error_code}"),
1689                }
1690            }),
1691        ));
1692    }
1693
1694    // Parse sub-status from field[2] (index 2 = first field after status+OK)
1695    let sub_status = arr
1696        .get(2)
1697        .and_then(|v| match v {
1698            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1699            Ok(Value::SimpleString(s)) => Some(s.clone()),
1700            _ => None,
1701        })
1702        .unwrap_or_default();
1703
1704    match sub_status.as_str() {
1705        "retry_scheduled" => {
1706            // Lua returns: ok("retry_scheduled", tostring(delay_until))
1707            // arr[3] = delay_until
1708            let delay_str = arr
1709                .get(3)
1710                .and_then(|v| match v {
1711                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1712                    Ok(Value::Int(n)) => Some(n.to_string()),
1713                    _ => None,
1714                })
1715                .unwrap_or_default();
1716            let delay_until = delay_str.parse::<i64>().unwrap_or(0);
1717
1718            Ok(FailOutcome::RetryScheduled {
1719                delay_until: TimestampMs::from_millis(delay_until),
1720            })
1721        }
1722        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
1723        _ => Err(SdkError::from(ScriptError::Parse {
1724            fcall: "parse_fail_result".into(),
1725            execution_id: None,
1726            message: format!(
1727            "ff_fail_execution: unexpected sub-status: {sub_status}"
1728        ),
1729        })),
1730    }
1731}
1732
// ── Stream read / tail (consumer API, RFC-006 #2) ──

/// Maximum tail block duration accepted by [`tail_stream`] (30 seconds).
/// Mirrors the REST endpoint ceiling so SDK callers can't wedge a
/// connection longer than the server would accept.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;

/// Maximum frames per read/tail call. Mirrors
/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
/// callers don't need to import ff-core just to read the bound.
pub use ff_core::contracts::STREAM_READ_HARD_CAP;

/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
/// signal (`closed_at` / `closed_reason`) so polling consumers can exit
/// cleanly.
///
/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
pub use ff_core::contracts::StreamFrames;

/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
/// `StreamCursor::Start` / `StreamCursor::End` instead.
pub use ff_core::contracts::StreamCursor;
1756
1757/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
1758/// — XREAD does not accept the open markers. Pulled out as a bare
1759/// function so unit tests can exercise the guard without constructing a
1760/// live `ferriskey::Client`.
1761fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
1762    if !after.is_concrete() {
1763        return Err(SdkError::Config {
1764            context: "tail_stream".into(),
1765            field: Some("after".into()),
1766            message: "XREAD cursor must be a concrete entry id; pass \
1767                      StreamCursor::from_beginning() to start from the \
1768                      beginning"
1769                .into(),
1770        });
1771    }
1772    Ok(())
1773}
1774
1775fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
1776    if count_limit == 0 {
1777        return Err(SdkError::Config {
1778            context: "read_stream_frames".into(),
1779            field: Some("count_limit".into()),
1780            message: "count_limit must be >= 1".into(),
1781        });
1782    }
1783    if count_limit > STREAM_READ_HARD_CAP {
1784        return Err(SdkError::Config {
1785            context: "read_stream_frames".into(),
1786            field: Some("count_limit".into()),
1787            message: format!(
1788                "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
1789            ),
1790        });
1791    }
1792    Ok(())
1793}
1794
1795/// Read frames from a completed or in-flight attempt's stream.
1796///
1797/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
1798/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1799/// `StreamCursor::At("<id>")` reads from a concrete entry id.
1800/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
1801/// `0` returns [`SdkError::Config`].
1802///
1803/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
1804/// consumers know when the producer has finalized the stream. A
1805/// never-written attempt and an in-progress stream are indistinguishable
1806/// here — both present as `frames=[]`, `closed_at=None`.
1807///
1808/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
1809/// client but are not the lease-holding worker — no lease check is
1810/// performed.
1811///
1812/// # Head-of-line note
1813///
1814/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
1815/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
1816/// calling this on a `client` that is also serving FCALLs stalls those
1817/// FCALLs behind the reply. The REST server isolates reads on its
1818/// `tail_client`; direct SDK callers should either use a dedicated
1819/// client OR paginate through smaller `count_limit` slices.
1820pub async fn read_stream(
1821    backend: &dyn EngineBackend,
1822    execution_id: &ExecutionId,
1823    attempt_index: AttemptIndex,
1824    from: StreamCursor,
1825    to: StreamCursor,
1826    count_limit: u64,
1827) -> Result<StreamFrames, SdkError> {
1828    validate_stream_read_count(count_limit)?;
1829    Ok(backend
1830        .read_stream(execution_id, attempt_index, from, to, count_limit)
1831        .await?)
1832}
1833
1834/// Tail a live attempt's stream.
1835///
1836/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
1837/// with id strictly greater than `after`. Pass
1838/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
1839/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
1840/// REJECTED at this boundary because XREAD does not accept `-` / `+`
1841/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
1842///
1843/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
1844/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
1845/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
1846/// SDK and REST ceilings aligned.
1847///
1848/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
1849/// polling consumers should loop until `result.is_closed()` is true, then
1850/// drain and exit. Timeout with no new frames presents as
1851/// `frames=[], closed_at=None`.
1852///
1853/// # Head-of-line warning — use a dedicated client
1854///
1855/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
1856/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
1857/// the read side until a frame arrives or the block elapses. If the
1858/// `client` you pass here is ALSO used for claims, completes, fails,
1859/// appends, or any other FCALL, a 30-second tail will stall all those
1860/// calls for up to 30 seconds.
1861///
1862/// **Strongly recommended**: build a separate `ferriskey::Client` for
1863/// tail callers — mirrors the `Server::tail_client` split that the REST
1864/// server uses internally (see `crates/ff-server/src/server.rs` and
1865/// RFC-006 Impl Notes §"Dedicated stream-op connection").
1866///
1867/// # Tail parallelism caveat (same mux)
1868///
1869/// Even a dedicated tail client is still one multiplexed TCP connection.
1870/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
1871/// ferriskey's per-call `request_timeout` starts at future-poll — so
1872/// two concurrent tails against the same client can time out spuriously:
1873/// the second call's BLOCK budget elapses while it waits for the first
1874/// BLOCK to return. The REST server handles this internally with a
1875/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
1876/// each call its full `block_ms` budget at the server.
1877///
1878/// **Direct SDK callers that need concurrent tails**: either
1879///   (1) build ONE `ferriskey::Client` per concurrent tail call (a small
1880///       pool of clients, rotated by the caller), OR
1881///   (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
1882///       only one BLOCK is in flight per client at a time.
1883/// If you need the REST-side backpressure (429 on contention) and the
1884/// built-in serializer, go through the
1885/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
1886/// than calling this directly.
1887///
1888/// This SDK does not enforce either pattern — the mutex belongs at the
1889/// application layer, and the connection pool belongs at the SDK
1890/// caller's DI layer; neither has a structured place inside this
1891/// helper.
1892///
1893/// # Timeout handling
1894///
1895/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
1896/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
1897/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
1898/// 500ms)`, overriding the client's default per-call. A tail with
1899/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
1900/// the client was built with a shorter `request_timeout`. No custom client
1901/// configuration is required for timeout reasons — only for head-of-line
1902/// isolation above.
1903pub async fn tail_stream(
1904    backend: &dyn EngineBackend,
1905    execution_id: &ExecutionId,
1906    attempt_index: AttemptIndex,
1907    after: StreamCursor,
1908    block_ms: u64,
1909    count_limit: u64,
1910) -> Result<StreamFrames, SdkError> {
1911    if block_ms > MAX_TAIL_BLOCK_MS {
1912        return Err(SdkError::Config {
1913            context: "tail_stream".into(),
1914            field: Some("block_ms".into()),
1915            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
1916        });
1917    }
1918    validate_stream_read_count(count_limit)?;
1919    // XREAD does not accept `-` / `+` markers as cursors — reject at
1920    // the SDK boundary with `SdkError::Config` rather than forwarding
1921    // an invalid `-`/`+` into the backend (which would surface as an
1922    // opaque `EngineError::Transport`).
1923    validate_tail_cursor(&after)?;
1924
1925    Ok(backend
1926        .tail_stream(execution_id, attempt_index, after, block_ms, count_limit)
1927        .await?)
1928}
1929
#[cfg(test)]
mod tail_stream_boundary_tests {
    use super::*;

    // `validate_tail_cursor` is the pre-client guard: `Start` / `End`
    // must bounce before `tail_stream` ever touches the client, the
    // same way the `count_limit` guard does. The matching full-path
    // rejection on the REST layer is covered by `ff-server::api`.

    #[test]
    fn rejects_start_cursor() {
        let rejection = validate_tail_cursor(&StreamCursor::Start)
            .expect_err("Start must be rejected");
        match rejection {
            SdkError::Config { context, field, .. } => {
                assert_eq!(context, "tail_stream");
                assert_eq!(field.as_deref(), Some("after"));
            }
            other => panic!("expected SdkError::Config, got {other:?}"),
        }
    }

    #[test]
    fn rejects_end_cursor() {
        assert!(matches!(
            validate_tail_cursor(&StreamCursor::End),
            Err(SdkError::Config { .. })
        ));
    }

    #[test]
    fn accepts_at_cursor() {
        for cursor in [
            StreamCursor::At("0-0".into()),
            StreamCursor::from_beginning(),
            StreamCursor::At("123-0".into()),
        ] {
            validate_tail_cursor(&cursor).expect("concrete cursor must be accepted");
        }
    }
}
1970
#[cfg(test)]
mod parse_report_usage_result_tests {
    use super::*;

    /// Build a `Value::SimpleString` from a `&str`. The parser's
    /// `usage_field_str` handles BulkString and SimpleString uniformly
    /// (`Value::BulkString(b)` → `String::from_utf8_lossy`,
    /// `Value::SimpleString(s)` → clone), so SimpleString suffices here
    /// and spares the test harness a `bytes` dev-dependency just for
    /// BulkString construction.
    fn simple(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }

    fn num(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }

    fn reply(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }

    #[test]
    fn ok_status() {
        let raw = reply(vec![num(1), simple("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let raw = reply(vec![num(1), simple("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }

    #[test]
    fn soft_breach_status() {
        let raw = reply(vec![
            num(1),
            simple("SOFT_BREACH"),
            simple("tokens"),
            simple("150"),
            simple("100"),
        ]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            unexpected => panic!("expected SoftBreach, got {unexpected:?}"),
        }
    }

    #[test]
    fn hard_breach_status() {
        let raw = reply(vec![
            num(1),
            simple("HARD_BREACH"),
            simple("requests"),
            simple("10001"),
            simple("10000"),
        ]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            unexpected => panic!("expected HardBreach, got {unexpected:?}"),
        }
    }

    /// Negative case: non-Array input. Guards against a future Lua
    /// refactor that accidentally returns a bare string/int — the
    /// parser must fail loudly rather than silently succeed or panic.
    #[test]
    fn non_array_input_is_parse_error() {
        let failure =
            parse_report_usage_result(&Value::SimpleString("OK".to_owned())).unwrap_err();
        let rendered = format!("{failure}");
        assert!(
            rendered.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {rendered}"
        );
    }

    /// Negative case: Array whose first element isn't an Int status
    /// code. The Lua function's first return slot is always
    /// `status_code` (1 on success, an error code otherwise); a non-Int
    /// there is a wire-format break that must surface as a parse error.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = reply(vec![simple("not_an_int"), simple("OK")]);
        let rendered = format!("{}", parse_report_usage_result(&raw).unwrap_err());
        assert!(
            rendered.to_lowercase().contains("int"),
            "error should mention Int status code, got: {rendered}"
        );
    }

    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
    /// field. Guards the silent-coercion defect cross-review caught:
    /// the old parser used `.unwrap_or(0)` on numeric fields, which
    /// would have surfaced Lua-side wire-format drift as a
    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
    /// but semantically wrong (a "breach with zero usage" is nonsense
    /// and masks the real error).
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = reply(vec![
            num(1),
            simple("SOFT_BREACH"),
            simple("tokens"),
            simple("not_a_number"), // current_usage — must fail, not coerce to 0
            simple("100"),
        ]);
        let rendered = format!("{}", parse_report_usage_result(&raw).unwrap_err());
        assert!(
            rendered.contains("SOFT_BREACH") && rendered.contains("current_usage"),
            "error should identify sub-status + field, got: {rendered}"
        );
        assert!(
            rendered.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {rendered}"
        );
    }

    /// Negative case: HARD_BREACH with the limit slot missing entirely.
    /// Same defence as the non-numeric test above: a truncated response
    /// must fail loudly rather than coerce to 0.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = reply(vec![
            num(1),
            simple("HARD_BREACH"),
            simple("requests"),
            simple("10001"),
            // no index 4 — hard_limit missing
        ]);
        let rendered = format!("{}", parse_report_usage_result(&raw).unwrap_err());
        assert!(
            rendered.contains("HARD_BREACH") && rendered.contains("hard_limit"),
            "error should identify sub-status + field, got: {rendered}"
        );
        assert!(
            rendered.to_lowercase().contains("missing"),
            "error should say 'missing', got: {rendered}"
        );
    }
}
2114
2115
#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    /// Build a suspension-record hash from `(field, value)` pairs.
    fn fields(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(k, v)| (k.to_owned(), v.to_owned()))
            .collect()
    }

    #[test]
    fn empty_suspension_returns_none() {
        let record = fields(&[]);
        let resolved =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(0)).unwrap();
        assert!(resolved.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let record = fields(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        // The claimed attempt is 1 but the suspension belongs to 0 → stale.
        let resolved =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(1)).unwrap();
        assert!(resolved.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let record = fields(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp.to_string()),
            ]);
            let resolved =
                resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(0)).unwrap();
            assert!(resolved.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let record = fields(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        assert_eq!(
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(2)).unwrap(),
            Some(wp)
        );
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let record = fields(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let failure =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(0)).unwrap_err();
        let rendered = format!("{failure}");
        assert!(
            rendered.contains("not a valid UUID"),
            "error should mention invalid UUID, got: {rendered}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: an empty waitpoint_id field (shouldn't happen on
        // resumed records, but guard against partial writes) is None,
        // not an error.
        let record = fields(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let resolved =
            resume_waitpoint_id_from_suspension(&record, AttemptIndex::new(0)).unwrap();
        assert!(resolved.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was
    // removed when the production path switched from unbounded
    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
    // feedback on unbounded condition-hash reply size). That loop
    // is exercised by the integration tests in `ff-test`.
}
2203
#[cfg(test)]
mod terminal_replay_parsing_tests {
    //! Unit tests for the SDK's parse path of the enriched
    //! `execution_not_active` error returned on a terminal-op replay.
    //! The integration test in ff-test/tests/e2e_lifecycle.rs proves
    //! the Lua side emits the 4-slot detail; these tests prove the
    //! Rust parser threads all 4 slots into the `ExecutionNotActive`
    //! variant so the reconciler in complete()/fail()/cancel() can
    //! match on them.

    use super::*;
    use ferriskey::Value;

    // Use SimpleString rather than BulkString to avoid depending on bytes::Bytes
    // in the test harness. parse_success_result + parse_fail_result handle both.
    // (Named `simple` for what it builds; previously misleadingly named `bulk`.)
    fn simple(s: &str) -> Value {
        Value::SimpleString(s.to_owned())
    }

    /// Unwrap `err` down to the four `ExecutionNotActive` detail slots and
    /// assert each one against `expected`, in slot order:
    /// `[terminal_outcome, lease_epoch, lifecycle_phase, attempt_id]`.
    /// Panics (failing the calling test) on any other error shape.
    fn assert_not_active_details(err: SdkError, expected: [&str; 4]) {
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(crate::ContentionKind::ExecutionNotActive {
                terminal_outcome,
                lease_epoch,
                lifecycle_phase,
                attempt_id,
            }) => {
                assert_eq!(terminal_outcome, expected[0]);
                assert_eq!(lease_epoch, expected[1]);
                assert_eq!(lifecycle_phase, expected[2]);
                assert_eq!(attempt_id, expected[3]);
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(simple("execution_not_active")),
            Ok(simple("success")),
            Ok(simple("42")),
            Ok(simple("terminal")),
            Ok(simple("11111111-1111-1111-1111-111111111111")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        assert_not_active_details(
            err,
            ["success", "42", "terminal", "11111111-1111-1111-1111-111111111111"],
        );
    }

    /// parse_fail_result must extract all detail slots too so the
    /// reconciler in fail() can match lifecycle_phase = "runnable" for
    /// retry-scheduled replays.
    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(simple("execution_not_active")),
            Ok(simple("none")),
            Ok(simple("7")),
            Ok(simple("runnable")),
            Ok(simple("22222222-2222-2222-2222-222222222222")),
        ]);
        let err = parse_fail_result(&raw).unwrap_err();
        assert_not_active_details(
            err,
            ["none", "7", "runnable", "22222222-2222-2222-2222-222222222222"],
        );
    }

    /// Empty detail slots must default to "" (not panic) so older-Lua
    /// producers or malformed replies degrade to an unreconcilable
    /// variant rather than a Parse error.
    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(simple("execution_not_active")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        assert_not_active_details(err, ["", "", "", ""]);
    }
}