
ff_sdk/task.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use ferriskey::{Client, Value};
7use ff_core::backend::{Handle, HandleKind};
8use ff_core::contracts::ReportUsageResult;
9use ff_core::engine_backend::EngineBackend;
10use ff_script::error::ScriptError;
11use ff_core::keys::{ExecKeyContext, IndexKeys};
12use ff_core::partition::{execution_partition, PartitionConfig};
13use ff_core::types::*;
14use tokio::sync::{Notify, OwnedSemaphorePermit};
15use tokio::task::JoinHandle;
16
17use crate::SdkError;
18
19// ── Phase 3: Suspend/Signal types ──
20
21/// Timeout behavior when a suspension deadline is reached.
22#[derive(Clone, Debug, PartialEq, Eq)]
23pub enum TimeoutBehavior {
24    Fail,
25    Cancel,
26    Expire,
27    AutoResume,
28    Escalate,
29}
30
31impl TimeoutBehavior {
32    pub fn as_str(&self) -> &str {
33        match self {
34            Self::Fail => "fail",
35            Self::Cancel => "cancel",
36            Self::Expire => "expire",
37            Self::AutoResume => "auto_resume_with_timeout_signal",
38            Self::Escalate => "escalate",
39        }
40    }
41}
42
43impl std::fmt::Display for TimeoutBehavior {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.write_str(self.as_str())
46    }
47}
48
49impl std::str::FromStr for TimeoutBehavior {
50    type Err = String;
51
52    fn from_str(s: &str) -> Result<Self, Self::Err> {
53        match s {
54            "fail" => Ok(Self::Fail),
55            "cancel" => Ok(Self::Cancel),
56            "expire" => Ok(Self::Expire),
57            "auto_resume_with_timeout_signal" | "auto_resume" => Ok(Self::AutoResume),
58            "escalate" => Ok(Self::Escalate),
59            other => Err(format!("unknown timeout behavior: {other}")),
60        }
61    }
62}
63
64/// A condition matcher for the resume condition.
65#[derive(Clone, Debug)]
66pub struct ConditionMatcher {
67    /// Signal name to match (empty = wildcard).
68    pub signal_name: String,
69}
70
71/// Outcome of a `suspend()` call.
72#[derive(Clone, Debug, PartialEq, Eq)]
73pub enum SuspendOutcome {
74    /// Execution is now suspended, waitpoint active.
75    Suspended {
76        suspension_id: SuspensionId,
77        waitpoint_id: WaitpointId,
78        waitpoint_key: String,
79        /// HMAC token that signal deliverers must supply to this waitpoint.
80        waitpoint_token: WaitpointToken,
81    },
82    /// Buffered signals already satisfied the condition — suspension skipped.
83    /// The caller still holds the lease.
84    AlreadySatisfied {
85        suspension_id: SuspensionId,
86        waitpoint_id: WaitpointId,
87        waitpoint_key: String,
88        waitpoint_token: WaitpointToken,
89    },
90}
91
92/// A signal to deliver to a suspended execution's waitpoint.
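///
/// # Example
///
/// A sketch of building a `Signal` for delivery. The delivering call itself
/// (`ff_deliver_signal` / the worker-side `deliver_signal`, see
/// [`SignalOutcome`]) is not shown here; the field values and the
/// `token_issued_at_suspend` variable are illustrative.
///
/// ```ignore
/// let signal = Signal {
///     signal_name: "approval_granted".into(),
///     signal_category: "human".into(),
///     payload: Some(br#"{"approved_by":"alice"}"#.to_vec()),
///     source_type: "operator_ui".into(),
///     source_identity: "alice".into(),
///     idempotency_key: Some("approval-order-42".into()),
///     // Must be the HMAC token issued when the waitpoint was created.
///     waitpoint_token: token_issued_at_suspend,
/// };
/// ```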
93#[derive(Clone, Debug)]
94pub struct Signal {
95    pub signal_name: String,
96    pub signal_category: String,
97    pub payload: Option<Vec<u8>>,
98    pub source_type: String,
99    pub source_identity: String,
100    pub idempotency_key: Option<String>,
101    /// HMAC token issued when the waitpoint was created. Required for
102    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
103    pub waitpoint_token: WaitpointToken,
104}
105
106/// Outcome of `deliver_signal()`.
107#[derive(Clone, Debug, PartialEq, Eq)]
108pub enum SignalOutcome {
109    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
110    Accepted { signal_id: SignalId, effect: String },
111    /// Signal triggered resume — execution is now runnable.
112    TriggeredResume { signal_id: SignalId },
113    /// Duplicate signal (idempotency key matched).
114    Duplicate { existing_signal_id: String },
115}
116
117impl SignalOutcome {
118    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
119    ///
120    /// Consuming packages that call `ff_deliver_signal` directly can use this
121    /// to interpret the Lua return value without depending on SDK internals.
122    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
123        parse_signal_result(raw)
124    }
125}
126
127/// A signal that triggered a resume, readable by a worker after re-claim.
128///
129/// **RFC-012 Stage 0:** the canonical definition has moved to
130/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
131/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
132/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
133/// scheduled for removal in 0.5.0.
134pub use ff_core::backend::ResumeSignal;
135
136/// Outcome of `append_frame()`.
137///
138/// **RFC-012 §R7.2.1:** canonical definition moved to
139/// [`ff_core::backend::AppendFrameOutcome`]; this `pub use` shim
140/// preserves the `ff_sdk::task::AppendFrameOutcome` path through the
141/// 0.4.x window. Existing consumers construct + match exhaustively
142/// without change. The derive set widened from `Clone, Debug` to
143/// `Clone, Debug, PartialEq, Eq` matching the `FailOutcome` precedent.
144pub use ff_core::backend::AppendFrameOutcome;
145
146/// Outcome of a `fail()` call.
147///
148/// **RFC-012 Stage 1a:** canonical definition moved to
149/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
150/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
151/// re-export in `lib.rs`) through the 0.4.x window. Existing
152/// consumers construct + match exhaustively without change.
153pub use ff_core::backend::FailOutcome;
154
155/// A claimed execution with an active lease. The worker processes this task
156/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
157///
158/// The lease is automatically renewed in the background at `lease_ttl / 3`
159/// intervals. Renewal stops when the task is consumed or dropped.
160///
161/// `complete`, `fail`, and `cancel` consume `self` — this prevents
162/// double-complete bugs at the type level.
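///
/// # Example
///
/// A minimal sketch of the claim, process, complete lifecycle. The worker
/// setup and `claim_next` call are illustrative only (the exact
/// `FlowFabricWorker` signatures live on that type); every `ClaimedTask`
/// method shown matches this file.
///
/// ```ignore
/// // Hypothetical worker setup; see `FlowFabricWorker` for the real API.
/// let worker = FlowFabricWorker::connect(config).await?;
///
/// if let Some(task) = worker.claim_next().await? {
///     // Pre-read metadata: no extra round-trip to Valkey.
///     let kind = task.execution_kind();
///     let input = task.input_payload();
///
///     // Scalar heartbeat (overwrites the previous value; not a stream frame).
///     task.update_progress(50, "halfway").await?;
///
///     // Check renewal health before committing an expensive side effect.
///     if task.is_lease_healthy() {
///         // Consumes the task, so a double-complete cannot compile.
///         task.complete(Some(b"result".to_vec())).await?;
///     } else {
///         task.fail("lease renewal failing", "lease_stale").await?;
///     }
/// }
/// ```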
163pub struct ClaimedTask {
164    /// Shared Valkey client.
165    client: Client,
166    /// `EngineBackend` the trait-migrated ops forward through.
167    ///
168    /// **RFC-012 Stage 1b + Round-7.** Today this is always a
169    /// `ValkeyBackend` wrapping `client`. Most per-task ops
170    /// (`complete`/`fail`/`cancel`/`delay_execution`/
171    /// `move_to_waiting_children`/`update_progress`/`resume_signals`/
172    /// `create_pending_waitpoint`/`append_frame`/`report_usage`) route
173    /// through `backend.*()`. `suspend` still uses
174    /// `client.fcall("ff_suspend_execution", ...)` directly — this is
175    /// the deferred suspend per RFC-012 §R7.6.1, pending Stage 1d
176    /// input-shape work. Lease renewal goes through
177    /// `backend.renew(&handle)`. Round-7 (#135/#145) closed the four
178    /// trait-shape gaps tracked by #117.
179    ///
180    /// Stage 1c migrates the FlowFabricWorker hot paths (claim,
181    /// deliver_signal) through the same trait surface; Stage 1d
182    /// refactors this struct to carry a single `Handle` rather than
183    /// synthesising one per op via `synth_handle`.
184    backend: Arc<dyn EngineBackend>,
185    /// Partition config for key construction.
186    partition_config: PartitionConfig,
187    /// Execution identity.
188    execution_id: ExecutionId,
189    /// Current attempt.
190    attempt_index: AttemptIndex,
191    attempt_id: AttemptId,
192    /// Lease identity.
193    lease_id: LeaseId,
194    lease_epoch: LeaseEpoch,
195    /// Lease timing.
196    lease_ttl_ms: u64,
197    /// Lane used at claim time.
198    lane_id: LaneId,
199    /// Worker instance that holds this lease (for index cleanup keys).
200    worker_instance_id: WorkerInstanceId,
201    /// Execution data.
202    input_payload: Vec<u8>,
203    execution_kind: String,
204    tags: HashMap<String, String>,
205    /// Background renewal task handle.
206    renewal_handle: JoinHandle<()>,
207    /// Signal to stop renewal (used before consuming self).
208    renewal_stop: Arc<Notify>,
209    /// Consecutive lease renewal failures. Shared with the background renewal
210    /// task. Reset to 0 on each successful renewal. Workers should check
211    /// `is_lease_healthy()` before committing expensive side effects.
212    renewal_failures: Arc<AtomicU32>,
213    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
214    /// response is received, just before `self` is consumed into `Drop`.
215    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
216    /// suppress the false-positive "dropped without terminal operation"
217    /// warning: after `notify_one`, the renewal task has not yet been
218    /// polled by the runtime, so `is_finished()` is still `false` on the
219    /// happy path when self is being consumed.
220    ///
221    /// Note: the flag is set for any terminal-op path that reaches
222    /// `stop_renewal()`, which includes Lua-level script errors (the
223    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
224    /// the caller already receives the `Err` via the op's return value,
225    /// so an additional `Drop` warning would be noise. The warning is
226    /// reserved for genuine drop-without-terminal-op cases (panic, early
227    /// return, transport failure before stop_renewal ran).
228    terminal_op_called: AtomicBool,
229    /// Concurrency permit from the worker's semaphore. Held for the lifetime
230    /// of the task; released on complete/fail/cancel/drop.
231    _concurrency_permit: Option<OwnedSemaphorePermit>,
232}
233
234impl ClaimedTask {
235    /// Construct a `ClaimedTask` from the results of a successful
236    /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
237    ///
238    /// # Arguments
239    ///
240    /// * `client` — shared Valkey client used for subsequent lease
241    ///   renewals, signal delivery, and the final
242    ///   complete/fail/cancel FCALL.
243    /// * `partition_config` — partition topology snapshot read at
244    ///   `FlowFabricWorker::connect`. Used for key construction on
245    ///   the lifetime of this task.
246    /// * `execution_id` — the claimed execution's UUID.
247    /// * `attempt_index` / `attempt_id` — current attempt identity.
248    ///   `attempt_index` is 0 on a fresh claim, preserved on a
249    ///   resumed claim.
250    /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
251    ///   is returned by the Lua FCALL and bumped on each resumed
252    ///   claim.
253    /// * `lease_ttl_ms` — lease TTL used to schedule the background
254    ///   renewal task (renews at `lease_ttl_ms / 3`).
255    /// * `lane_id` — the lane the task was claimed on. Used for
256    ///   index key construction (`lane_active`, `lease_expiry`,
257    ///   etc.).
258    /// * `worker_instance_id` — this worker's identity. Used to
259    ///   resolve `worker_leases` index entries during renewal and
260    ///   completion.
261    /// * `input_payload` / `execution_kind` / `tags` — pre-read
262    ///   execution metadata, exposed via getters so the worker
263    ///   doesn't round-trip to Valkey to inspect what it just
264    ///   claimed.
265    ///
266    /// # Invariant: constructor is `pub(crate)` on purpose
267    ///
268    /// **External callers cannot construct a `ClaimedTask`** — only
269    /// the in-crate claim entry points (`claim_next`,
270    /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
271    /// load-bearing: those entry points are the ONLY sites that
272    /// acquire a permit from the worker's concurrency semaphore
273    /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
274    /// it through `ClaimedTask::set_concurrency_permit` before
275    /// returning the task. Promoting `new` to `pub` would let
276    /// consumers build tasks that bypass the concurrency contract
277    /// the worker's `max_concurrent_tasks` config advertises — the
278    /// returned task would run alongside other in-flight work
279    /// without debiting the permit bank, and the
280    /// complete/fail/cancel/drop path would have nothing to
281    /// release.
282    ///
283    /// If an external callsite genuinely needs to rehydrate a
284    /// task from saved FCALL results, the right answer is a new
285    /// `FlowFabricWorker` entry point that wraps `new` + acquires a
286    /// permit — not promoting this constructor.
287    #[allow(clippy::too_many_arguments)]
288    pub(crate) fn new(
289        client: Client,
290        backend: Arc<dyn EngineBackend>,
291        partition_config: PartitionConfig,
292        execution_id: ExecutionId,
293        attempt_index: AttemptIndex,
294        attempt_id: AttemptId,
295        lease_id: LeaseId,
296        lease_epoch: LeaseEpoch,
297        lease_ttl_ms: u64,
298        lane_id: LaneId,
299        worker_instance_id: WorkerInstanceId,
300        input_payload: Vec<u8>,
301        execution_kind: String,
302        tags: HashMap<String, String>,
303    ) -> Self {
304        let renewal_stop = Arc::new(Notify::new());
305        let renewal_failures = Arc::new(AtomicU32::new(0));
306
307        // Stage 1b: the renewal task forwards through `backend.renew(&handle)`.
308        // Build the handle once at construction time and hand both Arc clones
309        // into the spawned task.
310        let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
311            execution_id.clone(),
312            attempt_index,
313            attempt_id.clone(),
314            lease_id.clone(),
315            lease_epoch,
316            lease_ttl_ms,
317            lane_id.clone(),
318            worker_instance_id.clone(),
319            HandleKind::Fresh,
320        );
321        let renewal_handle = spawn_renewal_task(
322            backend.clone(),
323            renewal_handle_cookie,
324            execution_id.clone(),
325            lease_ttl_ms,
326            renewal_stop.clone(),
327            renewal_failures.clone(),
328        );
329
330        Self {
331            client,
332            backend,
333            partition_config,
334            execution_id,
335            attempt_index,
336            attempt_id,
337            lease_id,
338            lease_epoch,
339            lease_ttl_ms,
340            lane_id,
341            worker_instance_id,
342            input_payload,
343            execution_kind,
344            tags,
345            renewal_handle,
346            renewal_stop,
347            renewal_failures,
348            terminal_op_called: AtomicBool::new(false),
349            _concurrency_permit: None,
350        }
351    }
352
353    /// Attach a concurrency permit from the worker's semaphore.
354    /// The permit is held for the task's lifetime and released on drop.
355    #[allow(dead_code)]
356    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
357        self._concurrency_permit = Some(permit);
358    }
359
360    // ── Accessors ──
361
362    pub fn execution_id(&self) -> &ExecutionId {
363        &self.execution_id
364    }
365
366    pub fn attempt_index(&self) -> AttemptIndex {
367        self.attempt_index
368    }
369
370    pub fn attempt_id(&self) -> &AttemptId {
371        &self.attempt_id
372    }
373
374    pub fn lease_id(&self) -> &LeaseId {
375        &self.lease_id
376    }
377
378    pub fn lease_epoch(&self) -> LeaseEpoch {
379        self.lease_epoch
380    }
381
382    pub fn input_payload(&self) -> &[u8] {
383        &self.input_payload
384    }
385
386    pub fn execution_kind(&self) -> &str {
387        &self.execution_kind
388    }
389
390    pub fn tags(&self) -> &HashMap<String, String> {
391        &self.tags
392    }
393
394    pub fn lane_id(&self) -> &LaneId {
395        &self.lane_id
396    }
397
398    /// Read frames from this task's attempt stream.
399    ///
400    /// Forwards through the task's `Arc<dyn EngineBackend>` — consumers
401    /// no longer need the `ferriskey::Client` leak (#87). See
402    /// [`EngineBackend::read_stream`](ff_core::engine_backend::EngineBackend::read_stream)
403    /// for the backend-level contract.
404    ///
405    /// Validation (count_limit bounds) runs at this boundary — out-of-range
406    /// input surfaces as [`SdkError::Config`] before reaching the backend.
407    pub async fn read_stream(
408        &self,
409        from: StreamCursor,
410        to: StreamCursor,
411        count_limit: u64,
412    ) -> Result<StreamFrames, SdkError> {
413        validate_stream_read_count(count_limit)?;
414        Ok(self
415            .backend
416            .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
417            .await?)
418    }
419
420    /// Tail this task's attempt stream for new frames.
421    ///
422    /// See [`EngineBackend::tail_stream`](ff_core::engine_backend::EngineBackend::tail_stream)
423    /// for the head-of-line warning — the task's backend is shared
424    /// with claim/complete/fail hot paths. Consumers that need
425    /// isolation should build a dedicated `EngineBackend` for tail
426    /// reads.
427    pub async fn tail_stream(
428        &self,
429        after: StreamCursor,
430        block_ms: u64,
431        count_limit: u64,
432    ) -> Result<StreamFrames, SdkError> {
433        if block_ms > MAX_TAIL_BLOCK_MS {
434            return Err(SdkError::Config {
435                context: "tail_stream".into(),
436                field: Some("block_ms".into()),
437                message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
438            });
439        }
440        validate_stream_read_count(count_limit)?;
441        validate_tail_cursor(&after)?;
442        Ok(self
443            .backend
444            .tail_stream(
445                &self.execution_id,
446                self.attempt_index,
447                after,
448                block_ms,
449                count_limit,
450            )
451            .await?)
452    }
453
454    /// Synthesise a Valkey-backend `Handle` encoding this task's
455    /// attempt-cookie fields. Private to the SDK — the trait
456    /// forwarders call this per op to produce the `&Handle` argument
457    /// the `EngineBackend` methods take.
458    ///
459    /// **RFC-012 Stage 1b option A.** Refactoring `ClaimedTask` to
460    /// carry one cached `Handle` instead of synthesising per op is
461    /// deferred to Stage 1d (issue #89 follow-up). The per-op cost is
462    /// a single allocation of a small byte buffer — negligible
463    /// compared to the FCALL round-trip that follows.
464    ///
465    /// Kind is always `HandleKind::Fresh` because today the
466    /// Stage-1b ops all treat the backend tag + opaque bytes as
467    /// authoritative; they do not dispatch on kind. Stage 1d routes
468    /// resumed-claim callers through a `Resumed` synth.
469    fn synth_handle(&self) -> Handle {
470        ff_backend_valkey::ValkeyBackend::encode_handle(
471            self.execution_id.clone(),
472            self.attempt_index,
473            self.attempt_id.clone(),
474            self.lease_id.clone(),
475            self.lease_epoch,
476            self.lease_ttl_ms,
477            self.lane_id.clone(),
478            self.worker_instance_id.clone(),
479            HandleKind::Fresh,
480        )
481    }
482
483    /// Check if the lease is likely still valid based on renewal success.
484    ///
485    /// Returns `false` if 3 or more consecutive renewal attempts have failed.
486    /// Workers should check this before committing expensive or irreversible
487    /// side effects. A `false` return means Valkey may have already expired
488    /// the lease and another worker could be processing this execution.
489    pub fn is_lease_healthy(&self) -> bool {
490        self.renewal_failures.load(Ordering::Relaxed) < 3
491    }
492
493    /// Number of consecutive lease renewal failures since the last success.
494    ///
495    /// Returns 0 when renewals are working normally. Useful for observability
496    /// and custom health policies beyond the default threshold of 3.
497    pub fn consecutive_renewal_failures(&self) -> u32 {
498        self.renewal_failures.load(Ordering::Relaxed)
499    }
500
501    // ── Terminal operations (consume self) ──
502
503    /// Delay the execution until `delay_until`.
504    ///
505    /// Releases the lease. The execution moves to `delayed` state.
506    /// Consumes self — the task cannot be used after delay.
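    ///
    /// # Example
    ///
    /// A sketch of deferring the execution by 30 seconds. The timestamp
    /// arithmetic mirrors what `suspend()` does for its timeout below.
    ///
    /// ```ignore
    /// let resume_at = TimestampMs::from_millis(TimestampMs::now().0 + 30_000);
    /// task.delay_execution(resume_at).await?;
    /// ```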
507    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
508        // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
509        // the pre-migration `stop_renewal` contract: called only when
510        // the FCALL round-tripped (Ok or a typed Lua/engine error);
511        // raw transport errors bubble up without stopping renewal so
512        // the `Drop` warning still fires if the caller later drops
513        // `self` without retrying.
514        let handle = self.synth_handle();
515        let out = self.backend.delay(&handle, delay_until).await;
516        if fcall_landed(&out) {
517            self.stop_renewal();
518        }
519        out.map_err(SdkError::from)
520    }
521
522    /// Move execution to waiting_children state.
523    ///
524    /// Releases the lease. The execution waits for child dependencies to complete.
525    /// Consumes self.
526    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
527        // RFC-012 Stage 1b: forwards through `backend.wait_children`.
528        // See `delay_execution` for the stop_renewal contract.
529        let handle = self.synth_handle();
530        let out = self.backend.wait_children(&handle).await;
531        if fcall_landed(&out) {
532            self.stop_renewal();
533        }
534        out.map_err(SdkError::from)
535    }
536
537    /// Complete the execution successfully.
538    ///
539    /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
540    /// Renewal continues during the FCALL to prevent lease expiry under
541    /// network latency — the Lua fences on lease_id+epoch atomically.
542    /// Consumes self — the task cannot be used after completion.
543    ///
544    /// # Connection errors and replay
545    ///
546    /// If the Valkey connection drops after the Lua commit but before the
547    /// client reads the response, retrying `complete()` with the same
548    /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
549    /// reconciles the "already terminal with matching outcome" response
550    /// into `Ok(())`. A retry after a different terminal op has raced in
551    /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
552    /// populated `terminal_outcome` so the caller can see what actually
553    /// happened.
554    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
555        // RFC-012 Stage 1b: forwards through `backend.complete`.
556        // The FCALL body + the `ExecutionNotActive` replay reconciliation
557        // (match same epoch + attempt_id + outcome=="success" → Ok)
558        // moved to `ff_backend_valkey::complete_impl`.
559        let handle = self.synth_handle();
560        let out = self.backend.complete(&handle, result_payload).await;
561        if fcall_landed(&out) {
562            self.stop_renewal();
563        }
564        out.map_err(SdkError::from)
565    }
566
567    /// Fail the execution with a reason and error category.
568    ///
569    /// If the execution policy allows retries, the engine schedules a retry
570    /// (delayed backoff). Otherwise, the execution becomes terminal failed.
571    /// Returns [`FailOutcome`] so the caller knows what happened.
572    ///
573    /// # Connection errors and replay
574    ///
575    /// `fail()` is replay-safe under the same conditions as `complete()`:
576    /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
577    /// reconciled into the outcome the server actually committed
578    /// (`TerminalFailed` if no retries left; `RetryScheduled` with
579    /// `delay_until = 0` if a retry was scheduled — the exact delay is
580    /// not recovered on the replay path).
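    ///
    /// # Example
    ///
    /// A sketch of acting on the returned [`FailOutcome`]. The variant and
    /// field names follow the replay notes above; any other variants fall to
    /// the wildcard arm.
    ///
    /// ```ignore
    /// match task.fail("upstream returned malformed output", "inference_error").await? {
    ///     // Field name per the replay notes above.
    ///     FailOutcome::RetryScheduled { delay_until } => {
    ///         tracing::info!(%delay_until, "engine scheduled a retry");
    ///     }
    ///     FailOutcome::TerminalFailed => {
    ///         tracing::warn!("no retries left; execution is terminally failed");
    ///     }
    ///     _ => {}
    /// }
    /// ```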
581    pub async fn fail(
582        self,
583        reason: &str,
584        error_category: &str,
585    ) -> Result<FailOutcome, SdkError> {
586        // RFC-012 Stage 1b: forwards through `backend.fail`.
587        // The FCALL body + retry-policy read + the two-shape replay
588        // reconciliation (terminal/failed → TerminalFailed, runnable
589        // → RetryScheduled{0}) moved to
590        // `ff_backend_valkey::fail_impl`.
591        //
592        // **Preserving the caller's raw category string.** The
593        // pre-Stage-1b code passed `error_category` straight to the
594        // Lua `failure_category` ARGV. Real callers use arbitrary
595        // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
596        // `"inference_error"`, …) that do not correspond to the 5
597        // named `FailureClass` variants. A lossy enum conversion
598        // would rewrite every non-canonical category to
599        // `Transient` — a real behavior change for
600        // ff-readiness-tests + the examples crates.
601        //
602        // Instead: pack the raw category bytes into
603        // `FailureReason.detail`; `fail_impl` reads them back and
604        // threads them to Lua verbatim. The `classification` enum
605        // is derived from the string for the few consumers who
606        // match on the trait return today (none in-workspace).
607        // Stage 1d (or issue #117) widens `FailureClass` with a
608        // `Custom(String)` arm; this stash carrier retires then.
609        let handle = self.synth_handle();
610        let failure_reason = ff_core::backend::FailureReason::with_detail(
611            reason.to_owned(),
612            error_category.as_bytes().to_vec(),
613        );
614        let classification = error_category_to_class(error_category);
615        let out = self.backend.fail(&handle, failure_reason, classification).await;
616        if fcall_landed(&out) {
617            self.stop_renewal();
618        }
619        out.map_err(SdkError::from)
620    }
621
622    /// Cancel the execution.
623    ///
624    /// Stops lease renewal, then calls `ff_cancel_execution` via FCALL.
625    /// Consumes self.
626    ///
627    /// # Connection errors and replay
628    ///
629    /// `cancel()` is replay-safe under the same conditions as `complete()`:
630    /// a retry by the same caller matching `lease_epoch` + `attempt_id`
631    /// returns `Ok(())` if the server's stored `terminal_outcome` is
632    /// `cancelled`. A retry that finds a different outcome (because a
633    /// concurrent `complete()` or `fail()` won the race) surfaces as
634    /// `ExecutionNotActive` with the populated `terminal_outcome` so the
635    /// caller can see that the cancel intent was NOT honored.
636    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
637        // RFC-012 Stage 1b: forwards through `backend.cancel`.
638        // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
639        // and the `ExecutionNotActive` → outcome=="cancelled" replay
640        // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
641        let handle = self.synth_handle();
642        let out = self.backend.cancel(&handle, reason).await;
643        if fcall_landed(&out) {
644            self.stop_renewal();
645        }
646        out.map_err(SdkError::from)
647    }
648
649    // ── Non-terminal operations ──
650
651    /// Manually renew the lease.
652    ///
653    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
654    /// trait. The background renewal task calls
655    /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
656    /// this method is the public entry point for workers that want
657    /// to force a renew out-of-band.
658    pub async fn renew_lease(&self) -> Result<(), SdkError> {
659        let handle = self.synth_handle();
660        self.backend
661            .renew(&handle)
662            .await
663            .map(|_renewal| ())
664            .map_err(SdkError::from)
665    }
666
667    /// Update progress (pct 0-100 and optional message).
668    ///
669    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
670    /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
671    ///
672    /// # Not for stream frames
673    ///
674    /// `update_progress` writes the `progress_percent` / `progress_message`
675    /// fields on `exec_core` (the execution's state hash). It is a
676    /// scalar heartbeat — each call overwrites the previous value and
677    /// nothing is appended to the output stream. Producers emitting
678    /// stream frames (arbitrary `frame_type` + payload, consumed via
679    /// `ClaimedTask::read_stream` / `tail_stream` or the HTTP
680    /// stream-tail routes) MUST use [`Self::append_frame`] instead;
681    /// `update_progress` is invisible to stream consumers.
682    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
683        let handle = self.synth_handle();
684        self.backend
685            .progress(&handle, Some(pct), Some(message.to_owned()))
686            .await
687            .map_err(SdkError::from)
688    }
689
690    /// Report usage against a budget and check limits.
691    ///
692    /// Non-consuming — the worker can report usage multiple times.
693    /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
694    /// `dedup_key` prevents double-counting on retries (auto-prefixed with budget hash tag).
695    ///
696    /// **RFC-012 §R7.2.3:** forwards through
697    /// [`EngineBackend::report_usage`](ff_core::engine_backend::EngineBackend::report_usage).
698    /// The trait's `UsageDimensions.dedup_key` carries the raw key;
699    /// the backend impl applies the `usage_dedup_key(hash_tag, k)`
700    /// wrap so the dedup state co-locates with the budget partition
701    /// (PR #108).
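    ///
    /// # Example
    ///
    /// A sketch of reporting token usage and reacting to a hard breach. The
    /// dimension name and dedup key are illustrative; `budget_id` comes from
    /// wherever the caller tracks its budgets.
    ///
    /// ```ignore
    /// let result = task
    ///     .report_usage(&budget_id, &[("tokens", 1_250)], Some("attempt-3-tokens"))
    ///     .await?;
    /// if let ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } = result {
    ///     // Stop doing work that spends against this budget.
    ///     tracing::warn!(%dimension, current_usage, hard_limit, "budget hard breach");
    /// }
    /// ```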
702    pub async fn report_usage(
703        &self,
704        budget_id: &BudgetId,
705        dimensions: &[(&str, u64)],
706        dedup_key: Option<&str>,
707    ) -> Result<ReportUsageResult, SdkError> {
708        let handle = self.synth_handle();
709        let mut dims = ff_core::backend::UsageDimensions::default();
710        for (name, delta) in dimensions {
711            dims.custom.insert((*name).to_owned(), *delta);
712        }
713        dims.dedup_key = dedup_key
714            .filter(|k| !k.is_empty())
715            .map(|k| k.to_owned());
716        self.backend
717            .report_usage(&handle, budget_id, dims)
718            .await
719            .map_err(SdkError::from)
720    }
721
722    /// Create a pending waitpoint for future signal delivery.
723    ///
724    /// Non-consuming — the worker keeps the lease. Signals delivered to the
725    /// waitpoint are buffered. When the worker later calls `suspend()` with
726    /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
727    /// resume condition.
728    ///
729    /// Returns both the waitpoint_id AND the HMAC token required by external
730    /// callers to buffer signals against this pending waitpoint
731    /// (RFC-004 §Waitpoint Security).
732    ///
733    /// **RFC-012 §R7.2.2:** forwards through
734    /// [`EngineBackend::create_waitpoint`](ff_core::engine_backend::EngineBackend::create_waitpoint).
735    /// The trait returns
736    /// [`PendingWaitpoint`](ff_core::backend::PendingWaitpoint) whose
737    /// `hmac_token` is the same wire HMAC this method has always
738    /// produced; the SDK unwraps it back to the historical
739    /// `(WaitpointId, WaitpointToken)` tuple for caller-shape parity.
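    ///
    /// # Example
    ///
    /// A sketch of creating a pending waitpoint and handing the token to an
    /// external approver. How the id and token reach that party is up to the
    /// caller.
    ///
    /// ```ignore
    /// let (waitpoint_id, token) = task
    ///     .create_pending_waitpoint("approval:order-42", 15 * 60 * 1000)
    ///     .await?;
    /// // `notify_approver` stands in for the caller's own delivery path
    /// // (HTTP reply, message bus, ...). Both values are needed to buffer a
    /// // signal against this waitpoint before the worker suspends.
    /// notify_approver(waitpoint_id, token).await?;
    /// ```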
740    pub async fn create_pending_waitpoint(
741        &self,
742        waitpoint_key: &str,
743        expires_in_ms: u64,
744    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
745        let handle = self.synth_handle();
746        let expires_in = std::time::Duration::from_millis(expires_in_ms);
747        let pending = self
748            .backend
749            .create_waitpoint(&handle, waitpoint_key, expires_in)
750            .await
751            .map_err(SdkError::from)?;
752        // `WaitpointHmac` wraps the canonical `WaitpointToken`; the
753        // `.token()` accessor borrows the underlying
754        // `WaitpointToken`, which is the caller's historical return
755        // shape.
756        Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
757    }
758
759    // ── Phase 4: Streaming ──
760
761    /// Append a frame to the current attempt's output stream.
762    ///
763    /// Non-consuming — the worker can append many frames during execution.
764    /// The stream is created lazily on the first append.
765    ///
766    /// **RFC-012 §R7.2.1 / PR #146:** forwards through the
767    /// `EngineBackend` trait. The free-form `frame_type` tag and
768    /// optional `metadata` (wire `correlation_id`) travel on the
769    /// extended [`ff_core::backend::Frame`] shape (`frame_type:
770    /// String`, `correlation_id: Option<String>`), giving byte-for-byte
771    /// wire parity with the pre-migration direct-FCALL path.
772    ///
773    /// # `append_frame` vs [`Self::update_progress`]
774    ///
775    /// Stream-frame producers (arbitrary `frame_type` + payload
776    /// consumed via `ClaimedTask::read_stream` / `tail_stream` or the
777    /// HTTP stream-tail routes) MUST use `append_frame`.
778    /// `update_progress` writes scalar `progress_percent` /
779    /// `progress_message` fields to `exec_core` and is invisible to
780    /// stream consumers.
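    ///
    /// # Example
    ///
    /// A sketch of streaming incremental output while the task runs, iterating
    /// some caller-side `chunks` of text. The `frame_type` tag and correlation
    /// ids are arbitrary caller-chosen strings.
    ///
    /// ```ignore
    /// for (i, chunk) in chunks.iter().enumerate() {
    ///     let correlation_id = format!("chunk-{i}");
    ///     task.append_frame("token_chunk", chunk.as_bytes(), Some(correlation_id.as_str()))
    ///         .await?;
    /// }
    /// // Scalar progress is separate from the stream (see `update_progress`).
    /// task.update_progress(90, "streaming done").await?;
    /// ```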
781    pub async fn append_frame(
782        &self,
783        frame_type: &str,
784        payload: &[u8],
785        metadata: Option<&str>,
786    ) -> Result<AppendFrameOutcome, SdkError> {
787        let handle = self.synth_handle();
788        let mut frame = ff_core::backend::Frame::new(
789            payload.to_vec(),
790            ff_core::backend::FrameKind::Event,
791        )
792        .with_frame_type(frame_type);
793        if let Some(cid) = metadata {
794            frame = frame.with_correlation_id(cid);
795        }
796        self.backend
797            .append_frame(&handle, frame)
798            .await
799            .map_err(SdkError::from)
800    }
801
802    // ── Phase 3: Suspend ──
803
804    /// Suspend the execution, releasing the lease and creating a waitpoint.
805    ///
806    /// The execution transitions to `suspended` and the worker loses ownership.
807    /// An external signal matching the condition will resume the execution.
808    ///
809    /// If `condition_matchers` is empty, a wildcard matcher is created that
810    /// matches ANY signal name. To require an explicit operator resume with
811    /// no signal match, pass a sentinel name that no real signal will use.
812    ///
813    /// If buffered signals on a pending waitpoint already satisfy the condition,
814    /// returns `AlreadySatisfied` and the lease is NOT released.
815    ///
816    /// Consumes self — the task cannot be used after suspension.
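    ///
    /// # Example
    ///
    /// A sketch of suspending until a named signal arrives, with a one-hour
    /// timeout that fails the execution if nothing shows up.
    ///
    /// ```ignore
    /// let outcome = task
    ///     .suspend(
    ///         "awaiting_human_approval",
    ///         &[ConditionMatcher { signal_name: "approval_granted".into() }],
    ///         Some(60 * 60 * 1000),
    ///         TimeoutBehavior::Fail,
    ///     )
    ///     .await?;
    /// match outcome {
    ///     SuspendOutcome::Suspended { waitpoint_id, waitpoint_token, .. } => {
    ///         // Hand the id + token to whoever will deliver the signal.
    ///     }
    ///     SuspendOutcome::AlreadySatisfied { .. } => {
    ///         // Buffered signals already matched; the lease is still held.
    ///     }
    /// }
    /// ```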
817    // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. Trait
818    // method `suspend` returns `Handle` (a resumed-kind attempt
819    // cookie); this SDK method returns `SuspendOutcome { suspension_id,
820    // waitpoint_id, waitpoint_key, waitpoint_token }` — semantically
821    // distinct return shapes. Also: SDK takes `&[ConditionMatcher]` +
822    // `TimeoutBehavior` with no `WaitpointSpec` mapping. Tracked in #117.
823    pub async fn suspend(
824        self,
825        reason_code: &str,
826        condition_matchers: &[ConditionMatcher],
827        timeout_ms: Option<u64>,
828        timeout_behavior: TimeoutBehavior,
829    ) -> Result<SuspendOutcome, SdkError> {
830        let partition = execution_partition(&self.execution_id, &self.partition_config);
831        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
832        let idx = IndexKeys::new(&partition);
833
834        let suspension_id = SuspensionId::new();
835        let waitpoint_id = WaitpointId::new();
836        // For now, use a simple opaque key (real implementation would use HMAC token)
837        let waitpoint_key = format!("wpk:{}", waitpoint_id);
838
839        let timeout_at = timeout_ms.map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));
840
841        // Build resume condition JSON
842        let required_signal_names: Vec<&str> = condition_matchers
843            .iter()
844            .map(|m| m.signal_name.as_str())
845            .collect();
846        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
847        let resume_condition_json = serde_json::json!({
848            "condition_type": "signal_set",
849            "required_signal_names": required_signal_names,
850            "signal_match_mode": match_mode,
851            "minimum_signal_count": 1,
852            "timeout_behavior": timeout_behavior.as_str(),
853            "allow_operator_override": true,
854        }).to_string();
855
856        let resume_policy_json = serde_json::json!({
857            "resume_target": "runnable",
858            "close_waitpoint_on_resume": true,
859            "consume_matched_signals": true,
860            "retain_signal_buffer_until_closed": true,
861        }).to_string();
862
863        // KEYS (17): exec_core, attempt_record, lease_current, lease_history,
864        //            lease_expiry, worker_leases, suspension_current, waitpoint_hash,
865        //            waitpoint_signals, suspension_timeout, pending_wp_expiry,
866        //            active_index, suspended_index, waitpoint_history, wp_condition,
867        //            attempt_timeout, hmac_secrets
868        let keys: Vec<String> = vec![
869            ctx.core(),                                          // 1
870            ctx.attempt_hash(self.attempt_index),                // 2
871            ctx.lease_current(),                                 // 3
872            ctx.lease_history(),                                 // 4
873            idx.lease_expiry(),                                  // 5
874            idx.worker_leases(&self.worker_instance_id),         // 6
875            ctx.suspension_current(),                            // 7
876            ctx.waitpoint(&waitpoint_id),                        // 8
877            ctx.waitpoint_signals(&waitpoint_id),                // 9
878            idx.suspension_timeout(),                            // 10
879            idx.pending_waitpoint_expiry(),                      // 11
880            idx.lane_active(&self.lane_id),                      // 12
881            idx.lane_suspended(&self.lane_id),                   // 13
882            ctx.waitpoints(),                                    // 14
883            ctx.waitpoint_condition(&waitpoint_id),              // 15
884            idx.attempt_timeout(),                               // 16
885            idx.waitpoint_hmac_secrets(),                        // 17
886        ];
887
888        // ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
889        //            lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
890        //            reason_code, requested_by, timeout_at, resume_condition_json,
891        //            resume_policy_json, continuation_metadata_pointer,
892        //            use_pending_waitpoint, timeout_behavior, lease_history_maxlen
893        let args: Vec<String> = vec![
894            self.execution_id.to_string(),                          // 1
895            self.attempt_index.to_string(),                         // 2
896            self.attempt_id.to_string(),                            // 3
897            self.lease_id.to_string(),                              // 4
898            self.lease_epoch.to_string(),                           // 5
899            suspension_id.to_string(),                              // 6
900            waitpoint_id.to_string(),                               // 7
901            waitpoint_key.clone(),                                  // 8
902            reason_code.to_owned(),                                 // 9
903            "worker".to_owned(),                                    // 10
904            timeout_at.map_or(String::new(), |t| t.to_string()),   // 11
905            resume_condition_json,                                  // 12
906            resume_policy_json,                                     // 13
907            String::new(),                                          // 14 continuation_metadata_ptr
908            String::new(),                                          // 15 use_pending_waitpoint
909            timeout_behavior.as_str().to_owned(),                   // 16
910            "1000".to_owned(),                                      // 17 lease_history_maxlen
911        ];
912
913        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
914        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
915
916        let raw: Value = self
917            .client
918            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
919            .await
920            .map_err(SdkError::from)?;
921
922        self.stop_renewal();
923        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
924    }
925
926    /// Read the signals that satisfied the waitpoint and triggered this
927    /// resume.
928    ///
929    /// Non-consuming. Intended to be called immediately after re-claim via
930    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
931    /// subsequent `suspend()` (which replaces `suspension:current`).
932    ///
933    /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
934    ///
935    /// - No prior suspension on this execution.
936    /// - The prior suspension belonged to an earlier attempt (e.g. the
937    ///   attempt was cancelled/failed and a retry is now claiming).
938    /// - The prior suspension was closed by timeout / cancel / operator
939    ///   override rather than by a matched signal.
940    ///
941    /// Reads `suspension:current` once, filters by `attempt_index` to
942    /// guard against stale prior-attempt records, then fetches the matched
943    /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
944    /// fields and reads each signal's metadata + payload directly.
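    ///
    /// # Example
    ///
    /// A sketch of inspecting the matched signals right after re-claiming a
    /// resumed execution. Individual [`ResumeSignal`] fields are not read
    /// because its shape lives in `ff_core::backend`; logging the whole value
    /// (assuming it derives `Debug`) keeps the example independent of that
    /// shape.
    ///
    /// ```ignore
    /// let signals = task.resume_signals().await?;
    /// if signals.is_empty() {
    ///     // Not a signal-resume (fresh claim, timeout close, operator override, ...).
    /// }
    /// for signal in &signals {
    ///     tracing::info!(?signal, "signal that satisfied the waitpoint");
    /// }
    /// ```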
945    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
946        // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
947        // Pre-migration body (HGETALL suspension_current + HMGET
948        // matchers + pipelined HGETALL signal_hash / GET
949        // signal_payload) lives in
950        // `ff_backend_valkey::observe_signals_impl`.
951        let handle = self.synth_handle();
952        self.backend
953            .observe_signals(&handle)
954            .await
955            .map_err(SdkError::from)
956    }
957
958    /// Signal the renewal task to stop. Called by every terminal op
959    /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
960    /// `move_to_waiting_children`) after the FCALL returns. Also marks
961    /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
962    /// consumption from a genuine drop-without-terminal-op.
963    fn stop_renewal(&self) {
964        self.terminal_op_called.store(true, Ordering::Release);
965        self.renewal_stop.notify_one();
966    }
967
968}
969
970/// True iff the backend's FCALL result represents a round-trip that
971/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
972/// contention, conflict, state, bug) all count as "landed" — the
973/// server either committed or rejected with a typed response. Only
974/// raw `Transport` errors (connection drops, request timeouts, parse
975/// failures) count as "did not land", which matches the pre-Stage-1b
976/// SDK's `fcall(...).await.map_err(SdkError::from)?` short-circuit
977/// — those errors returned before `stop_renewal()` ran, preserving
978/// the `Drop` warning for genuine "lease will leak" cases.
979///
980/// Stage 1b terminal-op forwarders use this predicate to decide
981/// whether to call `stop_renewal()`: yes for landed responses, no
982/// for transport errors so the caller's retry path still sees a
983/// running renewal task.
984fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
985    match r {
986        Ok(_) => true,
987        Err(crate::EngineError::Transport { .. }) => false,
988        Err(_) => true,
989    }
990}
991
992/// Map the SDK's free-form `error_category: &str` to the typed
993/// `FailureClass` the trait's `fail` method takes. Unknown categories
994/// fall through to `Transient` — the Lua side already tolerated
995/// arbitrary strings, so the worst a category drift does under the
996/// Stage 1b forwarder is reclassify a novel category as transient.
997/// Stage 1d (or issue #117) widens `FailureClass` with a
998/// `Custom(String)` arm for exact round-trip.
999fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
1000    use ff_core::backend::FailureClass;
1001    match s {
1002        "transient" => FailureClass::Transient,
1003        "permanent" => FailureClass::Permanent,
1004        "infra_crash" => FailureClass::InfraCrash,
1005        "timeout" => FailureClass::Timeout,
1006        "cancelled" => FailureClass::Cancelled,
1007        _ => FailureClass::Transient,
1008    }
1009}
1010
1011impl Drop for ClaimedTask {
1012    fn drop(&mut self) {
1013        // Abort the background renewal task on drop.
1014        // This is a safety net — complete/fail/cancel already stop renewal
1015        // via notify before consuming self. But if the task is dropped
1016        // without being consumed (e.g., panic), abort prevents leaked renewals.
1017        //
1018        // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
1019        // on the happy path, `stop_renewal()` fires `notify_one` synchronously
1020        // and then self is consumed into Drop immediately. The renewal task
1021        // has not yet been polled by the runtime, so `is_finished()` is still
1022        // `false` here — which previously fired the warning on every
1023        // complete/fail/cancel/suspend call. `terminal_op_called` is the
1024        // authoritative signal that a terminal-op path ran to the point of
1025        // stopping renewal; it does not by itself certify the Lua side
1026        // succeeded (see the field doc). The caller surfaces any error via
1027        // the op's return value, so a `Drop` warning is unneeded there.
1028        if !self.terminal_op_called.load(Ordering::Acquire) {
1029            tracing::warn!(
1030                execution_id = %self.execution_id,
1031                "ClaimedTask dropped without terminal operation — lease will expire"
1032            );
1033        }
1034        self.renewal_handle.abort();
1035    }
1036}
1037
1038// ── Lease renewal ──
1039
1040/// Per-tick renewal: single `backend.renew(&handle)` call wrapped in
1041/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
1042/// hooks still see one span per renewal (restores PR #119 Cursor
1043/// Bugbot finding — the top-level `spawn_renewal_task` fires once at
1044/// construction time, not per-tick).
1045///
1046/// See `benches/harness/src/bin/long_running.rs` for the bench
1047/// consumer that depends on this span naming.
1048#[tracing::instrument(
1049    name = "renew_lease",
1050    skip_all,
1051    fields(execution_id = %execution_id)
1052)]
1053async fn renew_once(
1054    backend: &dyn EngineBackend,
1055    handle: &Handle,
1056    execution_id: &ExecutionId,
1057) -> Result<(), crate::EngineError> {
1058    backend.renew(handle).await.map(|_| ())
1059}
1060
1061/// Spawn a background tokio task that renews the lease at `ttl / 3`
1062/// intervals.
1063///
1064/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
1065/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
1066/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
1067/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
1068/// + the encoded `Handle` instead. Per-tick tracing lives on
1069///   [`renew_once`]; this function itself is sync + one-shot.
1070///
1071/// Stops when:
1072/// - `stop_signal` is notified (complete/fail/cancel called)
1073/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
1074/// - The task handle is aborted (ClaimedTask dropped)
1075fn spawn_renewal_task(
1076    backend: Arc<dyn EngineBackend>,
1077    handle: Handle,
1078    execution_id: ExecutionId,
1079    lease_ttl_ms: u64,
1080    stop_signal: Arc<Notify>,
1081    failure_counter: Arc<AtomicU32>,
1082) -> JoinHandle<()> {
1083    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
1084    // panics if a caller (or a misconfigured test) passes a
1085    // lease_ttl_ms < 3. The SDK config validator already enforces
1086    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
1087    // is a cheap belt-and-suspenders (Copilot review finding on
1088    // PR #119).
1089    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));
1090
1091    tokio::spawn(async move {
1092        let mut tick = tokio::time::interval(interval);
1093        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1094        // Skip the first immediate tick — the lease was just acquired.
1095        tick.tick().await;
1096
1097        loop {
1098            tokio::select! {
1099                _ = stop_signal.notified() => {
1100                    tracing::debug!(
1101                        execution_id = %execution_id,
1102                        "lease renewal stopped by signal"
1103                    );
1104                    return;
1105                }
1106                _ = tick.tick() => {
1107                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
1108                        Ok(_renewal) => {
1109                            failure_counter.store(0, Ordering::Relaxed);
1110                            tracing::trace!(
1111                                execution_id = %execution_id,
1112                                "lease renewed"
1113                            );
1114                        }
1115                        Err(e) if is_terminal_renewal_error(&e) => {
1116                            failure_counter.fetch_add(1, Ordering::Relaxed);
1117                            tracing::warn!(
1118                                execution_id = %execution_id,
1119                                error = %e,
1120                                "lease renewal failed with terminal error, stopping renewal"
1121                            );
1122                            return;
1123                        }
1124                        Err(e) => {
1125                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
1126                            tracing::warn!(
1127                                execution_id = %execution_id,
1128                                error = %e,
1129                                consecutive_failures = count,
1130                                "lease renewal failed (will retry next interval)"
1131                            );
1132                        }
1133                    }
1134                }
1135            }
1136        }
1137    })
1138}
1139
1140/// Check if an engine error means renewal should stop permanently.
1141#[allow(dead_code)]
1142fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
1143    use crate::{ContentionKind, EngineError, StateKind};
1144    matches!(
1145        err,
1146        EngineError::State(
1147            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
1148        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
1149            | EngineError::NotFound { entity: "execution" }
1150    )
1151}
1152
1153// ── FCALL result parsing ──
1154
1155/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
1156/// function into a typed [`ReportUsageResult`].
1157///
1158/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
1159///                  `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
1160/// Status code `!= 1` is parsed as a [`ScriptError`] via
1161/// [`ScriptError::from_code_with_detail`].
1162///
1163/// Exposed as `pub` so downstream SDKs that speak the same wire format
1164/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
1165/// call this directly instead of re-implementing the parse. Keeping one
1166/// parser paired with the producer (the Lua function registered at
1167/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
1168/// against silent format drift between producer and consumer.
1169pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1170    let arr = match raw {
1171        Value::Array(arr) => arr,
1172        _ => {
1173            return Err(SdkError::from(ScriptError::Parse {
1174                fcall: "parse_report_usage_result".into(),
1175                execution_id: None,
1176                message: "ff_report_usage_and_check: expected Array".into(),
1177            }));
1178        }
1179    };
1180    let status_code = match arr.first() {
1181        Some(Ok(Value::Int(n))) => *n,
1182        _ => {
1183            return Err(SdkError::from(ScriptError::Parse {
1184                fcall: "parse_report_usage_result".into(),
1185                execution_id: None,
1186                message: "ff_report_usage_and_check: expected Int status code".into(),
1187            }));
1188        }
1189    };
1190    if status_code != 1 {
1191        let error_code = usage_field_str(arr, 1);
1192        let detail = usage_field_str(arr, 2);
1193        return Err(SdkError::from(
1194            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1195                ScriptError::Parse {
1196                    fcall: "parse_report_usage_result".into(),
1197                    execution_id: None,
1198                    message: format!("ff_report_usage_and_check: {error_code}"),
1199                }
1200            }),
1201        ));
1202    }
1203    let sub_status = usage_field_str(arr, 1);
1204    match sub_status.as_str() {
1205        "OK" => Ok(ReportUsageResult::Ok),
1206        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1207        "SOFT_BREACH" => {
1208            let dim = usage_field_str(arr, 2);
1209            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1210            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1211            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1212        }
1213        "HARD_BREACH" => {
1214            let dim = usage_field_str(arr, 2);
1215            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1216            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1217            Ok(ReportUsageResult::HardBreach {
1218                dimension: dim,
1219                current_usage: current,
1220                hard_limit: limit,
1221            })
1222        }
1223        _ => Err(SdkError::from(ScriptError::Parse {
1224            fcall: "parse_report_usage_result".into(),
1225            execution_id: None,
1226            message: format!(
1227            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1228        ),
1229        })),
1230    }
1231}
1232
1233fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1234    match arr.get(index) {
1235        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1236        Some(Ok(Value::SimpleString(s))) => s.clone(),
1237        Some(Ok(Value::Int(n))) => n.to_string(),
1238        _ => String::new(),
1239    }
1240}
1241
1242/// Parse a required numeric usage field (u64) from the wire array at
1243/// `index`. Returns a parse error (`ScriptError::Parse`) if the slot is
1244/// missing, holds a non-string/non-int value, holds a negative int, or
1245/// contains a string that does not parse as u64.
1246///
1247/// Rationale: the Lua producer (`lua/budget.lua:99`,
1248/// `ff_report_usage_and_check`) always emits
1249/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1250/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1251/// non-numeric value here means the Lua and Rust sides drifted;
1252/// silently coercing to `0` would surface drift as "zero-usage breach"
1253/// — arithmetically correct but semantically nonsense. Fail loudly
1254/// instead so drift shows up as a parse error at the first call site.
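///
/// How individual slot values map, per the match arms below (a sketch, not
/// an exhaustive list):
///
/// ```text
/// Value::Int(150)              → Ok(150)
/// Value::SimpleString("150")   → Ok(150)        (BulkString behaves the same)
/// Value::Int(-1)               → Parse error    (negative, cannot be u64)
/// Value::SimpleString("n/a")   → Parse error    (not a u64 string)
/// <slot missing>               → Parse error    (missing from response)
/// ```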
1255fn parse_usage_u64(
1256    arr: &[Result<Value, ferriskey::Error>],
1257    index: usize,
1258    sub_status: &str,
1259    field_name: &str,
1260) -> Result<u64, SdkError> {
1261    match arr.get(index) {
1262        Some(Ok(Value::Int(n))) => {
1263            u64::try_from(*n).map_err(|_| {
1264                SdkError::from(ScriptError::Parse {
1265                    fcall: "parse_usage_u64".into(),
1266                    execution_id: None,
1267                    message: format!(
1268                    "ff_report_usage_and_check {sub_status}: {field_name} \
1269                     (index {index}) negative int {n} cannot be u64"
1270                ),
1271                })
1272            })
1273        }
1274        Some(Ok(Value::BulkString(b))) => {
1275            let s = String::from_utf8_lossy(b);
1276            s.parse::<u64>().map_err(|_| {
1277                SdkError::from(ScriptError::Parse {
1278                    fcall: "parse_usage_u64".into(),
1279                    execution_id: None,
1280                    message: format!(
1281                    "ff_report_usage_and_check {sub_status}: {field_name} \
1282                     (index {index}) not a u64 string: {s:?}"
1283                ),
1284                })
1285            })
1286        }
1287        Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1288            SdkError::from(ScriptError::Parse {
1289                fcall: "parse_usage_u64".into(),
1290                execution_id: None,
1291                message: format!(
1292                "ff_report_usage_and_check {sub_status}: {field_name} \
1293                 (index {index}) not a u64 string: {s:?}"
1294            ),
1295            })
1296        }),
1297        Some(_) => Err(SdkError::from(ScriptError::Parse {
1298            fcall: "parse_usage_u64".into(),
1299            execution_id: None,
1300            message: format!(
1301            "ff_report_usage_and_check {sub_status}: {field_name} \
1302             (index {index}) wrong wire type (expected Int or String)"
1303        ),
1304        })),
1305        None => Err(SdkError::from(ScriptError::Parse {
1306            fcall: "parse_usage_u64".into(),
1307            execution_id: None,
1308            message: format!(
1309            "ff_report_usage_and_check {sub_status}: {field_name} \
1310             (index {index}) missing from response"
1311        ),
1312        })),
1313    }
1314}
1315
1316/// Pure helper: decide whether `suspension:current` represents a
1317/// signal-driven resume for the currently-claimed attempt, and extract
1318/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1319/// (no record, stale prior-attempt, non-resumed close). Returns an error
1320/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1321/// bug rather than a missing-data case.
1322///
1323/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1324/// `EngineBackend::observe_signals`, which re-implements this invariant
1325/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1326/// tests so the parsing contract stays exercised at the SDK layer —
1327/// Stage 1d will consolidate (either promote the helper into ff-core
1328/// or drop these tests once the backend-side tests cover equivalent
1329/// ground).
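///
/// Fields of `suspension:current` consulted here (everything else in the
/// hash is ignored by this helper):
///
/// ```text
/// attempt_index   must equal the claimed attempt, otherwise Ok(None)
/// close_reason    must be "resumed", otherwise Ok(None)
/// waitpoint_id    UUID; empty → Ok(None), malformed → Err(Parse)
/// ```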
1330#[allow(dead_code)]
1331fn resume_waitpoint_id_from_suspension(
1332    susp: &HashMap<String, String>,
1333    claimed_attempt: AttemptIndex,
1334) -> Result<Option<WaitpointId>, SdkError> {
1335    if susp.is_empty() {
1336        return Ok(None);
1337    }
1338    let susp_att: u32 = susp
1339        .get("attempt_index")
1340        .and_then(|s| s.parse().ok())
1341        .unwrap_or(u32::MAX);
1342    if susp_att != claimed_attempt.0 {
1343        return Ok(None);
1344    }
1345    let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1346    if close_reason != "resumed" {
1347        return Ok(None);
1348    }
1349    let wp_id_str = susp
1350        .get("waitpoint_id")
1351        .map(String::as_str)
1352        .unwrap_or_default();
1353    if wp_id_str.is_empty() {
1354        return Ok(None);
1355    }
1356    let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1357        SdkError::from(ScriptError::Parse {
1358            fcall: "resume_waitpoint_id_from_suspension".into(),
1359            execution_id: None,
1360            message: format!(
1361            "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1362        ),
1363        })
1364    })?;
1365    Ok(Some(waitpoint_id))
1366}
1367
1368pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
1369    let arr = match raw {
1370        Value::Array(arr) => arr,
1371        _ => {
1372            return Err(SdkError::from(ScriptError::Parse {
1373                fcall: "parse_success_result".into(),
1374                execution_id: None,
1375                message: format!(
1376                "{function_name}: expected Array, got non-array"
1377            ),
1378            }));
1379        }
1380    };
1381
1382    if arr.is_empty() {
1383        return Err(SdkError::from(ScriptError::Parse {
1384            fcall: "parse_success_result".into(),
1385            execution_id: None,
1386            message: format!(
1387            "{function_name}: empty result array"
1388        ),
1389        }));
1390    }
1391
1392    let status_code = match arr.first() {
1393        Some(Ok(Value::Int(n))) => *n,
1394        _ => {
1395            return Err(SdkError::from(ScriptError::Parse {
1396                fcall: "parse_success_result".into(),
1397                execution_id: None,
1398                message: format!(
1399                "{function_name}: expected Int at index 0"
1400            ),
1401            }));
1402        }
1403    };
1404
1405    if status_code == 1 {
1406        Ok(())
1407    } else {
1408        // Extract error code from index 1 and optional detail from index 2
1409        // (e.g. `capability_mismatch` ships missing tokens there). Variants
1410        // that carry a String payload pick the detail up via
1411        // `from_code_with_detail`; other variants ignore it.
1412        let field_str = |idx: usize| -> String {
1413            arr.get(idx)
1414                .and_then(|v| match v {
1415                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1416                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1417                    _ => None,
1418                })
1419                .unwrap_or_default()
1420        };
1421        let error_code = {
1422            let s = field_str(1);
1423            if s.is_empty() { "unknown".to_owned() } else { s }
1424        };
1425        // Collect all detail slots (idx >= 2). Most variants only read
1426        // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
1427        // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
1428        // reconciliation after a network drop.
1429        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
1430        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1431
1432        let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
1433            .unwrap_or_else(|| {
1434                ScriptError::Parse {
1435                    fcall: "parse_success_result".into(),
1436                    execution_id: None,
1437                    message: format!("{function_name}: unknown error: {error_code}"),
1438                }
1439            });
1440
1441        Err(SdkError::from(script_err))
1442    }
1443}
1444
1445/// Parse ff_suspend_execution result:
1446///   ok(suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)
1447///   ok_already_satisfied(suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)
1448fn parse_suspend_result(
1449    raw: &Value,
1450    suspension_id: SuspensionId,
1451    waitpoint_id: WaitpointId,
1452    waitpoint_key: String,
1453) -> Result<SuspendOutcome, SdkError> {
1454    let arr = match raw {
1455        Value::Array(arr) => arr,
1456        _ => {
1457            return Err(SdkError::from(ScriptError::Parse {
1458                fcall: "parse_suspend_result".into(),
1459                execution_id: None,
1460                message: "ff_suspend_execution: expected Array".into(),
1461            }));
1462        }
1463    };
1464
1465    let status_code = match arr.first() {
1466        Some(Ok(Value::Int(n))) => *n,
1467        _ => {
1468            return Err(SdkError::from(ScriptError::Parse {
1469                fcall: "parse_suspend_result".into(),
1470                execution_id: None,
1471                message: "ff_suspend_execution: bad status code".into(),
1472            }));
1473        }
1474    };
1475
1476    if status_code != 1 {
1477        let err_field_str = |idx: usize| -> String {
1478            arr.get(idx)
1479                .and_then(|v| match v {
1480                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1481                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1482                    _ => None,
1483                })
1484                .unwrap_or_default()
1485        };
1486        let error_code = {
1487            let s = err_field_str(1);
1488            if s.is_empty() { "unknown".to_owned() } else { s }
1489        };
1490        let detail = err_field_str(2);
1491        return Err(SdkError::from(
1492            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1493                ScriptError::Parse {
1494                    fcall: "parse_suspend_result".into(),
1495                    execution_id: None,
1496                    message: format!("ff_suspend_execution: {error_code}"),
1497                }
1498            }),
1499        ));
1500    }
1501
1502    // Check sub-status at index 1
1503    let sub_status = arr
1504        .get(1)
1505        .and_then(|v| match v {
1506            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1507            Ok(Value::SimpleString(s)) => Some(s.clone()),
1508            _ => None,
1509        })
1510        .unwrap_or_default();
1511
1512    // Lua returns: {1, status, suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
1513    // The suspension_id/waitpoint_id/waitpoint_key values the worker passed in are
1514    // authoritative (Lua echoes them); the waitpoint_token, however, is MINTED by
1515    // Lua and must be read from the response.
1516    let waitpoint_token = arr
1517        .get(5)
1518        .and_then(|v| match v {
1519            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1520            Ok(Value::SimpleString(s)) => Some(s.clone()),
1521            _ => None,
1522        })
1523        .map(WaitpointToken::new)
1524        .ok_or_else(|| {
1525            SdkError::from(ScriptError::Parse {
1526                fcall: "parse_suspend_result".into(),
1527                execution_id: None,
1528                message: "ff_suspend_execution: missing waitpoint_token in response".into(),
1529            })
1530        })?;
1531
1532    if sub_status == "ALREADY_SATISFIED" {
1533        Ok(SuspendOutcome::AlreadySatisfied {
1534            suspension_id,
1535            waitpoint_id,
1536            waitpoint_key,
1537            waitpoint_token,
1538        })
1539    } else {
1540        Ok(SuspendOutcome::Suspended {
1541            suspension_id,
1542            waitpoint_id,
1543            waitpoint_key,
1544            waitpoint_token,
1545        })
1546    }
1547}
1548
1549/// Parse ff_deliver_signal result:
1550///   ok(signal_id, effect)
1551///   ok_duplicate(existing_signal_id)
1552pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
1553    let arr = match raw {
1554        Value::Array(arr) => arr,
1555        _ => {
1556            return Err(SdkError::from(ScriptError::Parse {
1557                fcall: "parse_signal_result".into(),
1558                execution_id: None,
1559                message: "ff_deliver_signal: expected Array".into(),
1560            }));
1561        }
1562    };
1563
1564    let status_code = match arr.first() {
1565        Some(Ok(Value::Int(n))) => *n,
1566        _ => {
1567            return Err(SdkError::from(ScriptError::Parse {
1568                fcall: "parse_signal_result".into(),
1569                execution_id: None,
1570                message: "ff_deliver_signal: bad status code".into(),
1571            }));
1572        }
1573    };
1574
1575    if status_code != 1 {
1576        let err_field_str = |idx: usize| -> String {
1577            arr.get(idx)
1578                .and_then(|v| match v {
1579                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1580                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1581                    _ => None,
1582                })
1583                .unwrap_or_default()
1584        };
1585        let error_code = {
1586            let s = err_field_str(1);
1587            if s.is_empty() { "unknown".to_owned() } else { s }
1588        };
1589        let detail = err_field_str(2);
1590        return Err(SdkError::from(
1591            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1592                ScriptError::Parse {
1593                    fcall: "parse_signal_result".into(),
1594                    execution_id: None,
1595                    message: format!("ff_deliver_signal: {error_code}"),
1596                }
1597            }),
1598        ));
1599    }
1600
1601    let sub_status = arr
1602        .get(1)
1603        .and_then(|v| match v {
1604            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1605            Ok(Value::SimpleString(s)) => Some(s.clone()),
1606            _ => None,
1607        })
1608        .unwrap_or_default();
1609
1610    if sub_status == "DUPLICATE" {
1611        let existing_id = arr
1612            .get(2)
1613            .and_then(|v| match v {
1614                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1615                Ok(Value::SimpleString(s)) => Some(s.clone()),
1616                _ => None,
1617            })
1618            .unwrap_or_default();
1619        return Ok(SignalOutcome::Duplicate {
1620            existing_signal_id: existing_id,
1621        });
1622    }
1623
1624    // Parse: {1, "OK", signal_id, effect}
1625    let signal_id_str = arr
1626        .get(2)
1627        .and_then(|v| match v {
1628            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1629            Ok(Value::SimpleString(s)) => Some(s.clone()),
1630            _ => None,
1631        })
1632        .unwrap_or_default();
1633
1634    let effect = arr
1635        .get(3)
1636        .and_then(|v| match v {
1637            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1638            Ok(Value::SimpleString(s)) => Some(s.clone()),
1639            _ => None,
1640        })
1641        .unwrap_or_default();
1642
1643    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
1644        SdkError::from(ScriptError::Parse {
1645            fcall: "parse_signal_result".into(),
1646            execution_id: None,
1647            message: format!(
1648            "ff_deliver_signal: invalid signal_id from Lua: {e}"
1649        ),
1650        })
1651    })?;
1652
1653    if effect == "resume_condition_satisfied" {
1654        Ok(SignalOutcome::TriggeredResume { signal_id })
1655    } else {
1656        Ok(SignalOutcome::Accepted { signal_id, effect })
1657    }
1658}
1659
1660/// Parse ff_fail_execution result:
1661///   ok("retry_scheduled", delay_until)
1662///   ok("terminal_failed")
1663#[allow(dead_code)]
1664fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
1665    let arr = match raw {
1666        Value::Array(arr) => arr,
1667        _ => {
1668            return Err(SdkError::from(ScriptError::Parse {
1669                fcall: "parse_fail_result".into(),
1670                execution_id: None,
1671                message: "ff_fail_execution: expected Array".into(),
1672            }));
1673        }
1674    };
1675
1676    let status_code = match arr.first() {
1677        Some(Ok(Value::Int(n))) => *n,
1678        _ => {
1679            return Err(SdkError::from(ScriptError::Parse {
1680                fcall: "parse_fail_result".into(),
1681                execution_id: None,
1682                message: "ff_fail_execution: bad status code".into(),
1683            }));
1684        }
1685    };
1686
1687    if status_code != 1 {
1688        let err_field_str = |idx: usize| -> String {
1689            arr.get(idx)
1690                .and_then(|v| match v {
1691                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1692                    Ok(Value::SimpleString(s)) => Some(s.clone()),
1693                    _ => None,
1694                })
1695                .unwrap_or_default()
1696        };
1697        let error_code = {
1698            let s = err_field_str(1);
1699            if s.is_empty() { "unknown".to_owned() } else { s }
1700        };
1701        let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
1702        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1703        return Err(SdkError::from(
1704            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
1705                ScriptError::Parse {
1706                    fcall: "parse_fail_result".into(),
1707                    execution_id: None,
1708                    message: format!("ff_fail_execution: {error_code}"),
1709                }
1710            }),
1711        ));
1712    }
1713
1714    // Parse sub-status from field[2] (index 2 = first field after status+OK)
1715    let sub_status = arr
1716        .get(2)
1717        .and_then(|v| match v {
1718            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1719            Ok(Value::SimpleString(s)) => Some(s.clone()),
1720            _ => None,
1721        })
1722        .unwrap_or_default();
1723
1724    match sub_status.as_str() {
1725        "retry_scheduled" => {
1726            // Lua returns: ok("retry_scheduled", tostring(delay_until))
1727            // arr[3] = delay_until
1728            let delay_str = arr
1729                .get(3)
1730                .and_then(|v| match v {
1731                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1732                    Ok(Value::Int(n)) => Some(n.to_string()),
1733                    _ => None,
1734                })
1735                .unwrap_or_default();
1736            let delay_until = delay_str.parse::<i64>().unwrap_or(0);
1737
1738            Ok(FailOutcome::RetryScheduled {
1739                delay_until: TimestampMs::from_millis(delay_until),
1740            })
1741        }
1742        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
1743        _ => Err(SdkError::from(ScriptError::Parse {
1744            fcall: "parse_fail_result".into(),
1745            execution_id: None,
1746            message: format!(
1747            "ff_fail_execution: unexpected sub-status: {sub_status}"
1748        ),
1749        })),
1750    }
1751}
1752
1753// ── Stream read / tail (consumer API, RFC-006 #2) ──
1754
1755/// Maximum tail block duration accepted by [`tail_stream`]. Mirrors the REST
1756/// endpoint ceiling so SDK callers can't wedge a connection longer than the
1757/// server would accept.
1758pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1759
1760/// Maximum frames per read/tail call. Mirrors
1761/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
1762/// callers don't need to import ff-core just to read the bound.
1763pub use ff_core::contracts::STREAM_READ_HARD_CAP;
1764
1765/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
1766/// signal so polling consumers can exit cleanly.
1767///
1768/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
1769pub use ff_core::contracts::StreamFrames;
1770
1771/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
1772/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
1773/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
1774/// `StreamCursor::Start` / `StreamCursor::End` instead.
1775pub use ff_core::contracts::StreamCursor;
1776
1777/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
1778/// — XREAD does not accept the open markers. Pulled out as a bare
1779/// function so unit tests can exercise the guard without constructing a
1780/// live `ferriskey::Client`.
1781fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
1782    if !after.is_concrete() {
1783        return Err(SdkError::Config {
1784            context: "tail_stream".into(),
1785            field: Some("after".into()),
1786            message: "XREAD cursor must be a concrete entry id; pass \
1787                      StreamCursor::from_beginning() to start from the \
1788                      beginning"
1789                .into(),
1790        });
1791    }
1792    Ok(())
1793}
1794
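/// Bound `count_limit` to `1..=STREAM_READ_HARD_CAP` at the SDK boundary,
/// shared by [`read_stream`] and [`tail_stream`] so both surface the same
/// [`SdkError::Config`] before any backend call is made.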
1795fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
1796    if count_limit == 0 {
1797        return Err(SdkError::Config {
1798            context: "read_stream_frames".into(),
1799            field: Some("count_limit".into()),
1800            message: "count_limit must be >= 1".into(),
1801        });
1802    }
1803    if count_limit > STREAM_READ_HARD_CAP {
1804        return Err(SdkError::Config {
1805            context: "read_stream_frames".into(),
1806            field: Some("count_limit".into()),
1807            message: format!(
1808                "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
1809            ),
1810        });
1811    }
1812    Ok(())
1813}
1814
1815/// Read frames from a completed or in-flight attempt's stream.
1816///
1817/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
1818/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1819/// `StreamCursor::At("<id>")` reads from a concrete entry id.
1820/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
1821/// `0` returns [`SdkError::Config`].
1822///
1823/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
1824/// consumers know when the producer has finalized the stream. A
1825/// never-written attempt and an in-progress stream are indistinguishable
1826/// here — both present as `frames=[]`, `closed_at=None`.
1827///
1828/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
1829/// client but are not the lease-holding worker — no lease check is
1830/// performed.
1831///
1832/// # Head-of-line note
1833///
1834/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
1835/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
1836/// calling this on a `client` that is also serving FCALLs stalls those
1837/// FCALLs behind the reply. The REST server isolates reads on its
1838/// `tail_client`; direct SDK callers should either use a dedicated
1839/// client OR paginate through smaller `count_limit` slices.
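///
/// # Example
///
/// A minimal full-range read, capped at 500 frames; a sketch that assumes
/// `backend`, `execution_id`, and `attempt_index` are already in scope:
///
/// ```rust,ignore
/// let frames = read_stream(
///     backend,
///     &execution_id,
///     attempt_index,
///     StreamCursor::Start,
///     StreamCursor::End,
///     500, // must stay within 1..=STREAM_READ_HARD_CAP
/// )
/// .await?;
/// if !frames.is_closed() {
///     // Producer has not finalized the stream yet; an empty `frames` here
///     // is indistinguishable from a never-written attempt, as noted above.
/// }
/// ```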
1840pub async fn read_stream(
1841    backend: &dyn EngineBackend,
1842    execution_id: &ExecutionId,
1843    attempt_index: AttemptIndex,
1844    from: StreamCursor,
1845    to: StreamCursor,
1846    count_limit: u64,
1847) -> Result<StreamFrames, SdkError> {
1848    validate_stream_read_count(count_limit)?;
1849    Ok(backend
1850        .read_stream(execution_id, attempt_index, from, to, count_limit)
1851        .await?)
1852}
1853
1854/// Tail a live attempt's stream.
1855///
1856/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
1857/// with id strictly greater than `after`. Pass
1858/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
1859/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
1860/// REJECTED at this boundary because XREAD does not accept `-` / `+`
1861/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
1862///
1863/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
1864/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
1865/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
1866/// SDK and REST ceilings aligned.
1867///
1868/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
1869/// polling consumers should loop until `result.is_closed()` is true, then
1870/// drain and exit. Timeout with no new frames presents as
1871/// `frames=[], closed_at=None`.
1872///
1873/// # Head-of-line warning — use a dedicated client
1874///
1875/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
1876/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
1877/// the read side until a frame arrives or the block elapses. If the
1878/// `client` you pass here is ALSO used for claims, completes, fails,
1879/// appends, or any other FCALL, a 30-second tail will stall all those
1880/// calls for up to 30 seconds.
1881///
1882/// **Strongly recommended**: build a separate `ferriskey::Client` for
1883/// tail callers — mirrors the `Server::tail_client` split that the REST
1884/// server uses internally (see `crates/ff-server/src/server.rs` and
1885/// RFC-006 Impl Notes §"Dedicated stream-op connection").
1886///
1887/// # Tail parallelism caveat (same mux)
1888///
1889/// Even a dedicated tail client is still one multiplexed TCP connection.
1890/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
1891/// ferriskey's per-call `request_timeout` starts at future-poll — so
1892/// two concurrent tails against the same client can time out spuriously:
1893/// the second call's BLOCK budget elapses while it waits for the first
1894/// BLOCK to return. The REST server handles this internally with a
1895/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
1896/// each call its full `block_ms` budget at the server.
1897///
1898/// **Direct SDK callers that need concurrent tails**: either
1899///   (1) build ONE `ferriskey::Client` per concurrent tail call (a small
1900///       pool of clients, rotated by the caller), OR
1901///   (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
1902///       only one BLOCK is in flight per client at a time.
1903/// If you need the REST-side backpressure (429 on contention) and the
1904/// built-in serializer, go through the
1905/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
1906/// than calling this directly.
1907///
1908/// This SDK does not enforce either pattern — the mutex belongs at the
1909/// application layer, and the connection pool belongs at the SDK
1910/// caller's DI layer; neither has a structured place inside this
1911/// helper.
1912///
1913/// # Timeout handling
1914///
1915/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
1916/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
1917/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
1918/// 500ms)`, overriding the client's default per-call. A tail with
1919/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
1920/// the client was built with a shorter `request_timeout`. No custom client
1921/// configuration is required for timeout reasons — only for head-of-line
1922/// isolation above.
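///
/// # Example
///
/// A sketch of one serialized tail call; `backend`, `execution_id`,
/// `attempt_index`, and a shared `tail_gate: tokio::sync::Mutex<()>` (a
/// name chosen here for illustration) are assumed to be in scope:
///
/// ```rust,ignore
/// // Hold the gate so only one XREAD BLOCK is in flight on this client.
/// let _gate = tail_gate.lock().await;
/// let frames = tail_stream(
///     backend,
///     &execution_id,
///     attempt_index,
///     StreamCursor::from_beginning(),
///     5_000, // block up to 5s; must stay <= MAX_TAIL_BLOCK_MS
///     256,
/// )
/// .await?;
/// if frames.is_closed() {
///     // Producer finalized the stream: drain `frames` and stop polling.
/// }
/// ```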
1923pub async fn tail_stream(
1924    backend: &dyn EngineBackend,
1925    execution_id: &ExecutionId,
1926    attempt_index: AttemptIndex,
1927    after: StreamCursor,
1928    block_ms: u64,
1929    count_limit: u64,
1930) -> Result<StreamFrames, SdkError> {
1931    if block_ms > MAX_TAIL_BLOCK_MS {
1932        return Err(SdkError::Config {
1933            context: "tail_stream".into(),
1934            field: Some("block_ms".into()),
1935            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
1936        });
1937    }
1938    validate_stream_read_count(count_limit)?;
1939    // XREAD does not accept `-` / `+` markers as cursors — reject at
1940    // the SDK boundary with `SdkError::Config` rather than forwarding
1941    // an invalid `-`/`+` into the backend (which would surface as an
1942    // opaque `EngineError::Transport`).
1943    validate_tail_cursor(&after)?;
1944
1945    Ok(backend
1946        .tail_stream(execution_id, attempt_index, after, block_ms, count_limit)
1947        .await?)
1948}
1949
1950#[cfg(test)]
1951mod tail_stream_boundary_tests {
1952    use super::*;
1953
1954    // `validate_tail_cursor` rejects `StreamCursor::Start` and
1955    // `StreamCursor::End` before `tail_stream` touches the client —
1956    // same shape as the `count_limit` guard above. The matching
1957    // full-path rejection on the REST layer is covered by
1958    // `ff-server::api`.
1959
1960    #[test]
1961    fn rejects_start_cursor() {
1962        let err = validate_tail_cursor(&StreamCursor::Start)
1963            .expect_err("Start must be rejected");
1964        match err {
1965            SdkError::Config { field, context, .. } => {
1966                assert_eq!(field.as_deref(), Some("after"));
1967                assert_eq!(context, "tail_stream");
1968            }
1969            other => panic!("expected SdkError::Config, got {other:?}"),
1970        }
1971    }
1972
1973    #[test]
1974    fn rejects_end_cursor() {
1975        let err = validate_tail_cursor(&StreamCursor::End)
1976            .expect_err("End must be rejected");
1977        assert!(matches!(err, SdkError::Config { .. }));
1978    }
1979
1980    #[test]
1981    fn accepts_at_cursor() {
1982        validate_tail_cursor(&StreamCursor::At("0-0".into()))
1983            .expect("At cursor must be accepted");
1984        validate_tail_cursor(&StreamCursor::from_beginning())
1985            .expect("from_beginning() must be accepted");
1986        validate_tail_cursor(&StreamCursor::At("123-0".into()))
1987            .expect("concrete id must be accepted");
1988    }
1989}
1990
1991#[cfg(test)]
1992mod parse_report_usage_result_tests {
1993    use super::*;
1994
1995    /// `Value::SimpleString` from a `&str`. `usage_field_str` handles
1996    /// BulkString and SimpleString uniformly
1997    /// (`Value::BulkString(b)` → `String::from_utf8_lossy`,
1998    /// `Value::SimpleString(s)` → clone), so SimpleString avoids a
1999    /// dev-dependency on `bytes` just for test construction.
2000    fn s(v: &str) -> Result<Value, ferriskey::Error> {
2001        Ok(Value::SimpleString(v.to_owned()))
2002    }
2003
2004    fn int(n: i64) -> Result<Value, ferriskey::Error> {
2005        Ok(Value::Int(n))
2006    }
2007
2008    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
2009        Value::Array(items)
2010    }
2011
2012    #[test]
2013    fn ok_status() {
2014        let raw = arr(vec![int(1), s("OK")]);
2015        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
2016    }
2017
2018    #[test]
2019    fn already_applied_status() {
2020        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
2021        assert_eq!(
2022            parse_report_usage_result(&raw).unwrap(),
2023            ReportUsageResult::AlreadyApplied
2024        );
2025    }
2026
2027    #[test]
2028    fn soft_breach_status() {
2029        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
2030        match parse_report_usage_result(&raw).unwrap() {
2031            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
2032                assert_eq!(dimension, "tokens");
2033                assert_eq!(current_usage, 150);
2034                assert_eq!(soft_limit, 100);
2035            }
2036            other => panic!("expected SoftBreach, got {other:?}"),
2037        }
2038    }
2039
2040    #[test]
2041    fn hard_breach_status() {
2042        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
2043        match parse_report_usage_result(&raw).unwrap() {
2044            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
2045                assert_eq!(dimension, "requests");
2046                assert_eq!(current_usage, 10001);
2047                assert_eq!(hard_limit, 10000);
2048            }
2049            other => panic!("expected HardBreach, got {other:?}"),
2050        }
2051    }
2052
2053    /// Negative case: non-Array input. Guards against a future Lua refactor
2054    /// that accidentally returns a bare string/int — the parser must fail
2055    /// loudly rather than silently succeed or panic.
2056    #[test]
2057    fn non_array_input_is_parse_error() {
2058        let raw = Value::SimpleString("OK".to_owned());
2059        let err = parse_report_usage_result(&raw).unwrap_err();
2060        let msg = format!("{err}");
2061        assert!(
2062            msg.to_lowercase().contains("expected array"),
2063            "error should mention expected shape, got: {msg}"
2064        );
2065    }
2066
2067    /// Negative case: Array whose first element isn't an Int status code.
2068    /// The Lua function's first return slot is always `status_code` (1 on
2069    /// success, a non-1 value otherwise); a non-Int there is a wire-format
2070    /// break that must surface as a parse error.
2071    #[test]
2072    fn first_element_non_int_is_parse_error() {
2073        let raw = arr(vec![s("not_an_int"), s("OK")]);
2074        let err = parse_report_usage_result(&raw).unwrap_err();
2075        let msg = format!("{err}");
2076        assert!(
2077            msg.to_lowercase().contains("int"),
2078            "error should mention Int status code, got: {msg}"
2079        );
2080    }
2081
2082    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
2083    /// field. Guards against the silent-coercion defect that cross-review
2084    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
2085    /// which would have surfaced Lua-side wire-format drift as a
2086    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
2087    /// but semantically wrong (a "breach with zero usage" is nonsense
2088    /// and masks the real error).
2089    #[test]
2090    fn soft_breach_non_numeric_current_is_parse_error() {
2091        let raw = arr(vec![
2092            int(1),
2093            s("SOFT_BREACH"),
2094            s("tokens"),
2095            s("not_a_number"), // current_usage — must fail, not coerce to 0
2096            s("100"),
2097        ]);
2098        let err = parse_report_usage_result(&raw).unwrap_err();
2099        let msg = format!("{err}");
2100        assert!(
2101            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
2102            "error should identify sub-status + field, got: {msg}"
2103        );
2104        assert!(
2105            msg.to_lowercase().contains("u64"),
2106            "error should mention the expected type (u64), got: {msg}"
2107        );
2108    }
2109
2110    /// Negative case: HARD_BREACH with the limit slot missing
2111    /// entirely. Same defence as the non-numeric test above: a
2112    /// truncated response must fail loudly rather than coerce to 0.
2113    #[test]
2114    fn hard_breach_missing_limit_is_parse_error() {
2115        let raw = arr(vec![
2116            int(1),
2117            s("HARD_BREACH"),
2118            s("requests"),
2119            s("10001"),
2120            // no index 4 — hard_limit missing
2121        ]);
2122        let err = parse_report_usage_result(&raw).unwrap_err();
2123        let msg = format!("{err}");
2124        assert!(
2125            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
2126            "error should identify sub-status + field, got: {msg}"
2127        );
2128        assert!(
2129            msg.to_lowercase().contains("missing"),
2130            "error should say 'missing', got: {msg}"
2131        );
2132    }
2133}
2134
2135
2136#[cfg(test)]
2137mod resume_signals_tests {
2138    use super::*;
2139
2140    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
2141        pairs.iter().map(|(k, v)| ((*k).to_owned(), (*v).to_owned())).collect()
2142    }
2143
2144    #[test]
2145    fn empty_suspension_returns_none() {
2146        let susp = m(&[]);
2147        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
2148        assert!(out.is_none(), "no suspension record → None");
2149    }
2150
2151    #[test]
2152    fn stale_prior_attempt_returns_none() {
2153        let wp = WaitpointId::new();
2154        let susp = m(&[
2155            ("attempt_index", "0"),
2156            ("close_reason", "resumed"),
2157            ("waitpoint_id", &wp.to_string()),
2158        ]);
2159        // Claimed attempt is 1; suspension belongs to 0 → stale.
2160        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
2161        assert!(out.is_none(), "attempt_index mismatch → None");
2162    }
2163
2164    #[test]
2165    fn non_resumed_close_returns_none() {
2166        let wp = WaitpointId::new();
2167        for reason in ["timeout", "cancelled", "", "expired"] {
2168            let susp = m(&[
2169                ("attempt_index", "0"),
2170                ("close_reason", reason),
2171                ("waitpoint_id", &wp.to_string()),
2172            ]);
2173            let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
2174            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
2175        }
2176    }
2177
2178    #[test]
2179    fn resumed_same_attempt_returns_waitpoint() {
2180        let wp = WaitpointId::new();
2181        let susp = m(&[
2182            ("attempt_index", "2"),
2183            ("close_reason", "resumed"),
2184            ("waitpoint_id", &wp.to_string()),
2185        ]);
2186        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
2187        assert_eq!(out, Some(wp));
2188    }
2189
2190    #[test]
2191    fn malformed_waitpoint_id_is_error() {
2192        let susp = m(&[
2193            ("attempt_index", "0"),
2194            ("close_reason", "resumed"),
2195            ("waitpoint_id", "not-a-uuid"),
2196        ]);
2197        let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
2198        assert!(
2199            format!("{err}").contains("not a valid UUID"),
2200            "error should mention invalid UUID, got: {err}"
2201        );
2202    }
2203
2204    #[test]
2205    fn empty_waitpoint_id_returns_none() {
2206        // Defensive: an empty waitpoint_id field (shouldn't happen on
2207        // resumed records, but guard against partial writes) is None, not an error.
2208        let susp = m(&[
2209            ("attempt_index", "0"),
2210            ("close_reason", "resumed"),
2211            ("waitpoint_id", ""),
2212        ]);
2213        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
2214        assert!(out.is_none());
2215    }
2216
2217    // The previous `matched_signal_ids_from_condition` helper was
2218    // removed when the production path switched from unbounded
2219    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
2220    // feedback on unbounded condition-hash reply size). That loop
2221    // is exercised by the integration tests in `ff-test`.
2222}
2223
2224#[cfg(test)]
2225mod terminal_replay_parsing_tests {
2226    //! Unit tests for the SDK's parse path of the enriched
2227    //! `execution_not_active` error returned on a terminal-op replay.
2228    //! The integration test in ff-test/tests/e2e_lifecycle.rs proves
2229    //! the Lua side emits the 4-slot detail; these tests prove the
2230    //! Rust parser threads all 4 slots into the `ExecutionNotActive`
2231    //! variant so the reconciler in complete()/fail()/cancel() can
2232    //! match on them.
2233
2234    use super::*;
2235    use ferriskey::Value;
2236
2237    // Use SimpleString rather than BulkString to avoid depending on bytes::Bytes
2238    // in the test harness. parse_success_result + parse_fail_result handle both.
2239    fn bulk(s: &str) -> Value {
2240        Value::SimpleString(s.to_owned())
2241    }
2242
2243    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
2244    #[test]
2245    fn parse_success_result_extracts_all_four_detail_slots() {
2246        let raw = Value::Array(vec![
2247            Ok(Value::Int(0)),
2248            Ok(bulk("execution_not_active")),
2249            Ok(bulk("success")),
2250            Ok(bulk("42")),
2251            Ok(bulk("terminal")),
2252            Ok(bulk("11111111-1111-1111-1111-111111111111")),
2253        ]);
2254        let err = parse_success_result(&raw, "test").unwrap_err();
2255        let unboxed = match err {
2256            SdkError::Engine(b) => *b,
2257            other => panic!("expected SdkError::Engine wrapping ExecutionNotActive, got {other:?}"),
2258        };
2259        match unboxed {
2260            crate::EngineError::Contention(
2261                crate::ContentionKind::ExecutionNotActive {
2262                    terminal_outcome,
2263                    lease_epoch,
2264                    lifecycle_phase,
2265                    attempt_id,
2266                },
2267            ) => {
2268                assert_eq!(terminal_outcome, "success");
2269                assert_eq!(lease_epoch, "42");
2270                assert_eq!(lifecycle_phase, "terminal");
2271                assert_eq!(attempt_id, "11111111-1111-1111-1111-111111111111");
2272            }
2273            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2274        }
2275    }
2276
2277    /// parse_fail_result must extract all detail slots too so the
2278    /// reconciler in fail() can match lifecycle_phase = "runnable" for
2279    /// retry-scheduled replays.
2280    #[test]
2281    fn parse_fail_result_extracts_all_four_detail_slots() {
2282        let raw = Value::Array(vec![
2283            Ok(Value::Int(0)),
2284            Ok(bulk("execution_not_active")),
2285            Ok(bulk("none")),
2286            Ok(bulk("7")),
2287            Ok(bulk("runnable")),
2288            Ok(bulk("22222222-2222-2222-2222-222222222222")),
2289        ]);
2290        let err = parse_fail_result(&raw).unwrap_err();
2291        let unboxed = match err {
2292            SdkError::Engine(b) => *b,
2293            other => panic!("expected SdkError::Engine wrapping ExecutionNotActive, got {other:?}"),
2294        };
2295        match unboxed {
2296            crate::EngineError::Contention(
2297                crate::ContentionKind::ExecutionNotActive {
2298                    terminal_outcome,
2299                    lease_epoch,
2300                    lifecycle_phase,
2301                    attempt_id,
2302                },
2303            ) => {
2304                assert_eq!(terminal_outcome, "none");
2305                assert_eq!(lease_epoch, "7");
2306                assert_eq!(lifecycle_phase, "runnable");
2307                assert_eq!(attempt_id, "22222222-2222-2222-2222-222222222222");
2308            }
2309            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2310        }
2311    }
2312
2313    /// Empty detail slots must default to "" (not panic) so older-Lua
2314    /// producers or malformed replies degrade to an unreconcilable
2315    /// variant rather than a Parse error.
2316    #[test]
2317    fn parse_success_result_missing_slots_defaults_to_empty() {
2318        let raw = Value::Array(vec![
2319            Ok(Value::Int(0)),
2320            Ok(bulk("execution_not_active")),
2321        ]);
2322        let err = parse_success_result(&raw, "test").unwrap_err();
2323        let unboxed = match err {
2324            SdkError::Engine(b) => *b,
2325            other => panic!("expected SdkError::Engine wrapping ExecutionNotActive, got {other:?}"),
2326        };
2327        match unboxed {
2328            crate::EngineError::Contention(
2329                crate::ContentionKind::ExecutionNotActive {
2330                    terminal_outcome,
2331                    lease_epoch,
2332                    lifecycle_phase,
2333                    attempt_id,
2334                },
2335            ) => {
2336                assert_eq!(terminal_outcome, "");
2337                assert_eq!(lease_epoch, "");
2338                assert_eq!(lifecycle_phase, "");
2339                assert_eq!(attempt_id, "");
2340            }
2341            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2342        }
2343    }
2344}