ff_sdk/task.rs
1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6#[cfg(feature = "valkey-default")]
7use ferriskey::Value;
8use ff_core::backend::Handle;
9use ff_core::backend::PendingWaitpoint;
10use ff_core::contracts::ReportUsageResult;
11use ff_core::engine_backend::EngineBackend;
12use ff_core::engine_error::StateKind;
13use ff_script::error::ScriptError;
14use ff_core::partition::PartitionConfig;
15use ff_core::types::*;
16use tokio::sync::{Notify, OwnedSemaphorePermit};
17use tokio::task::JoinHandle;
18
19use crate::SdkError;
20
21// ── Phase 3: Suspend/Signal types ──
22
23// RFC-013 Stage 1d — `TimeoutBehavior`, `SuspendOutcome`,
24// `ResumeCondition`, `SignalMatcher`, `ResumePolicy`,
25// `SuspensionReasonCode`, `SuspensionRequester`, `WaitpointBinding`,
26// `SuspendArgs`, `SuspendOutcomeDetails`, `IdempotencyKey`,
27// `CompositeBody` all live in `ff_core::contracts`. The `SuspendOutcome`
28// re-export preserves the `ff_sdk::SuspendOutcome` path; the other
29// paths are new.
30pub use ff_core::contracts::{
31 CompositeBody, CountKind, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget,
32 SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
33 SuspensionRequester, TimeoutBehavior, WaitpointBinding,
34};
35
36/// RFC-013 Stage 1d — cookie returned by the strict `suspend` wrapper.
37///
38/// The returned `handle` is a `HandleKind::Suspended` cookie the caller
39/// uses for `observe_signals` and the eventual re-claim via
40/// `claim_resumed_execution`. Consuming `ClaimedTask::suspend`
41/// invalidates the pre-suspend `ClaimedTask` (already moved by the
42/// `self: ClaimedTask` receiver); this handle is the only live cookie
43/// left on the caller side.
44#[derive(Debug)]
45pub struct SuspendedHandle {
46 pub handle: Handle,
47 pub details: SuspendOutcomeDetails,
48}
49
50/// RFC-013 Stage 1d — outcome of the fallible `try_suspend` wrapper.
51///
52/// Unlike the strict wrapper, `AlreadySatisfied` hands the `ClaimedTask`
53/// back to the caller so work continues on the retained lease.
54#[allow(clippy::large_enum_variant)]
55pub enum TrySuspendOutcome {
56 Suspended(SuspendedHandle),
57 AlreadySatisfied {
58 task: ClaimedTask,
59 details: SuspendOutcomeDetails,
60 },
61}
62
63impl std::fmt::Debug for TrySuspendOutcome {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 match self {
66 Self::Suspended(h) => f.debug_tuple("Suspended").field(h).finish(),
67 Self::AlreadySatisfied { details, .. } => f
68 .debug_struct("AlreadySatisfied")
69 .field("details", details)
70 .finish_non_exhaustive(),
71 }
72 }
73}
74
75/// A signal to deliver to a suspended execution's waitpoint.
76#[derive(Clone, Debug)]
77pub struct Signal {
78 pub signal_name: String,
79 pub signal_category: String,
80 pub payload: Option<Vec<u8>>,
81 pub source_type: String,
82 pub source_identity: String,
83 pub idempotency_key: Option<String>,
84 /// HMAC token issued when the waitpoint was created. Required for
85 /// authenticated signal delivery (RFC-004 §Waitpoint Security).
86 pub waitpoint_token: WaitpointToken,
87}
88
89/// Outcome of `deliver_signal()`.
90#[derive(Clone, Debug, PartialEq, Eq)]
91pub enum SignalOutcome {
92 /// Signal accepted, appended to waitpoint but condition not yet satisfied.
93 Accepted { signal_id: SignalId, effect: String },
94 /// Signal triggered resume — execution is now runnable.
95 TriggeredResume { signal_id: SignalId },
96 /// Duplicate signal (idempotency key matched).
97 Duplicate { existing_signal_id: String },
98}
99
#[cfg(feature = "valkey-default")]
impl SignalOutcome {
    /// Decode the raw reply of an `ff_deliver_signal` FCALL into a
    /// [`SignalOutcome`].
    ///
    /// Exposed so packages that invoke `ff_deliver_signal` themselves can
    /// interpret the Lua return value without reaching into SDK internals.
    ///
    /// # Errors
    ///
    /// Propagates whatever [`SdkError`] `parse_signal_result` reports for a
    /// reply it cannot decode.
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        let outcome = parse_signal_result(raw)?;
        Ok(outcome)
    }
}
110
111/// A signal that triggered a resume, readable by a worker after re-claim.
112///
113/// **RFC-012 Stage 0:** the canonical definition has moved to
114/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
115/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
116/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
117/// scheduled for removal in 0.5.0.
118pub use ff_core::backend::ResumeSignal;
119
120/// Outcome of `append_frame()`.
121///
122/// **RFC-012 §R7.2.1:** canonical definition moved to
123/// [`ff_core::backend::AppendFrameOutcome`]; this `pub use` shim
124/// preserves the `ff_sdk::task::AppendFrameOutcome` path through the
125/// 0.4.x window. Existing consumers construct + match exhaustively
126/// without change. The derive set widened from `Clone, Debug` to
127/// `Clone, Debug, PartialEq, Eq` matching the `FailOutcome` precedent.
128pub use ff_core::backend::AppendFrameOutcome;
129
130/// Outcome of a `fail()` call.
131///
132/// **RFC-012 Stage 1a:** canonical definition moved to
133/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
134/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
135/// re-export in `lib.rs`) through the 0.4.x window. Existing
136/// consumers construct + match exhaustively without change.
137pub use ff_core::backend::FailOutcome;
138
139/// A claimed execution with an active lease. The worker processes this task
140/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
141///
142/// The lease is automatically renewed in the background at `lease_ttl / 3`
143/// intervals. Renewal stops when the task is consumed or dropped.
144///
145/// `complete`, `fail`, and `cancel` consume `self` — this prevents
146/// double-complete bugs at the type level.
147pub struct ClaimedTask {
148 /// `EngineBackend` the trait-migrated ops forward through.
149 ///
150 /// **RFC-012 Stage 1b + Round-7.** Today this is always a
151 /// `ValkeyBackend` wrapping an internal `ferriskey::Client`. Most
152 /// per-task ops
153 /// (`complete`/`fail`/`cancel`/`delay_execution`/
154 /// `move_to_waiting_children`/`update_progress`/`resume_signals`/
155 /// `create_pending_waitpoint`/`append_frame`/`report_usage`) route
156 /// through `backend.*()`. `suspend` still uses the backend's
157 /// internal `fcall("ff_suspend_execution", ...)` directly — this
158 /// is the deferred suspend per RFC-012 §R7.6.1, pending Stage 1d
159 /// input-shape work. Lease renewal goes through
160 /// `backend.renew(&handle)`. Round-7 (#135/#145) closed the four
161 /// trait-shape gaps tracked by #117.
162 ///
163 /// Stage 1c migrates the FlowFabricWorker hot paths (claim,
164 /// deliver_signal) through the same trait surface; Stage 1d
165 /// refactors this struct to carry a single `Handle` rather than
166 /// synthesising one per op via `cloned_handle`.
167 backend: Arc<dyn EngineBackend>,
168 /// Partition config for key construction.
169 #[allow(dead_code)]
170 partition_config: PartitionConfig,
171 /// Execution identity.
172 execution_id: ExecutionId,
173 /// Current attempt.
174 attempt_index: AttemptIndex,
175 attempt_id: AttemptId,
176 /// Lease identity.
177 lease_id: LeaseId,
178 lease_epoch: LeaseEpoch,
179 /// Lease timing.
180 #[allow(dead_code)]
181 lease_ttl_ms: u64,
182 /// Lane used at claim time.
183 lane_id: LaneId,
184 /// Worker instance that holds this lease (for index cleanup keys).
185 #[allow(dead_code)]
186 worker_instance_id: WorkerInstanceId,
187 /// Backend-minted attempt handle cached at claim time.
188 ///
189 /// **RFC-012 Stage 1d (v0.12 PR-5.5).** Replaces the pre-PR
190 /// `cloned_handle` helper that per-op synthesised a Valkey-specific
191 /// `Handle` via `ValkeyBackend::encode_handle`. The handle is now
192 /// produced by the owning backend at `claim_execution` /
193 /// `claim_resumed_execution` time and rides on the
194 /// `ClaimedExecution` / `ClaimedResumedExecution` result. Every
195 /// per-op trait forwarder clones it into `backend.*()` — no
196 /// runtime cost difference vs. the pre-PR-5.5 per-op
197 /// `encode_handle` (one `Box<[u8]>` allocation per op).
198 handle: Handle,
199 /// Execution data.
200 input_payload: Vec<u8>,
201 execution_kind: String,
202 tags: HashMap<String, String>,
203 /// Background renewal task handle.
204 renewal_handle: JoinHandle<()>,
205 /// Signal to stop renewal (used before consuming self).
206 renewal_stop: Arc<Notify>,
207 /// Consecutive lease renewal failures. Shared with the background renewal
208 /// task. Reset to 0 on each successful renewal. Workers should check
209 /// `is_lease_healthy()` before committing expensive side effects.
210 renewal_failures: Arc<AtomicU32>,
211 /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
212 /// response is received, just before `self` is consumed into `Drop`.
213 /// `Drop` reads this instead of `renewal_handle.is_finished()` to
214 /// suppress the false-positive "dropped without terminal operation"
215 /// warning: after `notify_one`, the renewal task has not yet been
216 /// polled by the runtime, so `is_finished()` is still `false` on the
217 /// happy path when self is being consumed.
218 ///
219 /// Note: the flag is set for any terminal-op path that reaches
220 /// `stop_renewal()`, which includes Lua-level script errors (the
221 /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
222 /// the caller already receives the `Err` via the op's return value,
223 /// so an additional `Drop` warning would be noise. The warning is
224 /// reserved for genuine drop-without-terminal-op cases (panic, early
225 /// return, transport failure before stop_renewal ran).
226 terminal_op_called: AtomicBool,
227 /// Concurrency permit from the worker's semaphore. Held for the lifetime
228 /// of the task; released on complete/fail/cancel/drop.
229 _concurrency_permit: Option<OwnedSemaphorePermit>,
230}
231
232impl ClaimedTask {
233 /// Construct a `ClaimedTask` from the results of a successful
234 /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
235 ///
236 /// # Arguments
237 ///
238 /// * `backend` — `Arc<dyn EngineBackend>` the per-task ops
239 /// (`complete`/`fail`/`cancel`/`renew`/`append_frame`/…) forward
240 /// through, plus the background lease-renewal task. Today this
241 /// is always a `ValkeyBackend` (see the struct doc on `backend`
242 /// above); RFC-023 widens the admissible backends.
243 /// * `partition_config` — partition topology snapshot read at
244 /// `FlowFabricWorker::connect`. Used for key construction on
245 /// the lifetime of this task.
246 /// * `execution_id` — the claimed execution's UUID.
247 /// * `attempt_index` / `attempt_id` — current attempt identity.
248 /// `attempt_index` is 0 on a fresh claim, preserved on a
249 /// resumed claim.
250 /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
251 /// is returned by the Lua FCALL and bumped on each resumed
252 /// claim.
253 /// * `lease_ttl_ms` — lease TTL used to schedule the background
254 /// renewal task (renews at `lease_ttl_ms / 3`).
255 /// * `lane_id` — the lane the task was claimed on. Used for
256 /// index key construction (`lane_active`, `lease_expiry`,
257 /// etc.).
258 /// * `worker_instance_id` — this worker's identity. Used to
259 /// resolve `worker_leases` index entries during renewal and
260 /// completion.
261 /// * `input_payload` / `execution_kind` / `tags` — pre-read
262 /// execution metadata, exposed via getters so the worker
263 /// doesn't round-trip to Valkey to inspect what it just
264 /// claimed.
265 ///
266 /// # Invariant: constructor is `pub(crate)` on purpose
267 ///
268 /// **External callers cannot construct a `ClaimedTask`** — only
269 /// the in-crate claim entry points (`claim_next`,
270 /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
271 /// load-bearing: those entry points are the ONLY sites that
272 /// acquire a permit from the worker's concurrency semaphore
273 /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
274 /// it through `ClaimedTask::set_concurrency_permit` before
275 /// returning the task. Promoting `new` to `pub` would let
276 /// consumers build tasks that bypass the concurrency contract
277 /// the worker's `max_concurrent_tasks` config advertises — the
278 /// returned task would run alongside other in-flight work
279 /// without debiting the permit bank, and the
280 /// complete/fail/cancel/drop path would have nothing to
281 /// release.
282 ///
283 /// If an external callsite genuinely needs to rehydrate a
284 /// task from saved FCALL results, the right answer is a new
285 /// `FlowFabricWorker` entry point that wraps `new` + acquires a
286 /// permit — not promoting this constructor.
287 #[allow(clippy::too_many_arguments)]
288 pub(crate) fn new(
289 backend: Arc<dyn EngineBackend>,
290 partition_config: PartitionConfig,
291 handle: Handle,
292 execution_id: ExecutionId,
293 attempt_index: AttemptIndex,
294 attempt_id: AttemptId,
295 lease_id: LeaseId,
296 lease_epoch: LeaseEpoch,
297 lease_ttl_ms: u64,
298 lane_id: LaneId,
299 worker_instance_id: WorkerInstanceId,
300 input_payload: Vec<u8>,
301 execution_kind: String,
302 tags: HashMap<String, String>,
303 ) -> Self {
304 let renewal_stop = Arc::new(Notify::new());
305 let renewal_failures = Arc::new(AtomicU32::new(0));
306
307 // v0.12 PR-5.5: the renewal task forwards through
308 // `backend.renew(&handle)` using the backend-minted handle. No
309 // Valkey-specific `encode_handle` call here — the handle rode in
310 // on the claim result.
311 let renewal_handle = spawn_renewal_task(
312 backend.clone(),
313 handle.clone(),
314 execution_id.clone(),
315 lease_ttl_ms,
316 renewal_stop.clone(),
317 renewal_failures.clone(),
318 );
319
320 Self {
321 backend,
322 partition_config,
323 execution_id,
324 attempt_index,
325 attempt_id,
326 lease_id,
327 lease_epoch,
328 lease_ttl_ms,
329 lane_id,
330 worker_instance_id,
331 handle,
332 input_payload,
333 execution_kind,
334 tags,
335 renewal_handle,
336 renewal_stop,
337 renewal_failures,
338 terminal_op_called: AtomicBool::new(false),
339 _concurrency_permit: None,
340 }
341 }
342
343 /// Attach a concurrency permit from the worker's semaphore.
344 /// The permit is held for the task's lifetime and released on drop.
345 #[allow(dead_code)]
346 pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
347 self._concurrency_permit = Some(permit);
348 }
349
350 // ── Accessors ──
351
352 pub fn execution_id(&self) -> &ExecutionId {
353 &self.execution_id
354 }
355
356 pub fn attempt_index(&self) -> AttemptIndex {
357 self.attempt_index
358 }
359
360 pub fn attempt_id(&self) -> &AttemptId {
361 &self.attempt_id
362 }
363
364 pub fn lease_id(&self) -> &LeaseId {
365 &self.lease_id
366 }
367
368 pub fn lease_epoch(&self) -> LeaseEpoch {
369 self.lease_epoch
370 }
371
372 pub fn input_payload(&self) -> &[u8] {
373 &self.input_payload
374 }
375
376 pub fn execution_kind(&self) -> &str {
377 &self.execution_kind
378 }
379
380 pub fn tags(&self) -> &HashMap<String, String> {
381 &self.tags
382 }
383
384 pub fn lane_id(&self) -> &LaneId {
385 &self.lane_id
386 }
387
/// Read a range of frames from this task's attempt stream.
///
/// The read goes through the task's `Arc<dyn EngineBackend>`, so callers no
/// longer need to reach for the `ferriskey::Client` (#87). The
/// backend-level contract is documented on
/// [`EngineBackend::read_stream`](ff_core::engine_backend::EngineBackend::read_stream).
///
/// # Errors
///
/// An out-of-range `count_limit` is rejected before any backend call and
/// surfaces as [`SdkError::Config`]. Backend failures are converted into
/// [`SdkError`].
///
/// Compiled only with `valkey-default`. The backing trait method needs
/// ff-core's `streaming` feature, which the `valkey-default` feature set
/// turns on. Sqlite-only builds (`--no-default-features,
/// features = ["sqlite"]`) do not have this method today.
#[cfg(feature = "valkey-default")]
pub async fn read_stream(
    &self,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    validate_stream_read_count(count_limit)?;
    let frames = self
        .backend
        .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
        .await?;
    Ok(frames)
}
416
/// Tail this task's attempt stream from the `after` cursor, with every
/// frame visible.
///
/// Shorthand for [`Self::tail_stream_with_visibility`] with
/// `TailVisibility::All`. Mind the head-of-line warning on
/// [`EngineBackend::tail_stream`](ff_core::engine_backend::EngineBackend::tail_stream):
/// the task's backend is shared with the claim/complete/fail hot paths.
/// Consumers that need isolation should build a dedicated `EngineBackend`
/// for tail reads.
///
/// Compiled only with `valkey-default` — see [`Self::read_stream`].
#[cfg(feature = "valkey-default")]
pub async fn tail_stream(
    &self,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    let visibility = ff_core::backend::TailVisibility::All;
    self.tail_stream_with_visibility(after, block_ms, count_limit, visibility)
        .await
}
441
/// Tail this task's attempt stream with an explicit
/// [`TailVisibility`](ff_core::backend::TailVisibility) filter
/// (RFC-015 §6).
///
/// Pass
/// [`TailVisibility::ExcludeBestEffort`](ff_core::backend::TailVisibility::ExcludeBestEffort)
/// to have frames written with
/// [`StreamMode::BestEffortLive`](ff_core::backend::StreamMode::BestEffortLive)
/// dropped server-side.
///
/// # Errors
///
/// * [`SdkError::Config`] (field `block_ms`) when `block_ms` exceeds
///   `MAX_TAIL_BLOCK_MS`.
/// * The errors raised by `validate_stream_read_count` for an out-of-range
///   `count_limit`, and by `validate_tail_cursor` for an unacceptable
///   `after` cursor.
/// * Backend failures, converted into [`SdkError`].
///
/// Compiled only with `valkey-default` — see [`Self::read_stream`].
#[cfg(feature = "valkey-default")]
pub async fn tail_stream_with_visibility(
    &self,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: ff_core::backend::TailVisibility,
) -> Result<StreamFrames, SdkError> {
    // All validation runs before the (potentially blocking) backend call.
    if block_ms > MAX_TAIL_BLOCK_MS {
        return Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("block_ms".into()),
            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
        });
    }
    validate_stream_read_count(count_limit)?;
    validate_tail_cursor(&after)?;
    Ok(self
        .backend
        .tail_stream(
            &self.execution_id,
            self.attempt_index,
            after,
            block_ms,
            count_limit,
            visibility,
        )
        .await?)
}
478
479 /// Clone this task's cached attempt handle for passing to a trait
480 /// op (`backend.complete(&handle, …)`, `backend.fail(&handle, …)`,
481 /// etc.).
482 ///
483 /// **RFC-012 Stage 1d (v0.12 PR-5.5).** The handle rides in on the
484 /// claim result and is stored on the struct — per-op calls clone
485 /// it instead of re-synthesising via backend-specific code. The
486 /// `Handle` payload is `Box<[u8]> + 2 small enums`, so a clone is
487 /// effectively one heap allocation — the same cost as the
488 /// pre-PR-5.5 `cloned_handle`.
489 fn cloned_handle(&self) -> Handle {
490 self.handle.clone()
491 }
492
493 /// Check if the lease is likely still valid based on renewal success.
494 ///
495 /// Returns `false` if 3 or more consecutive renewal attempts have failed.
496 /// Workers should check this before committing expensive or irreversible
497 /// side effects. A `false` return means Valkey may have already expired
498 /// the lease and another worker could be processing this execution.
499 pub fn is_lease_healthy(&self) -> bool {
500 self.renewal_failures.load(Ordering::Relaxed) < 3
501 }
502
503 /// Number of consecutive lease renewal failures since the last success.
504 ///
505 /// Returns 0 when renewals are working normally. Useful for observability
506 /// and custom health policies beyond the default threshold of 3.
507 pub fn consecutive_renewal_failures(&self) -> u32 {
508 self.renewal_failures.load(Ordering::Relaxed)
509 }
510
511 // ── Terminal operations (consume self) ──
512
513 /// Delay the execution until `delay_until`.
514 ///
515 /// Releases the lease. The execution moves to `delayed` state.
516 /// Consumes self — the task cannot be used after delay.
517 pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
518 // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
519 // the pre-migration `stop_renewal` contract: called only when
520 // the FCALL round-tripped (Ok or a typed Lua/engine error);
521 // raw transport errors bubble up without stopping renewal so
522 // the `Drop` warning still fires if the caller later drops
523 // `self` without retrying.
524 let handle = self.cloned_handle();
525 let out = self.backend.delay(&handle, delay_until).await;
526 if fcall_landed(&out) {
527 self.stop_renewal();
528 }
529 out.map_err(SdkError::from)
530 }
531
532 /// Move execution to waiting_children state.
533 ///
534 /// Releases the lease. The execution waits for child dependencies to complete.
535 /// Consumes self.
536 pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
537 // RFC-012 Stage 1b: forwards through `backend.wait_children`.
538 // See `delay_execution` for the stop_renewal contract.
539 let handle = self.cloned_handle();
540 let out = self.backend.wait_children(&handle).await;
541 if fcall_landed(&out) {
542 self.stop_renewal();
543 }
544 out.map_err(SdkError::from)
545 }
546
547 /// Complete the execution successfully.
548 ///
549 /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
550 /// Renewal continues during the FCALL to prevent lease expiry under
551 /// network latency — the Lua fences on lease_id+epoch atomically.
552 /// Consumes self — the task cannot be used after completion.
553 ///
554 /// # Connection errors and replay
555 ///
556 /// If the Valkey connection drops after the Lua commit but before the
557 /// client reads the response, retrying `complete()` with the same
558 /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
559 /// reconciles the "already terminal with matching outcome" response
560 /// into `Ok(())`. A retry after a different terminal op has raced in
561 /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
562 /// populated `terminal_outcome` so the caller can see what actually
563 /// happened.
564 pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
565 // RFC-012 Stage 1b: forwards through `backend.complete`.
566 // The FCALL body + the `ExecutionNotActive` replay reconciliation
567 // (match same epoch + attempt_id + outcome=="success" → Ok)
568 // moved to `ff_backend_valkey::complete_impl`.
569 let handle = self.cloned_handle();
570 let out = self.backend.complete(&handle, result_payload).await;
571 if fcall_landed(&out) {
572 self.stop_renewal();
573 }
574 out.map_err(SdkError::from)
575 }
576
577 /// Fail the execution with a reason and error category.
578 ///
579 /// If the execution policy allows retries, the engine schedules a retry
580 /// (delayed backoff). Otherwise, the execution becomes terminal failed.
581 /// Returns [`FailOutcome`] so the caller knows what happened.
582 ///
583 /// # Connection errors and replay
584 ///
585 /// `fail()` is replay-safe under the same conditions as `complete()`:
586 /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
587 /// reconciled into the outcome the server actually committed
588 /// (`TerminalFailed` if no retries left; `RetryScheduled` with
589 /// `delay_until = 0` if a retry was scheduled — the exact delay is
590 /// not recovered on the replay path).
591 pub async fn fail(
592 self,
593 reason: &str,
594 error_category: &str,
595 ) -> Result<FailOutcome, SdkError> {
596 // RFC-012 Stage 1b: forwards through `backend.fail`.
597 // The FCALL body + retry-policy read + the two-shape replay
598 // reconciliation (terminal/failed → TerminalFailed, runnable
599 // → RetryScheduled{0}) moved to
600 // `ff_backend_valkey::fail_impl`.
601 //
602 // **Preserving the caller's raw category string.** The
603 // pre-Stage-1b code passed `error_category` straight to the
604 // Lua `failure_category` ARGV. Real callers use arbitrary
605 // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
606 // `"inference_error"`, …) that do not correspond to the 5
607 // named `FailureClass` variants. A lossy enum conversion
608 // would rewrite every non-canonical category to
609 // `Transient` — a real behavior change for
610 // ff-readiness-tests + the examples crates.
611 //
612 // Instead: pack the raw category bytes into
613 // `FailureReason.detail`; `fail_impl` reads them back and
614 // threads them to Lua verbatim. The `classification` enum
615 // is derived from the string for the few consumers who
616 // match on the trait return today (none in-workspace).
617 // Stage 1d (or issue #117) widens `FailureClass` with a
618 // `Custom(String)` arm; this stash carrier retires then.
619 let handle = self.cloned_handle();
620 let failure_reason = ff_core::backend::FailureReason::with_detail(
621 reason.to_owned(),
622 error_category.as_bytes().to_vec(),
623 );
624 let classification = error_category_to_class(error_category);
625 let out = self.backend.fail(&handle, failure_reason, classification).await;
626 if fcall_landed(&out) {
627 self.stop_renewal();
628 }
629 out.map_err(SdkError::from)
630 }
631
632 /// Cancel the execution.
633 ///
634 /// Stops lease renewal, then calls `ff_cancel_execution` via FCALL.
635 /// Consumes self.
636 ///
637 /// # Connection errors and replay
638 ///
639 /// `cancel()` is replay-safe under the same conditions as `complete()`:
640 /// a retry by the same caller matching `lease_epoch` + `attempt_id`
641 /// returns `Ok(())` if the server's stored `terminal_outcome` is
642 /// `cancelled`. A retry that finds a different outcome (because a
643 /// concurrent `complete()` or `fail()` won the race) surfaces as
644 /// `ExecutionNotActive` with the populated `terminal_outcome` so the
645 /// caller can see that the cancel intent was NOT honored.
646 pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
647 // RFC-012 Stage 1b: forwards through `backend.cancel`.
648 // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
649 // and the `ExecutionNotActive` → outcome=="cancelled" replay
650 // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
651 let handle = self.cloned_handle();
652 let out = self.backend.cancel(&handle, reason).await;
653 if fcall_landed(&out) {
654 self.stop_renewal();
655 }
656 out.map_err(SdkError::from)
657 }
658
659 // ── Non-terminal operations ──
660
661 /// Manually renew the lease.
662 ///
663 /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
664 /// trait. The background renewal task calls
665 /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
666 /// this method is the public entry point for workers that want
667 /// to force a renew out-of-band.
668 pub async fn renew_lease(&self) -> Result<(), SdkError> {
669 let handle = self.cloned_handle();
670 self.backend
671 .renew(&handle)
672 .await
673 .map(|_renewal| ())
674 .map_err(SdkError::from)
675 }
676
677 /// Update progress (pct 0-100 and optional message).
678 ///
679 /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
680 /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
681 ///
682 /// # Not for stream frames
683 ///
684 /// `update_progress` writes the `progress_percent` / `progress_message`
685 /// fields on `exec_core` (the execution's state hash). It is a
686 /// scalar heartbeat — each call overwrites the previous value and
687 /// nothing is appended to the output stream. Producers emitting
688 /// stream frames (arbitrary `frame_type` + payload, consumed via
689 /// `ClaimedTask::read_stream` / `tail_stream` or the HTTP
690 /// stream-tail routes) MUST use [`Self::append_frame`] instead;
691 /// `update_progress` is invisible to stream consumers.
692 pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
693 let handle = self.cloned_handle();
694 self.backend
695 .progress(&handle, Some(pct), Some(message.to_owned()))
696 .await
697 .map_err(SdkError::from)
698 }
699
700 /// Report usage against a budget and check limits.
701 ///
702 /// Non-consuming — the worker can report usage multiple times.
703 /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
704 /// `dedup_key` prevents double-counting on retries (auto-prefixed with budget hash tag).
705 ///
706 /// **RFC-012 §R7.2.3:** forwards through
707 /// [`EngineBackend::report_usage`](ff_core::engine_backend::EngineBackend::report_usage).
708 /// The trait's `UsageDimensions.dedup_key` carries the raw key;
709 /// the backend impl applies the `usage_dedup_key(hash_tag, k)`
710 /// wrap so the dedup state co-locates with the budget partition
711 /// (PR #108).
712 pub async fn report_usage(
713 &self,
714 budget_id: &BudgetId,
715 dimensions: &[(&str, u64)],
716 dedup_key: Option<&str>,
717 ) -> Result<ReportUsageResult, SdkError> {
718 let handle = self.cloned_handle();
719 let mut dims = ff_core::backend::UsageDimensions::default();
720 for (name, delta) in dimensions {
721 dims.custom.insert((*name).to_owned(), *delta);
722 }
723 dims.dedup_key = dedup_key
724 .filter(|k| !k.is_empty())
725 .map(|k| k.to_owned());
726 self.backend
727 .report_usage(&handle, budget_id, dims)
728 .await
729 .map_err(SdkError::from)
730 }
731
732 /// Create a pending waitpoint for future signal delivery.
733 ///
734 /// Non-consuming — the worker keeps the lease. Signals delivered to the
735 /// waitpoint are buffered. When the worker later calls `suspend()` with
736 /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
737 /// resume condition.
738 ///
739 /// Returns both the waitpoint_id AND the HMAC token required by external
740 /// callers to buffer signals against this pending waitpoint
741 /// (RFC-004 §Waitpoint Security).
742 ///
743 /// **RFC-012 §R7.2.2:** forwards through
744 /// [`EngineBackend::create_waitpoint`](ff_core::engine_backend::EngineBackend::create_waitpoint).
745 /// The trait returns
746 /// [`PendingWaitpoint`](ff_core::backend::PendingWaitpoint) whose
747 /// `hmac_token` is the same wire HMAC this method has always
748 /// produced; the SDK unwraps it back to the historical
749 /// `(WaitpointId, WaitpointToken)` tuple for caller-shape parity.
750 pub async fn create_pending_waitpoint(
751 &self,
752 waitpoint_key: &str,
753 expires_in_ms: u64,
754 ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
755 let handle = self.cloned_handle();
756 let expires_in = std::time::Duration::from_millis(expires_in_ms);
757 let pending = self
758 .backend
759 .create_waitpoint(&handle, waitpoint_key, expires_in)
760 .await
761 .map_err(SdkError::from)?;
762 // `WaitpointHmac` wraps the canonical `WaitpointToken`; the
763 // `.token()` accessor borrows the underlying
764 // `WaitpointToken`, which is the caller's historical return
765 // shape.
766 Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
767 }
768
769 // ── Phase 4: Streaming ──
770
771 /// Append a frame to the current attempt's output stream.
772 ///
773 /// Non-consuming — the worker can append many frames during execution.
774 /// The stream is created lazily on the first append.
775 ///
776 /// **RFC-012 §R7.2.1 / PR #146:** forwards through the
777 /// `EngineBackend` trait. The free-form `frame_type` tag and
778 /// optional `metadata` (wire `correlation_id`) travel on the
779 /// extended [`ff_core::backend::Frame`] shape (`frame_type:
780 /// String`, `correlation_id: Option<String>`), giving byte-for-byte
781 /// wire parity with the pre-migration direct-FCALL path.
782 ///
783 /// # `append_frame` vs [`Self::update_progress`]
784 ///
785 /// Stream-frame producers (arbitrary `frame_type` + payload
786 /// consumed via `ClaimedTask::read_stream` / `tail_stream` or the
787 /// HTTP stream-tail routes) MUST use `append_frame`.
788 /// `update_progress` writes scalar `progress_percent` /
789 /// `progress_message` fields to `exec_core` and is invisible to
790 /// stream consumers.
791 pub async fn append_frame(
792 &self,
793 frame_type: &str,
794 payload: &[u8],
795 metadata: Option<&str>,
796 ) -> Result<AppendFrameOutcome, SdkError> {
797 self.append_frame_with_mode(
798 frame_type,
799 payload,
800 metadata,
801 ff_core::backend::StreamMode::Durable,
802 )
803 .await
804 }
805
806 /// Append a frame under an explicit RFC-015
807 /// [`StreamMode`](ff_core::backend::StreamMode).
808 /// Defaults via [`Self::append_frame`] preserve pre-015 behaviour
809 /// ([`StreamMode::Durable`](ff_core::backend::StreamMode::Durable)).
810 pub async fn append_frame_with_mode(
811 &self,
812 frame_type: &str,
813 payload: &[u8],
814 metadata: Option<&str>,
815 mode: ff_core::backend::StreamMode,
816 ) -> Result<AppendFrameOutcome, SdkError> {
817 let handle = self.cloned_handle();
818 let mut frame = ff_core::backend::Frame::new(
819 payload.to_vec(),
820 ff_core::backend::FrameKind::Event,
821 )
822 .with_frame_type(frame_type)
823 .with_mode(mode);
824 if let Some(cid) = metadata {
825 frame = frame.with_correlation_id(cid);
826 }
827 self.backend
828 .append_frame(&handle, frame)
829 .await
830 .map_err(SdkError::from)
831 }
832
833 // ── RFC-013 Stage 1d: Suspend ──
834
835 /// **Strict suspend.** Consumes `self` and yields a
836 /// [`SuspendedHandle`] or errors. On the early-satisfied path
837 /// (buffered signals already matched the condition), returns
838 /// `EngineError::State(AlreadySatisfied)` — use `try_suspend` when
839 /// that branch is part of the flow's happy path.
840 ///
841 /// RFC-013 §5.1 — this is the classic Rust `foo` / `try_foo`
842 /// split; `suspend` is the unconditional form. Defaults
843 /// `WaitpointBinding::fresh()` for the waitpoint. For pending
844 /// waitpoints previously issued via `create_pending_waitpoint`,
845 /// use `try_suspend_on_pending`.
846 pub async fn suspend(
847 self,
848 reason_code: SuspensionReasonCode,
849 resume_condition: ResumeCondition,
850 timeout: Option<(TimestampMs, TimeoutBehavior)>,
851 resume_policy: ResumePolicy,
852 ) -> Result<SuspendedHandle, SdkError> {
853 let outcome = self
854 .try_suspend_inner(WaitpointBinding::fresh(), reason_code, resume_condition, timeout, resume_policy)
855 .await?;
856 match outcome {
857 TrySuspendOutcome::Suspended(h) => Ok(h),
858 TrySuspendOutcome::AlreadySatisfied { .. } => Err(SdkError::from(
859 crate::EngineError::State(StateKind::AlreadySatisfied),
860 )),
861 }
862 }
863
864 /// **Fallible suspend.** On the early-satisfied path, the
865 /// `ClaimedTask` is handed back unchanged so the worker can
866 /// continue running against the retained lease.
867 ///
868 /// RFC-013 §5.1 — use this when consuming a pending waitpoint whose
869 /// buffered signals may already match the condition.
870 pub async fn try_suspend(
871 self,
872 reason_code: SuspensionReasonCode,
873 resume_condition: ResumeCondition,
874 timeout: Option<(TimestampMs, TimeoutBehavior)>,
875 resume_policy: ResumePolicy,
876 ) -> Result<TrySuspendOutcome, SdkError> {
877 self.try_suspend_inner(WaitpointBinding::fresh(), reason_code, resume_condition, timeout, resume_policy)
878 .await
879 }
880
881 /// Convenience: `try_suspend` against a pending waitpoint previously
882 /// issued via `create_pending_waitpoint`.
883 pub async fn try_suspend_on_pending(
884 self,
885 pending: &PendingWaitpoint,
886 reason_code: SuspensionReasonCode,
887 resume_condition: ResumeCondition,
888 timeout: Option<(TimestampMs, TimeoutBehavior)>,
889 resume_policy: ResumePolicy,
890 ) -> Result<TrySuspendOutcome, SdkError> {
891 self.try_suspend_inner(
892 WaitpointBinding::use_pending(pending),
893 reason_code,
894 resume_condition,
895 timeout,
896 resume_policy,
897 )
898 .await
899 }
900
901 async fn try_suspend_inner(
902 self,
903 waitpoint: WaitpointBinding,
904 reason_code: SuspensionReasonCode,
905 resume_condition: ResumeCondition,
906 timeout: Option<(TimestampMs, TimeoutBehavior)>,
907 resume_policy: ResumePolicy,
908 ) -> Result<TrySuspendOutcome, SdkError> {
909 let handle = self.cloned_handle();
910 let (timeout_at, timeout_behavior) = match timeout {
911 Some((at, b)) => (Some(at), b),
912 None => (None, TimeoutBehavior::Fail),
913 };
914 // If the caller passed `ResumeCondition::Single { waitpoint_key }`
915 // alongside the default `WaitpointBinding::fresh()` (which mints a
916 // random internal key), rebind the Fresh binding to use the
917 // condition's key so RFC-013 §2.4 "waitpoint_key cross-field
918 // invariant" is honored. UsePending retains its own binding.
919 // RFC-014: for `Composite`, rebind to the first waitpoint_key
920 // observed in the composite tree (Single.waitpoint_key or
921 // Count.waitpoints[0]); consumers using single-waitpoint
922 // composites must keep all Single/Count waitpoint_keys equal
923 // (see RFC-014 single-waitpoint scoping).
924 let waitpoint = match (&waitpoint, &resume_condition) {
925 (
926 WaitpointBinding::Fresh { waitpoint_id, .. },
927 ResumeCondition::Single { waitpoint_key, .. },
928 ) => WaitpointBinding::Fresh {
929 waitpoint_id: waitpoint_id.clone(),
930 waitpoint_key: waitpoint_key.clone(),
931 },
932 (
933 WaitpointBinding::Fresh { waitpoint_id, .. },
934 ResumeCondition::Composite(body),
935 ) => {
936 if let Some(key) = composite_first_waitpoint_key(body) {
937 WaitpointBinding::Fresh {
938 waitpoint_id: waitpoint_id.clone(),
939 waitpoint_key: key,
940 }
941 } else {
942 waitpoint
943 }
944 }
945 _ => waitpoint,
946 };
947 let mut args = SuspendArgs::new(
948 SuspensionId::new(),
949 waitpoint,
950 resume_condition,
951 resume_policy,
952 reason_code,
953 TimestampMs::now(),
954 )
955 .with_requester(SuspensionRequester::Worker);
956 if let Some(at) = timeout_at {
957 args = args.with_timeout(at, timeout_behavior);
958 }
959
960 let outcome = self
961 .backend
962 .suspend(&handle, args)
963 .await
964 .map_err(SdkError::from)?;
965 match outcome {
966 SuspendOutcome::Suspended { details, handle: new_handle } => {
967 // Renewal is stopped only AFTER a successful Suspended
968 // outcome. The suspended-kind handle is what the caller
969 // will later hand to `observe_signals` / `claim_from_resume_grant`.
970 self.stop_renewal();
971 Ok(TrySuspendOutcome::Suspended(SuspendedHandle {
972 handle: new_handle,
973 details,
974 }))
975 }
976 SuspendOutcome::AlreadySatisfied { details } => {
977 // Lease is retained Lua-side; keep the ClaimedTask alive
978 // so the worker continues against the existing lease.
979 Ok(TrySuspendOutcome::AlreadySatisfied {
980 task: self,
981 details,
982 })
983 }
984 _ => Err(SdkError::from(ScriptError::Parse {
985 fcall: "try_suspend_inner".into(),
986 execution_id: None,
987 message: "unexpected SuspendOutcome variant".into(),
988 })),
989 }
990 }
991
992 /// Read the signals that satisfied the waitpoint and triggered this
993 /// resume.
994 ///
995 /// Non-consuming. Intended to be called immediately after re-claim via
996 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
997 /// subsequent `suspend()` (which replaces `suspension:current`).
998 ///
999 /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
1000 ///
1001 /// - No prior suspension on this execution.
1002 /// - The prior suspension belonged to an earlier attempt (e.g. the
1003 /// attempt was cancelled/failed and a retry is now claiming).
1004 /// - The prior suspension was closed by timeout / cancel / operator
1005 /// override rather than by a matched signal.
1006 ///
1007 /// Reads `suspension:current` once, filters by `attempt_index` to
1008 /// guard against stale prior-attempt records, then fetches the matched
1009 /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
1010 /// fields and reads each signal's metadata + payload directly.
1011 pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
1012 // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
1013 // Pre-migration body (HGETALL suspension_current + HMGET
1014 // matchers + pipelined HGETALL signal_hash / GET
1015 // signal_payload) lives in
1016 // `ff_backend_valkey::observe_signals_impl`.
1017 let handle = self.cloned_handle();
1018 self.backend
1019 .observe_signals(&handle)
1020 .await
1021 .map_err(SdkError::from)
1022 }
1023
1024 /// Signal the renewal task to stop. Called by every terminal op
1025 /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
1026 /// `move_to_waiting_children`) after the FCALL returns. Also marks
1027 /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
1028 /// consumption from a genuine drop-without-terminal-op.
1029 fn stop_renewal(&self) {
1030 self.terminal_op_called.store(true, Ordering::Release);
1031 self.renewal_stop.notify_one();
1032 }
1033
1034}
1035
1036/// True iff the backend's FCALL result represents a round-trip that
1037/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
1038/// contention, conflict, state, bug) all count as "landed" — the
1039/// server either committed or rejected with a typed response. Only
1040/// raw `Transport` errors (connection drops, request timeouts, parse
1041/// failures) count as "did not land", which matches the pre-Stage-1b
1042/// SDK's `fcall(...).await.map_err(SdkError::from)?` short-circuit
1043/// — those errors returned before `stop_renewal()` ran, preserving
1044/// the `Drop` warning for genuine "lease will leak" cases.
1045///
1046/// Stage 1b terminal-op forwarders use this predicate to decide
1047/// whether to call `stop_renewal()`: yes for landed responses, no
1048/// for transport errors so the caller's retry path still sees a
1049/// running renewal task.
1050fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
1051 match r {
1052 Ok(_) => true,
1053 Err(crate::EngineError::Transport { .. }) => false,
1054 Err(_) => true,
1055 }
1056}
1057
1058/// Map the SDK's free-form `error_category: &str` to the typed
1059/// `FailureClass` the trait's `fail` method takes. Unknown categories
1060/// fall through to `Transient` — the Lua side already tolerated
1061/// arbitrary strings, so the worst a category drift does under the
1062/// Stage 1b forwarder is reclassify a novel category as transient.
1063/// Stage 1d (or issue #117) widens `FailureClass` with a
1064/// `Custom(String)` arm for exact round-trip.
1065fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
1066 use ff_core::backend::FailureClass;
1067 match s {
1068 "transient" => FailureClass::Transient,
1069 "permanent" => FailureClass::Permanent,
1070 "infra_crash" => FailureClass::InfraCrash,
1071 "timeout" => FailureClass::Timeout,
1072 "cancelled" => FailureClass::Cancelled,
1073 _ => FailureClass::Transient,
1074 }
1075}
1076
1077impl Drop for ClaimedTask {
1078 fn drop(&mut self) {
1079 // Abort the background renewal task on drop.
1080 // This is a safety net — complete/fail/cancel already stop renewal
1081 // via notify before consuming self. But if the task is dropped
1082 // without being consumed (e.g., panic), abort prevents leaked renewals.
1083 //
1084 // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
1085 // on the happy path, `stop_renewal()` fires `notify_one` synchronously
1086 // and then self is consumed into Drop immediately. The renewal task
1087 // has not yet been polled by the runtime, so `is_finished()` is still
1088 // `false` here — which previously fired the warning on every
1089 // complete/fail/cancel/suspend call. `terminal_op_called` is the
1090 // authoritative signal that a terminal-op path ran to the point of
1091 // stopping renewal; it does not by itself certify the Lua side
1092 // succeeded (see the field doc). The caller surfaces any error via
1093 // the op's return value, so a `Drop` warning is unneeded there.
1094 if !self.terminal_op_called.load(Ordering::Acquire) {
1095 tracing::warn!(
1096 execution_id = %self.execution_id,
1097 "ClaimedTask dropped without terminal operation — lease will expire"
1098 );
1099 }
1100 self.renewal_handle.abort();
1101 }
1102}
1103
1104// ── Lease renewal ──
1105
1106/// Per-tick renewal: single `backend.renew(&handle)` call wrapped in
1107/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
1108/// hooks still see one span per renewal (restores PR #119 Cursor
1109/// Bugbot finding — the top-level `spawn_renewal_task` fires once at
1110/// construction time, not per-tick).
1111///
1112/// See `benches/harness/src/bin/long_running.rs` for the bench
1113/// consumer that depends on this span naming.
1114#[tracing::instrument(
1115 name = "renew_lease",
1116 skip_all,
1117 fields(execution_id = %execution_id)
1118)]
1119async fn renew_once(
1120 backend: &dyn EngineBackend,
1121 handle: &Handle,
1122 execution_id: &ExecutionId,
1123) -> Result<(), crate::EngineError> {
1124 backend.renew(handle).await.map(|_| ())
1125}
1126
1127/// Spawn a background tokio task that renews the lease at `ttl / 3`
1128/// intervals.
1129///
1130/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
1131/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
1132/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
1133/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
1134/// + the encoded `Handle` instead. Per-tick tracing lives on
1135/// [`renew_once`]; this function itself is sync + one-shot.
1136///
1137/// Stops when:
1138/// - `stop_signal` is notified (complete/fail/cancel called)
1139/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
1140/// - The task handle is aborted (ClaimedTask dropped)
1141fn spawn_renewal_task(
1142 backend: Arc<dyn EngineBackend>,
1143 handle: Handle,
1144 execution_id: ExecutionId,
1145 lease_ttl_ms: u64,
1146 stop_signal: Arc<Notify>,
1147 failure_counter: Arc<AtomicU32>,
1148) -> JoinHandle<()> {
1149 // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
1150 // panics if a caller (or a misconfigured test) passes a
1151 // lease_ttl_ms < 3. The SDK config validator already enforces
1152 // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
1153 // is a cheap belt-and-suspenders (Copilot review finding on
1154 // PR #119).
1155 let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));
1156
1157 tokio::spawn(async move {
1158 let mut tick = tokio::time::interval(interval);
1159 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1160 // Skip the first immediate tick — the lease was just acquired.
1161 tick.tick().await;
1162
1163 loop {
1164 tokio::select! {
1165 _ = stop_signal.notified() => {
1166 tracing::debug!(
1167 execution_id = %execution_id,
1168 "lease renewal stopped by signal"
1169 );
1170 return;
1171 }
1172 _ = tick.tick() => {
1173 match renew_once(backend.as_ref(), &handle, &execution_id).await {
1174 Ok(_renewal) => {
1175 failure_counter.store(0, Ordering::Relaxed);
1176 tracing::trace!(
1177 execution_id = %execution_id,
1178 "lease renewed"
1179 );
1180 }
1181 Err(e) if is_terminal_renewal_error(&e) => {
1182 failure_counter.fetch_add(1, Ordering::Relaxed);
1183 tracing::warn!(
1184 execution_id = %execution_id,
1185 error = %e,
1186 "lease renewal failed with terminal error, stopping renewal"
1187 );
1188 return;
1189 }
1190 Err(e) => {
1191 let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
1192 tracing::warn!(
1193 execution_id = %execution_id,
1194 error = %e,
1195 consecutive_failures = count,
1196 "lease renewal failed (will retry next interval)"
1197 );
1198 }
1199 }
1200 }
1201 }
1202 }
1203 })
1204}
1205
1206/// Check if an engine error means renewal should stop permanently.
1207#[allow(dead_code)]
1208fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
1209 use crate::{ContentionKind, EngineError, StateKind};
1210 matches!(
1211 err,
1212 EngineError::State(
1213 StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
1214 ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
1215 | EngineError::NotFound { entity: "execution" }
1216 )
1217}
1218
1219// ── FCALL result parsing ──
1220
1221/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
1222/// function into a typed [`ReportUsageResult`].
1223///
1224/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
1225/// `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
1226/// Status code `!= 1` is parsed as a [`ScriptError`] via
1227/// [`ScriptError::from_code_with_detail`].
1228///
1229/// Exposed as `pub` so downstream SDKs that speak the same wire format
1230/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
1231/// call this directly instead of re-implementing the parse. Keeping one
1232/// parser paired with the producer (the Lua function registered at
1233/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
1234/// against silent format drift between producer and consumer.
1235#[cfg(feature = "valkey-default")]
1236pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1237 let arr = match raw {
1238 Value::Array(arr) => arr,
1239 _ => {
1240 return Err(SdkError::from(ScriptError::Parse {
1241 fcall: "parse_report_usage_result".into(),
1242 execution_id: None,
1243 message: "ff_report_usage_and_check: expected Array".into(),
1244 }));
1245 }
1246 };
1247 let status_code = match arr.first() {
1248 Some(Ok(Value::Int(n))) => *n,
1249 _ => {
1250 return Err(SdkError::from(ScriptError::Parse {
1251 fcall: "parse_report_usage_result".into(),
1252 execution_id: None,
1253 message: "ff_report_usage_and_check: expected Int status code".into(),
1254 }));
1255 }
1256 };
1257 if status_code != 1 {
1258 let error_code = usage_field_str(arr, 1);
1259 let detail = usage_field_str(arr, 2);
1260 return Err(SdkError::from(
1261 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1262 ScriptError::Parse {
1263 fcall: "parse_report_usage_result".into(),
1264 execution_id: None,
1265 message: format!("ff_report_usage_and_check: {error_code}"),
1266 }
1267 }),
1268 ));
1269 }
1270 let sub_status = usage_field_str(arr, 1);
1271 match sub_status.as_str() {
1272 "OK" => Ok(ReportUsageResult::Ok),
1273 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1274 "SOFT_BREACH" => {
1275 let dim = usage_field_str(arr, 2);
1276 let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1277 let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1278 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1279 }
1280 "HARD_BREACH" => {
1281 let dim = usage_field_str(arr, 2);
1282 let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1283 let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1284 Ok(ReportUsageResult::HardBreach {
1285 dimension: dim,
1286 current_usage: current,
1287 hard_limit: limit,
1288 })
1289 }
1290 _ => Err(SdkError::from(ScriptError::Parse {
1291 fcall: "parse_report_usage_result".into(),
1292 execution_id: None,
1293 message: format!(
1294 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1295 ),
1296 })),
1297 }
1298}
1299
1300#[cfg(feature = "valkey-default")]
1301fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1302 match arr.get(index) {
1303 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1304 Some(Ok(Value::SimpleString(s))) => s.clone(),
1305 Some(Ok(Value::Int(n))) => n.to_string(),
1306 _ => String::new(),
1307 }
1308}
1309
1310/// Parse a required numeric usage field (u64) from the wire array at
1311/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
1312/// holds a non-string/non-int value, or contains a string that does
1313/// not parse as u64.
1314///
1315/// Rationale: the Lua producer (`lua/budget.lua:99`,
1316/// `ff_report_usage_and_check`) always emits
1317/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1318/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1319/// non-numeric value here means the Lua and Rust sides drifted;
1320/// silently coercing to `0` would surface drift as "zero-usage breach"
1321/// — arithmetically correct but semantically nonsense. Fail loudly
1322/// instead so drift shows up as a parse error at the first call site.
1323#[cfg(feature = "valkey-default")]
1324fn parse_usage_u64(
1325 arr: &[Result<Value, ferriskey::Error>],
1326 index: usize,
1327 sub_status: &str,
1328 field_name: &str,
1329) -> Result<u64, SdkError> {
1330 match arr.get(index) {
1331 Some(Ok(Value::Int(n))) => {
1332 u64::try_from(*n).map_err(|_| {
1333 SdkError::from(ScriptError::Parse {
1334 fcall: "parse_usage_u64".into(),
1335 execution_id: None,
1336 message: format!(
1337 "ff_report_usage_and_check {sub_status}: {field_name} \
1338 (index {index}) negative int {n} cannot be u64"
1339 ),
1340 })
1341 })
1342 }
1343 Some(Ok(Value::BulkString(b))) => {
1344 let s = String::from_utf8_lossy(b);
1345 s.parse::<u64>().map_err(|_| {
1346 SdkError::from(ScriptError::Parse {
1347 fcall: "parse_usage_u64".into(),
1348 execution_id: None,
1349 message: format!(
1350 "ff_report_usage_and_check {sub_status}: {field_name} \
1351 (index {index}) not a u64 string: {s:?}"
1352 ),
1353 })
1354 })
1355 }
1356 Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1357 SdkError::from(ScriptError::Parse {
1358 fcall: "parse_usage_u64".into(),
1359 execution_id: None,
1360 message: format!(
1361 "ff_report_usage_and_check {sub_status}: {field_name} \
1362 (index {index}) not a u64 string: {s:?}"
1363 ),
1364 })
1365 }),
1366 Some(_) => Err(SdkError::from(ScriptError::Parse {
1367 fcall: "parse_usage_u64".into(),
1368 execution_id: None,
1369 message: format!(
1370 "ff_report_usage_and_check {sub_status}: {field_name} \
1371 (index {index}) wrong wire type (expected Int or String)"
1372 ),
1373 })),
1374 None => Err(SdkError::from(ScriptError::Parse {
1375 fcall: "parse_usage_u64".into(),
1376 execution_id: None,
1377 message: format!(
1378 "ff_report_usage_and_check {sub_status}: {field_name} \
1379 (index {index}) missing from response"
1380 ),
1381 })),
1382 }
1383}
1384
1385/// Pure helper: decide whether `suspension:current` represents a
1386/// signal-driven resume for the currently-claimed attempt, and extract
1387/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1388/// (no record, stale prior-attempt, non-resumed close). Returns an error
1389/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1390/// bug rather than a missing-data case.
1391///
1392/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1393/// `EngineBackend::observe_signals`, which re-implements this invariant
1394/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1395/// tests so the parsing contract stays exercised at the SDK layer —
1396/// Stage 1d will consolidate (either promote the helper into ff-core
1397/// or drop these tests once the backend-side tests cover equivalent
1398/// ground).
1399#[allow(dead_code)]
1400fn resume_waitpoint_id_from_suspension(
1401 susp: &HashMap<String, String>,
1402 claimed_attempt: AttemptIndex,
1403) -> Result<Option<WaitpointId>, SdkError> {
1404 if susp.is_empty() {
1405 return Ok(None);
1406 }
1407 let susp_att: u32 = susp
1408 .get("attempt_index")
1409 .and_then(|s| s.parse().ok())
1410 .unwrap_or(u32::MAX);
1411 if susp_att != claimed_attempt.0 {
1412 return Ok(None);
1413 }
1414 let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1415 if close_reason != "resumed" {
1416 return Ok(None);
1417 }
1418 let wp_id_str = susp
1419 .get("waitpoint_id")
1420 .map(String::as_str)
1421 .unwrap_or_default();
1422 if wp_id_str.is_empty() {
1423 return Ok(None);
1424 }
1425 let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1426 SdkError::from(ScriptError::Parse {
1427 fcall: "resume_waitpoint_id_from_suspension".into(),
1428 execution_id: None,
1429 message: format!(
1430 "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1431 ),
1432 })
1433 })?;
1434 Ok(Some(waitpoint_id))
1435}
1436
1437// v0.12 PR-5: inline SDK callers removed (the `claim_next` scanner
1438// now routes through `EngineBackend::issue_claim_grant` +
1439// `block_route`, whose Valkey impls own the parse). The helper is
1440// retained for the regression tests in this module (terminal-op
1441// replay detail extraction) until they move to ff-backend-valkey;
1442// future PRs may delete both the helper and its tests together.
1443#[cfg(feature = "valkey-default")]
1444#[allow(dead_code)]
1445pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
1446 let arr = match raw {
1447 Value::Array(arr) => arr,
1448 _ => {
1449 return Err(SdkError::from(ScriptError::Parse {
1450 fcall: "parse_success_result".into(),
1451 execution_id: None,
1452 message: format!(
1453 "{function_name}: expected Array, got non-array"
1454 ),
1455 }));
1456 }
1457 };
1458
1459 if arr.is_empty() {
1460 return Err(SdkError::from(ScriptError::Parse {
1461 fcall: "parse_success_result".into(),
1462 execution_id: None,
1463 message: format!(
1464 "{function_name}: empty result array"
1465 ),
1466 }));
1467 }
1468
1469 let status_code = match arr.first() {
1470 Some(Ok(Value::Int(n))) => *n,
1471 _ => {
1472 return Err(SdkError::from(ScriptError::Parse {
1473 fcall: "parse_success_result".into(),
1474 execution_id: None,
1475 message: format!(
1476 "{function_name}: expected Int at index 0"
1477 ),
1478 }));
1479 }
1480 };
1481
1482 if status_code == 1 {
1483 Ok(())
1484 } else {
1485 // Extract error code from index 1 and optional detail from index 2
1486 // (e.g. `capability_mismatch` ships missing tokens there). Variants
1487 // that carry a String payload pick the detail up via
1488 // `from_code_with_detail`; other variants ignore it.
1489 let field_str = |idx: usize| -> String {
1490 arr.get(idx)
1491 .and_then(|v| match v {
1492 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1493 Ok(Value::SimpleString(s)) => Some(s.clone()),
1494 _ => None,
1495 })
1496 .unwrap_or_default()
1497 };
1498 let error_code = {
1499 let s = field_str(1);
1500 if s.is_empty() { "unknown".to_owned() } else { s }
1501 };
1502 // Collect all detail slots (idx >= 2). Most variants only read
1503 // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
1504 // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
1505 // reconciliation after a network drop.
1506 let details: Vec<String> = (2..arr.len()).map(field_str).collect();
1507 let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1508
1509 let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
1510 .unwrap_or_else(|| {
1511 ScriptError::Parse {
1512 fcall: "parse_success_result".into(),
1513 execution_id: None,
1514 message: format!("{function_name}: unknown error: {error_code}"),
1515 }
1516 });
1517
1518 Err(SdkError::from(script_err))
1519 }
1520}
1521
1522// RFC-013 Stage 1d — `parse_suspend_result` removed. The backend impl
1523// in `ff_backend_valkey` now parses the Lua return and produces a typed
1524// `SuspendOutcome`; the SDK forwarder consumes that directly.
1525
1526/// Parse ff_deliver_signal result:
1527/// ok(signal_id, effect)
1528/// ok_duplicate(existing_signal_id)
1529#[cfg(feature = "valkey-default")]
1530pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
1531 let arr = match raw {
1532 Value::Array(arr) => arr,
1533 _ => {
1534 return Err(SdkError::from(ScriptError::Parse {
1535 fcall: "parse_signal_result".into(),
1536 execution_id: None,
1537 message: "ff_deliver_signal: expected Array".into(),
1538 }));
1539 }
1540 };
1541
1542 let status_code = match arr.first() {
1543 Some(Ok(Value::Int(n))) => *n,
1544 _ => {
1545 return Err(SdkError::from(ScriptError::Parse {
1546 fcall: "parse_signal_result".into(),
1547 execution_id: None,
1548 message: "ff_deliver_signal: bad status code".into(),
1549 }));
1550 }
1551 };
1552
1553 if status_code != 1 {
1554 let err_field_str = |idx: usize| -> String {
1555 arr.get(idx)
1556 .and_then(|v| match v {
1557 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1558 Ok(Value::SimpleString(s)) => Some(s.clone()),
1559 _ => None,
1560 })
1561 .unwrap_or_default()
1562 };
1563 let error_code = {
1564 let s = err_field_str(1);
1565 if s.is_empty() { "unknown".to_owned() } else { s }
1566 };
1567 let detail = err_field_str(2);
1568 return Err(SdkError::from(
1569 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1570 ScriptError::Parse {
1571 fcall: "parse_signal_result".into(),
1572 execution_id: None,
1573 message: format!("ff_deliver_signal: {error_code}"),
1574 }
1575 }),
1576 ));
1577 }
1578
1579 let sub_status = arr
1580 .get(1)
1581 .and_then(|v| match v {
1582 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1583 Ok(Value::SimpleString(s)) => Some(s.clone()),
1584 _ => None,
1585 })
1586 .unwrap_or_default();
1587
1588 if sub_status == "DUPLICATE" {
1589 let existing_id = arr
1590 .get(2)
1591 .and_then(|v| match v {
1592 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1593 Ok(Value::SimpleString(s)) => Some(s.clone()),
1594 _ => None,
1595 })
1596 .unwrap_or_default();
1597 return Ok(SignalOutcome::Duplicate {
1598 existing_signal_id: existing_id,
1599 });
1600 }
1601
1602 // Parse: {1, "OK", signal_id, effect}
1603 let signal_id_str = arr
1604 .get(2)
1605 .and_then(|v| match v {
1606 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1607 Ok(Value::SimpleString(s)) => Some(s.clone()),
1608 _ => None,
1609 })
1610 .unwrap_or_default();
1611
1612 let effect = arr
1613 .get(3)
1614 .and_then(|v| match v {
1615 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1616 Ok(Value::SimpleString(s)) => Some(s.clone()),
1617 _ => None,
1618 })
1619 .unwrap_or_default();
1620
1621 let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
1622 SdkError::from(ScriptError::Parse {
1623 fcall: "parse_signal_result".into(),
1624 execution_id: None,
1625 message: format!(
1626 "ff_deliver_signal: invalid signal_id from Lua: {e}"
1627 ),
1628 })
1629 })?;
1630
1631 if effect == "resume_condition_satisfied" {
1632 Ok(SignalOutcome::TriggeredResume { signal_id })
1633 } else {
1634 Ok(SignalOutcome::Accepted { signal_id, effect })
1635 }
1636}
1637
1638/// Parse ff_fail_execution result:
1639/// ok("retry_scheduled", delay_until)
1640/// ok("terminal_failed")
1641#[cfg(feature = "valkey-default")]
1642#[allow(dead_code)]
1643fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
1644 let arr = match raw {
1645 Value::Array(arr) => arr,
1646 _ => {
1647 return Err(SdkError::from(ScriptError::Parse {
1648 fcall: "parse_fail_result".into(),
1649 execution_id: None,
1650 message: "ff_fail_execution: expected Array".into(),
1651 }));
1652 }
1653 };
1654
1655 let status_code = match arr.first() {
1656 Some(Ok(Value::Int(n))) => *n,
1657 _ => {
1658 return Err(SdkError::from(ScriptError::Parse {
1659 fcall: "parse_fail_result".into(),
1660 execution_id: None,
1661 message: "ff_fail_execution: bad status code".into(),
1662 }));
1663 }
1664 };
1665
1666 if status_code != 1 {
1667 let err_field_str = |idx: usize| -> String {
1668 arr.get(idx)
1669 .and_then(|v| match v {
1670 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1671 Ok(Value::SimpleString(s)) => Some(s.clone()),
1672 _ => None,
1673 })
1674 .unwrap_or_default()
1675 };
1676 let error_code = {
1677 let s = err_field_str(1);
1678 if s.is_empty() { "unknown".to_owned() } else { s }
1679 };
1680 let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
1681 let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1682 return Err(SdkError::from(
1683 ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
1684 ScriptError::Parse {
1685 fcall: "parse_fail_result".into(),
1686 execution_id: None,
1687 message: format!("ff_fail_execution: {error_code}"),
1688 }
1689 }),
1690 ));
1691 }
1692
1693 // Parse sub-status from field[2] (index 2 = first field after status+OK)
1694 let sub_status = arr
1695 .get(2)
1696 .and_then(|v| match v {
1697 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1698 Ok(Value::SimpleString(s)) => Some(s.clone()),
1699 _ => None,
1700 })
1701 .unwrap_or_default();
1702
1703 match sub_status.as_str() {
1704 "retry_scheduled" => {
1705 // Lua returns: ok("retry_scheduled", tostring(delay_until))
1706 // arr[3] = delay_until
1707 let delay_str = arr
1708 .get(3)
1709 .and_then(|v| match v {
1710 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1711 Ok(Value::Int(n)) => Some(n.to_string()),
1712 _ => None,
1713 })
1714 .unwrap_or_default();
1715 let delay_until = delay_str.parse::<i64>().unwrap_or(0);
1716
1717 Ok(FailOutcome::RetryScheduled {
1718 delay_until: TimestampMs::from_millis(delay_until),
1719 })
1720 }
1721 "terminal_failed" => Ok(FailOutcome::TerminalFailed),
1722 _ => Err(SdkError::from(ScriptError::Parse {
1723 fcall: "parse_fail_result".into(),
1724 execution_id: None,
1725 message: format!(
1726 "ff_fail_execution: unexpected sub-status: {sub_status}"
1727 ),
1728 })),
1729 }
1730}
1731
1732// ── Stream read / tail (consumer API, RFC-006 #2) ──
1733
1734/// Maximum tail block duration accepted by [`tail_stream`]. Mirrors the REST
1735/// endpoint ceiling so SDK callers can't wedge a connection longer than the
1736/// server would accept.
1737pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1738
1739/// Maximum frames per read/tail call. Mirrors
1740/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
1741/// callers don't need to import ff-core just to read the bound.
1742pub use ff_core::contracts::STREAM_READ_HARD_CAP;
1743
1744/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
1745/// signal so polling consumers can exit cleanly.
1746///
1747/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
1748pub use ff_core::contracts::StreamFrames;
1749
1750/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
1751/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
1752/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
1753/// `StreamCursor::Start` / `StreamCursor::End` instead.
1754pub use ff_core::contracts::StreamCursor;
1755
1756/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
1757/// — XREAD does not accept the open markers. Pulled out as a bare
1758/// function so unit tests can exercise the guard without constructing a
1759/// live `ferriskey::Client`.
1760#[allow(dead_code)]
1761fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
1762 if !after.is_concrete() {
1763 return Err(SdkError::Config {
1764 context: "tail_stream".into(),
1765 field: Some("after".into()),
1766 message: "XREAD cursor must be a concrete entry id; pass \
1767 StreamCursor::from_beginning() to start from the \
1768 beginning"
1769 .into(),
1770 });
1771 }
1772 Ok(())
1773}
1774
1775#[allow(dead_code)]
1776fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
1777 if count_limit == 0 {
1778 return Err(SdkError::Config {
1779 context: "read_stream_frames".into(),
1780 field: Some("count_limit".into()),
1781 message: "count_limit must be >= 1".into(),
1782 });
1783 }
1784 if count_limit > STREAM_READ_HARD_CAP {
1785 return Err(SdkError::Config {
1786 context: "read_stream_frames".into(),
1787 field: Some("count_limit".into()),
1788 message: format!(
1789 "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
1790 ),
1791 });
1792 }
1793 Ok(())
1794}
1795
1796/// Read frames from a completed or in-flight attempt's stream.
1797///
1798/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
1799/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1800/// `StreamCursor::At("<id>")` reads from a concrete entry id.
1801/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
1802/// `0` returns [`SdkError::Config`].
1803///
1804/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
1805/// consumers know when the producer has finalized the stream. A
1806/// never-written attempt and an in-progress stream are indistinguishable
1807/// here — both present as `frames=[]`, `closed_at=None`.
1808///
1809/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
1810/// client but are not the lease-holding worker — no lease check is
1811/// performed.
1812///
1813/// # Head-of-line note
1814///
1815/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
1816/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
1817/// calling this on a `client` that is also serving FCALLs stalls those
1818/// FCALLs behind the reply. The REST server isolates reads on its
1819/// `tail_client`; direct SDK callers should either use a dedicated
1820/// client OR paginate through smaller `count_limit` slices.
1821#[cfg(feature = "valkey-default")]
1822pub async fn read_stream(
1823 backend: &dyn EngineBackend,
1824 execution_id: &ExecutionId,
1825 attempt_index: AttemptIndex,
1826 from: StreamCursor,
1827 to: StreamCursor,
1828 count_limit: u64,
1829) -> Result<StreamFrames, SdkError> {
1830 validate_stream_read_count(count_limit)?;
1831 Ok(backend
1832 .read_stream(execution_id, attempt_index, from, to, count_limit)
1833 .await?)
1834}
1835
1836/// Tail a live attempt's stream.
1837///
1838/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
1839/// with id strictly greater than `after`. Pass
1840/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
1841/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
1842/// REJECTED at this boundary because XREAD does not accept `-` / `+`
1843/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
1844///
1845/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
1846/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
1847/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
1848/// SDK and REST ceilings aligned.
1849///
1850/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
1851/// polling consumers should loop until `result.is_closed()` is true, then
1852/// drain and exit. Timeout with no new frames presents as
1853/// `frames=[], closed_at=None`.
1854///
1855/// # Head-of-line warning — use a dedicated client
1856///
1857/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
1858/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
1859/// the read side until a frame arrives or the block elapses. If the
1860/// `client` you pass here is ALSO used for claims, completes, fails,
1861/// appends, or any other FCALL, a 30-second tail will stall all those
1862/// calls for up to 30 seconds.
1863///
1864/// **Strongly recommended**: build a separate `ferriskey::Client` for
1865/// tail callers — mirrors the `Server::tail_client` split that the REST
1866/// server uses internally (see `crates/ff-server/src/server.rs` and
1867/// RFC-006 Impl Notes §"Dedicated stream-op connection").
1868///
1869/// # Tail parallelism caveat (same mux)
1870///
1871/// Even a dedicated tail client is still one multiplexed TCP connection.
1872/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
1873/// ferriskey's per-call `request_timeout` starts at future-poll — so
1874/// two concurrent tails against the same client can time out spuriously:
1875/// the second call's BLOCK budget elapses while it waits for the first
1876/// BLOCK to return. The REST server handles this internally with a
1877/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
1878/// each call its full `block_ms` budget at the server.
1879///
1880/// **Direct SDK callers that need concurrent tails**: either
1881/// (1) build ONE `ferriskey::Client` per concurrent tail call (a small
1882/// pool of clients, rotated by the caller), OR
1883/// (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
1884/// only one BLOCK is in flight per client at a time.
1885/// If you need the REST-side backpressure (429 on contention) and the
1886/// built-in serializer, go through the
1887/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
1888/// than calling this directly.
1889///
1890/// This SDK does not enforce either pattern — the mutex belongs at the
1891/// application layer, and the connection pool belongs at the SDK
1892/// caller's DI layer; neither has a structured place inside this
1893/// helper.
1894///
1895/// # Timeout handling
1896///
1897/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
1898/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
1899/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
1900/// 500ms)`, overriding the client's default per-call. A tail with
1901/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
1902/// the client was built with a shorter `request_timeout`. No custom client
1903/// configuration is required for timeout reasons — only for head-of-line
1904/// isolation above.
1905#[cfg(feature = "valkey-default")]
1906pub async fn tail_stream(
1907 backend: &dyn EngineBackend,
1908 execution_id: &ExecutionId,
1909 attempt_index: AttemptIndex,
1910 after: StreamCursor,
1911 block_ms: u64,
1912 count_limit: u64,
1913) -> Result<StreamFrames, SdkError> {
1914 tail_stream_with_visibility(
1915 backend,
1916 execution_id,
1917 attempt_index,
1918 after,
1919 block_ms,
1920 count_limit,
1921 ff_core::backend::TailVisibility::All,
1922 )
1923 .await
1924}
1925
1926/// Tail helper with an explicit RFC-015
1927/// [`TailVisibility`](ff_core::backend::TailVisibility) filter.
1928#[cfg(feature = "valkey-default")]
1929pub async fn tail_stream_with_visibility(
1930 backend: &dyn EngineBackend,
1931 execution_id: &ExecutionId,
1932 attempt_index: AttemptIndex,
1933 after: StreamCursor,
1934 block_ms: u64,
1935 count_limit: u64,
1936 visibility: ff_core::backend::TailVisibility,
1937) -> Result<StreamFrames, SdkError> {
1938 if block_ms > MAX_TAIL_BLOCK_MS {
1939 return Err(SdkError::Config {
1940 context: "tail_stream".into(),
1941 field: Some("block_ms".into()),
1942 message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
1943 });
1944 }
1945 validate_stream_read_count(count_limit)?;
1946 // XREAD does not accept `-` / `+` markers as cursors — reject at
1947 // the SDK boundary with `SdkError::Config` rather than forwarding
1948 // an invalid `-`/`+` into the backend (which would surface as an
1949 // opaque `EngineError::Transport`).
1950 validate_tail_cursor(&after)?;
1951
1952 Ok(backend
1953 .tail_stream(
1954 execution_id,
1955 attempt_index,
1956 after,
1957 block_ms,
1958 count_limit,
1959 visibility,
1960 )
1961 .await?)
1962}
1963
1964#[cfg(all(test, feature = "valkey-default"))]
1965mod tail_stream_boundary_tests {
1966 use super::*;
1967
1968 // `validate_tail_cursor` rejects `StreamCursor::Start` and
1969 // `StreamCursor::End` before `tail_stream` touches the client —
1970 // same shape as the `count_limit` guard above. The matching
1971 // full-path rejection on the REST layer is covered by
1972 // `ff-server::api`.
1973
1974 #[test]
1975 fn rejects_start_cursor() {
1976 let err = validate_tail_cursor(&StreamCursor::Start)
1977 .expect_err("Start must be rejected");
1978 match err {
1979 SdkError::Config { field, context, .. } => {
1980 assert_eq!(field.as_deref(), Some("after"));
1981 assert_eq!(context, "tail_stream");
1982 }
1983 other => panic!("expected SdkError::Config, got {other:?}"),
1984 }
1985 }
1986
1987 #[test]
1988 fn rejects_end_cursor() {
1989 let err = validate_tail_cursor(&StreamCursor::End)
1990 .expect_err("End must be rejected");
1991 assert!(matches!(err, SdkError::Config { .. }));
1992 }
1993
1994 #[test]
1995 fn accepts_at_cursor() {
1996 validate_tail_cursor(&StreamCursor::At("0-0".into()))
1997 .expect("At cursor must be accepted");
1998 validate_tail_cursor(&StreamCursor::from_beginning())
1999 .expect("from_beginning() must be accepted");
2000 validate_tail_cursor(&StreamCursor::At("123-0".into()))
2001 .expect("concrete id must be accepted");
2002 }
2003}
2004
2005#[cfg(all(test, feature = "valkey-default"))]
2006mod parse_report_usage_result_tests {
2007 use super::*;
2008
2009 /// `Value::SimpleString` from a `&str`. `usage_field_str` handles
2010 /// BulkString and SimpleString uniformly (see
2011 /// `usage_field_str` — `Value::BulkString(b)` → `String::from_utf8_lossy`,
2012 /// `Value::SimpleString(s)` → clone). SimpleString avoids a
2013 /// dev-dependency on `bytes` just for test construction.
2014 fn s(v: &str) -> Result<Value, ferriskey::Error> {
2015 Ok(Value::SimpleString(v.to_owned()))
2016 }
2017
2018 fn int(n: i64) -> Result<Value, ferriskey::Error> {
2019 Ok(Value::Int(n))
2020 }
2021
2022 fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
2023 Value::Array(items)
2024 }
2025
2026 #[test]
2027 fn ok_status() {
2028 let raw = arr(vec![int(1), s("OK")]);
2029 assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
2030 }
2031
2032 #[test]
2033 fn already_applied_status() {
2034 let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
2035 assert_eq!(
2036 parse_report_usage_result(&raw).unwrap(),
2037 ReportUsageResult::AlreadyApplied
2038 );
2039 }
2040
2041 #[test]
2042 fn soft_breach_status() {
2043 let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
2044 match parse_report_usage_result(&raw).unwrap() {
2045 ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
2046 assert_eq!(dimension, "tokens");
2047 assert_eq!(current_usage, 150);
2048 assert_eq!(soft_limit, 100);
2049 }
2050 other => panic!("expected SoftBreach, got {other:?}"),
2051 }
2052 }
2053
2054 #[test]
2055 fn hard_breach_status() {
2056 let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
2057 match parse_report_usage_result(&raw).unwrap() {
2058 ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
2059 assert_eq!(dimension, "requests");
2060 assert_eq!(current_usage, 10001);
2061 assert_eq!(hard_limit, 10000);
2062 }
2063 other => panic!("expected HardBreach, got {other:?}"),
2064 }
2065 }
2066
2067 /// Negative case: non-Array input. Guards against a future Lua refactor
2068 /// that accidentally returns a bare string/int — the parser must fail
2069 /// loudly rather than silently succeed or panic.
2070 #[test]
2071 fn non_array_input_is_parse_error() {
2072 let raw = Value::SimpleString("OK".to_owned());
2073 let err = parse_report_usage_result(&raw).unwrap_err();
2074 let msg = format!("{err}");
2075 assert!(
2076 msg.to_lowercase().contains("expected array"),
2077 "error should mention expected shape, got: {msg}"
2078 );
2079 }
2080
2081 /// Negative case: Array whose first element isn't an Int status code.
2082 /// The Lua function's first return slot is always `status_code` (1 on
2083 /// success, an error code otherwise); a non-Int there is a wire-format
2084 /// break that must surface as a parse error.
2085 #[test]
2086 fn first_element_non_int_is_parse_error() {
2087 let raw = arr(vec![s("not_an_int"), s("OK")]);
2088 let err = parse_report_usage_result(&raw).unwrap_err();
2089 let msg = format!("{err}");
2090 assert!(
2091 msg.to_lowercase().contains("int"),
2092 "error should mention Int status code, got: {msg}"
2093 );
2094 }
2095
2096 /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
2097 /// field. Guards against the silent-coercion defect cross-review
2098 /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
2099 /// which would have surfaced Lua-side wire-format drift as a
2100 /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
2101 /// but semantically wrong (a "breach with zero usage" is nonsense
2102 /// and masks the real error).
2103 #[test]
2104 fn soft_breach_non_numeric_current_is_parse_error() {
2105 let raw = arr(vec![
2106 int(1),
2107 s("SOFT_BREACH"),
2108 s("tokens"),
2109 s("not_a_number"), // current_usage — must fail, not coerce to 0
2110 s("100"),
2111 ]);
2112 let err = parse_report_usage_result(&raw).unwrap_err();
2113 let msg = format!("{err}");
2114 assert!(
2115 msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
2116 "error should identify sub-status + field, got: {msg}"
2117 );
2118 assert!(
2119 msg.to_lowercase().contains("u64"),
2120 "error should mention the expected type (u64), got: {msg}"
2121 );
2122 }
2123
2124 /// Negative case: HARD_BREACH with the limit slot missing
2125 /// entirely. Same defence as the non-numeric test above: a
2126 /// truncated response must fail loudly rather than coerce to 0.
2127 #[test]
2128 fn hard_breach_missing_limit_is_parse_error() {
2129 let raw = arr(vec![
2130 int(1),
2131 s("HARD_BREACH"),
2132 s("requests"),
2133 s("10001"),
2134 // no index 4 — hard_limit missing
2135 ]);
2136 let err = parse_report_usage_result(&raw).unwrap_err();
2137 let msg = format!("{err}");
2138 assert!(
2139 msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
2140 "error should identify sub-status + field, got: {msg}"
2141 );
2142 assert!(
2143 msg.to_lowercase().contains("missing"),
2144 "error should say 'missing', got: {msg}"
2145 );
2146 }
2147}
2148
2149/// RFC-014 helper: pick the first waitpoint_key out of a composite
2150/// tree so `try_suspend_inner` can rebind a Fresh waitpoint to the
2151/// user-supplied key. Single-waitpoint scoping: all
2152/// Single.waitpoint_key / Count.waitpoints[i] in the tree must be
2153/// equal; this function returns the first one it encounters.
2154fn composite_first_waitpoint_key(body: &CompositeBody) -> Option<String> {
2155 match body {
2156 CompositeBody::AllOf { members } => members.iter().find_map(|m| match m {
2157 ResumeCondition::Single { waitpoint_key, .. } => Some(waitpoint_key.clone()),
2158 ResumeCondition::Composite(inner) => composite_first_waitpoint_key(inner),
2159 _ => None,
2160 }),
2161 CompositeBody::Count { waitpoints, .. } => waitpoints.first().cloned(),
2162 _ => None,
2163 }
2164}
2165
2166
2167#[cfg(test)]
2168mod resume_signals_tests {
2169 use super::*;
2170
2171 fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
2172 pairs.iter().map(|(k, v)| ((*k).to_owned(), (*v).to_owned())).collect()
2173 }
2174
2175 #[test]
2176 fn empty_suspension_returns_none() {
2177 let susp = m(&[]);
2178 let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
2179 assert!(out.is_none(), "no suspension record → None");
2180 }
2181
2182 #[test]
2183 fn stale_prior_attempt_returns_none() {
2184 let wp = WaitpointId::new();
2185 let susp = m(&[
2186 ("attempt_index", "0"),
2187 ("close_reason", "resumed"),
2188 ("waitpoint_id", &wp.to_string()),
2189 ]);
2190 // Claimed attempt is 1; suspension belongs to 0 → stale.
2191 let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
2192 assert!(out.is_none(), "attempt_index mismatch → None");
2193 }
2194
2195 #[test]
2196 fn non_resumed_close_returns_none() {
2197 let wp = WaitpointId::new();
2198 for reason in ["timeout", "cancelled", "", "expired"] {
2199 let susp = m(&[
2200 ("attempt_index", "0"),
2201 ("close_reason", reason),
2202 ("waitpoint_id", &wp.to_string()),
2203 ]);
2204 let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
2205 assert!(out.is_none(), "close_reason={reason:?} must not return signals");
2206 }
2207 }
2208
2209 #[test]
2210 fn resumed_same_attempt_returns_waitpoint() {
2211 let wp = WaitpointId::new();
2212 let susp = m(&[
2213 ("attempt_index", "2"),
2214 ("close_reason", "resumed"),
2215 ("waitpoint_id", &wp.to_string()),
2216 ]);
2217 let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
2218 assert_eq!(out, Some(wp));
2219 }
2220
2221 #[test]
2222 fn malformed_waitpoint_id_is_error() {
2223 let susp = m(&[
2224 ("attempt_index", "0"),
2225 ("close_reason", "resumed"),
2226 ("waitpoint_id", "not-a-uuid"),
2227 ]);
2228 let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
2229 assert!(
2230 format!("{err}").contains("not a valid UUID"),
2231 "error should mention invalid UUID, got: {err}"
2232 );
2233 }
2234
2235 #[test]
2236 fn empty_waitpoint_id_returns_none() {
2237 // Defensive: an empty waitpoint_id field (shouldn't happen on
2238 // resumed records, but guard against partial writes) is None, not an error.
2239 let susp = m(&[
2240 ("attempt_index", "0"),
2241 ("close_reason", "resumed"),
2242 ("waitpoint_id", ""),
2243 ]);
2244 let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
2245 assert!(out.is_none());
2246 }
2247
2248 // The previous `matched_signal_ids_from_condition` helper was
2249 // removed when the production path switched from unbounded
2250 // HGETALL to a bounded HGET + per-matcher HMGET loop (review
2251 // feedback on unbounded condition-hash reply size). That loop
2252 // is exercised by the integration tests in `ff-test`.
2253}
2254
2255#[cfg(all(test, feature = "valkey-default"))]
2256mod terminal_replay_parsing_tests {
2257 //! Unit tests for the SDK's parse path of the enriched
2258 //! `execution_not_active` error returned on a terminal-op replay.
2259 //! The integration test in ff-test/tests/e2e_lifecycle.rs proves
2260 //! the Lua side emits the 4-slot detail; these tests prove the
2261 //! Rust parser threads all 4 slots into the `ExecutionNotActive`
2262 //! variant so the reconciler in complete()/fail()/cancel() can
2263 //! match on them.
2264
2265 use super::*;
2266 use ferriskey::Value;
2267
2268 // Use SimpleString rather than BulkString to avoid depending on bytes::Bytes
2269 // in the test harness. parse_success_result + parse_fail_result handle both.
2270 fn bulk(s: &str) -> Value {
2271 Value::SimpleString(s.to_owned())
2272 }
2273
2274 /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
2275 #[test]
2276 fn parse_success_result_extracts_all_four_detail_slots() {
2277 let raw = Value::Array(vec![
2278 Ok(Value::Int(0)),
2279 Ok(bulk("execution_not_active")),
2280 Ok(bulk("success")),
2281 Ok(bulk("42")),
2282 Ok(bulk("terminal")),
2283 Ok(bulk("11111111-1111-1111-1111-111111111111")),
2284 ]);
2285 let err = parse_success_result(&raw, "test").unwrap_err();
2286 let unboxed = match err {
2287 SdkError::Engine(b) => *b,
2288 other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2289 };
2290 match unboxed {
2291 crate::EngineError::Contention(
2292 crate::ContentionKind::ExecutionNotActive {
2293 terminal_outcome,
2294 lease_epoch,
2295 lifecycle_phase,
2296 attempt_id,
2297 },
2298 ) => {
2299 assert_eq!(terminal_outcome, "success");
2300 assert_eq!(lease_epoch, "42");
2301 assert_eq!(lifecycle_phase, "terminal");
2302 assert_eq!(attempt_id, "11111111-1111-1111-1111-111111111111");
2303 }
2304 other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2305 }
2306 }
2307
2308 /// parse_fail_result must extract all detail slots too so the
2309 /// reconciler in fail() can match lifecycle_phase = "runnable" for
2310 /// retry-scheduled replays.
2311 #[test]
2312 fn parse_fail_result_extracts_all_four_detail_slots() {
2313 let raw = Value::Array(vec![
2314 Ok(Value::Int(0)),
2315 Ok(bulk("execution_not_active")),
2316 Ok(bulk("none")),
2317 Ok(bulk("7")),
2318 Ok(bulk("runnable")),
2319 Ok(bulk("22222222-2222-2222-2222-222222222222")),
2320 ]);
2321 let err = parse_fail_result(&raw).unwrap_err();
2322 let unboxed = match err {
2323 SdkError::Engine(b) => *b,
2324 other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2325 };
2326 match unboxed {
2327 crate::EngineError::Contention(
2328 crate::ContentionKind::ExecutionNotActive {
2329 terminal_outcome,
2330 lease_epoch,
2331 lifecycle_phase,
2332 attempt_id,
2333 },
2334 ) => {
2335 assert_eq!(terminal_outcome, "none");
2336 assert_eq!(lease_epoch, "7");
2337 assert_eq!(lifecycle_phase, "runnable");
2338 assert_eq!(attempt_id, "22222222-2222-2222-2222-222222222222");
2339 }
2340 other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2341 }
2342 }
2343
2344 /// Empty detail slots must default to "" (not panic) so older-Lua
2345 /// producers or malformed replies degrade to an unreconcilable
2346 /// variant rather than a Parse error.
2347 #[test]
2348 fn parse_success_result_missing_slots_defaults_to_empty() {
2349 let raw = Value::Array(vec![
2350 Ok(Value::Int(0)),
2351 Ok(bulk("execution_not_active")),
2352 ]);
2353 let err = parse_success_result(&raw, "test").unwrap_err();
2354 let unboxed = match err {
2355 SdkError::Engine(b) => *b,
2356 other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2357 };
2358 match unboxed {
2359 crate::EngineError::Contention(
2360 crate::ContentionKind::ExecutionNotActive {
2361 terminal_outcome,
2362 lease_epoch,
2363 lifecycle_phase,
2364 attempt_id,
2365 },
2366 ) => {
2367 assert_eq!(terminal_outcome, "");
2368 assert_eq!(lease_epoch, "");
2369 assert_eq!(lifecycle_phase, "");
2370 assert_eq!(attempt_id, "");
2371 }
2372 other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
2373 }
2374 }
2375}