// ff_sdk/task.rs
1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
3use std::sync::Arc;
4use std::time::Duration;
5
6use ferriskey::{Client, Value};
7use ff_core::backend::{Handle, HandleKind};
8use ff_core::contracts::ReportUsageResult;
9use ff_core::engine_backend::EngineBackend;
10use ff_script::error::ScriptError;
11use ff_core::keys::{usage_dedup_key, BudgetKeyContext, ExecKeyContext, IndexKeys};
12use ff_core::partition::{budget_partition, execution_partition, PartitionConfig};
13use ff_core::types::*;
14use tokio::sync::{Notify, OwnedSemaphorePermit};
15use tokio::task::JoinHandle;
16
17use crate::SdkError;
18
19// ── Phase 3: Suspend/Signal types ──
20
/// Timeout behavior when a suspension deadline is reached.
///
/// Serialized onto the wire via [`TimeoutBehavior::as_str`] (used when
/// building the resume-condition JSON in `ClaimedTask::suspend`) and
/// parsed back via its `FromStr` impl.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutBehavior {
    /// Wire value `"fail"`.
    Fail,
    /// Wire value `"cancel"`.
    Cancel,
    /// Wire value `"expire"`.
    Expire,
    /// Wire value `"auto_resume_with_timeout_signal"`; `FromStr` also
    /// accepts the short alias `"auto_resume"`.
    AutoResume,
    /// Wire value `"escalate"`.
    Escalate,
}
30
31impl TimeoutBehavior {
32 pub fn as_str(&self) -> &str {
33 match self {
34 Self::Fail => "fail",
35 Self::Cancel => "cancel",
36 Self::Expire => "expire",
37 Self::AutoResume => "auto_resume_with_timeout_signal",
38 Self::Escalate => "escalate",
39 }
40 }
41}
42
43impl std::fmt::Display for TimeoutBehavior {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.write_str(self.as_str())
46 }
47}
48
49impl std::str::FromStr for TimeoutBehavior {
50 type Err = String;
51
52 fn from_str(s: &str) -> Result<Self, Self::Err> {
53 match s {
54 "fail" => Ok(Self::Fail),
55 "cancel" => Ok(Self::Cancel),
56 "expire" => Ok(Self::Expire),
57 "auto_resume_with_timeout_signal" | "auto_resume" => Ok(Self::AutoResume),
58 "escalate" => Ok(Self::Escalate),
59 other => Err(format!("unknown timeout behavior: {other}")),
60 }
61 }
62}
63
/// A condition matcher for the resume condition.
///
/// Matchers are serialized into the `signal_set` resume-condition JSON
/// built by `ClaimedTask::suspend` (`required_signal_names`).
#[derive(Clone, Debug)]
pub struct ConditionMatcher {
    /// Signal name to match (empty = wildcard).
    pub signal_name: String,
}
70
/// Outcome of a `suspend()` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SuspendOutcome {
    /// Execution is now suspended, waitpoint active. The lease has been
    /// released and the worker no longer owns the execution.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// HMAC token that signal deliverers must supply to this waitpoint.
        waitpoint_token: WaitpointToken,
    },
    /// Buffered signals already satisfied the condition — suspension skipped.
    /// The caller still holds the lease (see `create_pending_waitpoint`,
    /// whose buffered signals can pre-satisfy the condition).
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        waitpoint_token: WaitpointToken,
    },
}
91
/// A signal to deliver to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    /// Name matched against the waitpoint's resume condition.
    pub signal_name: String,
    /// Category label carried alongside the signal.
    pub signal_category: String,
    /// Optional opaque payload bytes delivered with the signal.
    pub payload: Option<Vec<u8>>,
    // Sender attribution — presumably for audit trails; confirm against
    // the engine's signal-delivery contract.
    pub source_type: String,
    pub source_identity: String,
    /// Dedup key: a repeat delivery with the same key is reported as
    /// [`SignalOutcome::Duplicate`].
    pub idempotency_key: Option<String>,
    /// HMAC token issued when the waitpoint was created. Required for
    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
    pub waitpoint_token: WaitpointToken,
}
105
/// Outcome of `deliver_signal()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal triggered resume — execution is now runnable.
    TriggeredResume { signal_id: SignalId },
    /// Duplicate signal (idempotency key matched). Carries the id of the
    /// previously accepted signal.
    Duplicate { existing_signal_id: String },
}
116
impl SignalOutcome {
    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
    ///
    /// Consuming packages that call `ff_deliver_signal` directly can use this
    /// to interpret the Lua return value without depending on SDK internals.
    /// Thin wrapper around the crate-internal `parse_signal_result`.
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}
126
127/// A signal that triggered a resume, readable by a worker after re-claim.
128///
129/// **RFC-012 Stage 0:** the canonical definition has moved to
130/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
131/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
132/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
133/// scheduled for removal in 0.5.0.
134pub use ff_core::backend::ResumeSignal;
135
/// Outcome of `append_frame()`.
#[derive(Clone, Debug)]
pub struct AppendFrameOutcome {
    /// Valkey Stream entry ID assigned to this frame.
    pub stream_id: String,
    /// Total frame count in the stream after this append.
    pub frame_count: u64,
}
144
145/// Outcome of a `fail()` call.
146///
147/// **RFC-012 Stage 1a:** canonical definition moved to
148/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
149/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
150/// re-export in `lib.rs`) through the 0.4.x window. Existing
151/// consumers construct + match exhaustively without change.
152pub use ff_core::backend::FailOutcome;
153
/// A claimed execution with an active lease. The worker processes this task
/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
///
/// The lease is automatically renewed in the background at `lease_ttl / 3`
/// intervals. Renewal stops when the task is consumed or dropped.
///
/// `complete`, `fail`, and `cancel` consume `self` — this prevents
/// double-complete bugs at the type level.
pub struct ClaimedTask {
    /// Shared Valkey client. Still called directly by the ops not yet
    /// migrated to the trait surface (see `backend` below).
    client: Client,
    /// `EngineBackend` the trait-migrated ops forward through.
    ///
    /// **RFC-012 Stage 1b.** Today this is always a `ValkeyBackend`
    /// wrapping `client`. 8 of 12 ops now route through
    /// `backend.*()`; the remaining 4 (`create_pending_waitpoint`,
    /// `append_frame`, `suspend`, `report_usage`) still use
    /// `client.fcall(...)` directly — see issue #117 for the
    /// trait-shape alignment that unblocks them.
    ///
    /// Stage 1c will migrate the FlowFabricWorker hot paths (claim,
    /// deliver_signal) through the same trait surface; Stage 1d
    /// refactors this struct to carry a single `Handle` rather than
    /// synthesising one per op via `synth_handle`.
    backend: Arc<dyn EngineBackend>,
    /// Partition config for key construction.
    partition_config: PartitionConfig,
    /// Execution identity.
    execution_id: ExecutionId,
    /// Current attempt (index is 0 on a fresh claim, preserved on resume).
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    /// Lease identity.
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    /// Lease timing (TTL in milliseconds; background renewal runs at TTL / 3).
    lease_ttl_ms: u64,
    /// Lane used at claim time.
    lane_id: LaneId,
    /// Worker instance that holds this lease (for index cleanup keys).
    worker_instance_id: WorkerInstanceId,
    /// Execution data, pre-read at claim time and exposed via getters.
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    /// Background renewal task handle.
    renewal_handle: JoinHandle<()>,
    /// Signal to stop renewal (used before consuming self).
    renewal_stop: Arc<Notify>,
    /// Consecutive lease renewal failures. Shared with the background renewal
    /// task. Reset to 0 on each successful renewal. Workers should check
    /// `is_lease_healthy()` before committing expensive side effects.
    renewal_failures: Arc<AtomicU32>,
    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
    /// response is received, just before `self` is consumed into `Drop`.
    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
    /// suppress the false-positive "dropped without terminal operation"
    /// warning: after `notify_one`, the renewal task has not yet been
    /// polled by the runtime, so `is_finished()` is still `false` on the
    /// happy path when self is being consumed.
    ///
    /// Note: the flag is set for any terminal-op path that reaches
    /// `stop_renewal()`, which includes Lua-level script errors (the
    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
    /// the caller already receives the `Err` via the op's return value,
    /// so an additional `Drop` warning would be noise. The warning is
    /// reserved for genuine drop-without-terminal-op cases (panic, early
    /// return, transport failure before stop_renewal ran).
    terminal_op_called: AtomicBool,
    /// Concurrency permit from the worker's semaphore. Held for the lifetime
    /// of the task; released on complete/fail/cancel/drop.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
227
228impl ClaimedTask {
    /// Construct a `ClaimedTask` from the results of a successful
    /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
    ///
    /// # Arguments
    ///
    /// * `client` — shared Valkey client used for subsequent lease
    ///   renewals, signal delivery, and the final
    ///   complete/fail/cancel FCALL.
    /// * `partition_config` — partition topology snapshot read at
    ///   `FlowFabricWorker::connect`. Used for key construction on
    ///   the lifetime of this task.
    /// * `execution_id` — the claimed execution's UUID.
    /// * `attempt_index` / `attempt_id` — current attempt identity.
    ///   `attempt_index` is 0 on a fresh claim, preserved on a
    ///   resumed claim.
    /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
    ///   is returned by the Lua FCALL and bumped on each resumed
    ///   claim.
    /// * `lease_ttl_ms` — lease TTL used to schedule the background
    ///   renewal task (renews at `lease_ttl_ms / 3`).
    /// * `lane_id` — the lane the task was claimed on. Used for
    ///   index key construction (`lane_active`, `lease_expiry`,
    ///   etc.).
    /// * `worker_instance_id` — this worker's identity. Used to
    ///   resolve `worker_leases` index entries during renewal and
    ///   completion.
    /// * `input_payload` / `execution_kind` / `tags` — pre-read
    ///   execution metadata, exposed via getters so the worker
    ///   doesn't round-trip to Valkey to inspect what it just
    ///   claimed.
    ///
    /// # Invariant: constructor is `pub(crate)` on purpose
    ///
    /// **External callers cannot construct a `ClaimedTask`** — only
    /// the in-crate claim entry points (`claim_next`,
    /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
    /// load-bearing: those entry points are the ONLY sites that
    /// acquire a permit from the worker's concurrency semaphore
    /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
    /// it through `ClaimedTask::set_concurrency_permit` before
    /// returning the task. Promoting `new` to `pub` would let
    /// consumers build tasks that bypass the concurrency contract
    /// the worker's `max_concurrent_tasks` config advertises — the
    /// returned task would run alongside other in-flight work
    /// without debiting the permit bank, and the
    /// complete/fail/cancel/drop path would have nothing to
    /// release.
    ///
    /// If an external callsite genuinely needs to rehydrate a
    /// task from saved FCALL results, the right answer is a new
    /// `FlowFabricWorker` entry point that wraps `new` + acquires a
    /// permit — not promoting this constructor.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        client: Client,
        backend: Arc<dyn EngineBackend>,
        partition_config: PartitionConfig,
        execution_id: ExecutionId,
        attempt_index: AttemptIndex,
        attempt_id: AttemptId,
        lease_id: LeaseId,
        lease_epoch: LeaseEpoch,
        lease_ttl_ms: u64,
        lane_id: LaneId,
        worker_instance_id: WorkerInstanceId,
        input_payload: Vec<u8>,
        execution_kind: String,
        tags: HashMap<String, String>,
    ) -> Self {
        // Shared with the spawned renewal task: one notifier to stop it,
        // one counter it bumps/resets so `is_lease_healthy()` can observe.
        let renewal_stop = Arc::new(Notify::new());
        let renewal_failures = Arc::new(AtomicU32::new(0));

        // Stage 1b: the renewal task forwards through `backend.renew(&handle)`.
        // Build the handle once at construction time and hand both Arc clones
        // into the spawned task.
        let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
            execution_id.clone(),
            attempt_index,
            attempt_id.clone(),
            lease_id.clone(),
            lease_epoch,
            lease_ttl_ms,
            lane_id.clone(),
            worker_instance_id.clone(),
            // Always `Fresh` today — see `synth_handle` for the Stage 1d note.
            HandleKind::Fresh,
        );
        let renewal_handle = spawn_renewal_task(
            backend.clone(),
            renewal_handle_cookie,
            execution_id.clone(),
            lease_ttl_ms,
            renewal_stop.clone(),
            renewal_failures.clone(),
        );

        Self {
            client,
            backend,
            partition_config,
            execution_id,
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            lease_ttl_ms,
            lane_id,
            worker_instance_id,
            input_payload,
            execution_kind,
            tags,
            renewal_handle,
            renewal_stop,
            renewal_failures,
            terminal_op_called: AtomicBool::new(false),
            // Attached separately via `set_concurrency_permit`.
            _concurrency_permit: None,
        }
    }
346
347 /// Attach a concurrency permit from the worker's semaphore.
348 /// The permit is held for the task's lifetime and released on drop.
349 #[allow(dead_code)]
350 pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
351 self._concurrency_permit = Some(permit);
352 }
353
    // ── Accessors ── (plain borrows of claim-time state; no Valkey round-trips)

    /// The claimed execution's UUID.
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    /// Current attempt index (0 on a fresh claim).
    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }

    /// Current attempt identity.
    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }

    /// Identity of the lease this task holds.
    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }

    /// Lease epoch, bumped by the engine on each resumed claim.
    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }

    /// Input payload pre-read at claim time.
    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }

    /// Execution kind pre-read at claim time.
    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }

    /// Execution tags pre-read at claim time.
    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }

    /// Lane the task was claimed on.
    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }
391
    /// Synthesise a Valkey-backend `Handle` encoding this task's
    /// attempt-cookie fields. Private to the SDK — the trait
    /// forwarders call this per op to produce the `&Handle` argument
    /// the `EngineBackend` methods take.
    ///
    /// **RFC-012 Stage 1b option A.** Refactoring `ClaimedTask` to
    /// carry one cached `Handle` instead of synthesising per op is
    /// deferred to Stage 1d (issue #89 follow-up). The per-op cost is
    /// a single allocation of a small byte buffer — negligible
    /// compared to the FCALL round-trip that follows.
    ///
    /// Kind is always `HandleKind::Fresh` because today the 9
    /// Stage-1b ops all treat the backend tag + opaque bytes as
    /// authoritative; they do not dispatch on kind. Stage 1d routes
    /// resumed-claim callers through a `Resumed` synth.
    fn synth_handle(&self) -> Handle {
        ff_backend_valkey::ValkeyBackend::encode_handle(
            self.execution_id.clone(),
            self.attempt_index,
            self.attempt_id.clone(),
            self.lease_id.clone(),
            self.lease_epoch,
            self.lease_ttl_ms,
            self.lane_id.clone(),
            self.worker_instance_id.clone(),
            HandleKind::Fresh,
        )
    }
420
421 /// Check if the lease is likely still valid based on renewal success.
422 ///
423 /// Returns `false` if 3 or more consecutive renewal attempts have failed.
424 /// Workers should check this before committing expensive or irreversible
425 /// side effects. A `false` return means Valkey may have already expired
426 /// the lease and another worker could be processing this execution.
427 pub fn is_lease_healthy(&self) -> bool {
428 self.renewal_failures.load(Ordering::Relaxed) < 3
429 }
430
    /// Number of consecutive lease renewal failures since the last success.
    ///
    /// Returns 0 when renewals are working normally. Useful for observability
    /// and custom health policies beyond the default threshold of 3.
    /// Relaxed ordering is sufficient here: the value is an advisory
    /// counter, not a synchronization point.
    pub fn consecutive_renewal_failures(&self) -> u32 {
        self.renewal_failures.load(Ordering::Relaxed)
    }
438
439 // ── Terminal operations (consume self) ──
440
441 /// Delay the execution until `delay_until`.
442 ///
443 /// Releases the lease. The execution moves to `delayed` state.
444 /// Consumes self — the task cannot be used after delay.
445 pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
446 // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
447 // the pre-migration `stop_renewal` contract: called only when
448 // the FCALL round-tripped (Ok or a typed Lua/engine error);
449 // raw transport errors bubble up without stopping renewal so
450 // the `Drop` warning still fires if the caller later drops
451 // `self` without retrying.
452 let handle = self.synth_handle();
453 let out = self.backend.delay(&handle, delay_until).await;
454 if fcall_landed(&out) {
455 self.stop_renewal();
456 }
457 out.map_err(SdkError::from)
458 }
459
460 /// Move execution to waiting_children state.
461 ///
462 /// Releases the lease. The execution waits for child dependencies to complete.
463 /// Consumes self.
464 pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
465 // RFC-012 Stage 1b: forwards through `backend.wait_children`.
466 // See `delay_execution` for the stop_renewal contract.
467 let handle = self.synth_handle();
468 let out = self.backend.wait_children(&handle).await;
469 if fcall_landed(&out) {
470 self.stop_renewal();
471 }
472 out.map_err(SdkError::from)
473 }
474
475 /// Complete the execution successfully.
476 ///
477 /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
478 /// Renewal continues during the FCALL to prevent lease expiry under
479 /// network latency — the Lua fences on lease_id+epoch atomically.
480 /// Consumes self — the task cannot be used after completion.
481 ///
482 /// # Connection errors and replay
483 ///
484 /// If the Valkey connection drops after the Lua commit but before the
485 /// client reads the response, retrying `complete()` with the same
486 /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
487 /// reconciles the "already terminal with matching outcome" response
488 /// into `Ok(())`. A retry after a different terminal op has raced in
489 /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
490 /// populated `terminal_outcome` so the caller can see what actually
491 /// happened.
492 pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
493 // RFC-012 Stage 1b: forwards through `backend.complete`.
494 // The FCALL body + the `ExecutionNotActive` replay reconciliation
495 // (match same epoch + attempt_id + outcome=="success" → Ok)
496 // moved to `ff_backend_valkey::complete_impl`.
497 let handle = self.synth_handle();
498 let out = self.backend.complete(&handle, result_payload).await;
499 if fcall_landed(&out) {
500 self.stop_renewal();
501 }
502 out.map_err(SdkError::from)
503 }
504
505 /// Fail the execution with a reason and error category.
506 ///
507 /// If the execution policy allows retries, the engine schedules a retry
508 /// (delayed backoff). Otherwise, the execution becomes terminal failed.
509 /// Returns [`FailOutcome`] so the caller knows what happened.
510 ///
511 /// # Connection errors and replay
512 ///
513 /// `fail()` is replay-safe under the same conditions as `complete()`:
514 /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
515 /// reconciled into the outcome the server actually committed
516 /// (`TerminalFailed` if no retries left; `RetryScheduled` with
517 /// `delay_until = 0` if a retry was scheduled — the exact delay is
518 /// not recovered on the replay path).
519 pub async fn fail(
520 self,
521 reason: &str,
522 error_category: &str,
523 ) -> Result<FailOutcome, SdkError> {
524 // RFC-012 Stage 1b: forwards through `backend.fail`.
525 // The FCALL body + retry-policy read + the two-shape replay
526 // reconciliation (terminal/failed → TerminalFailed, runnable
527 // → RetryScheduled{0}) moved to
528 // `ff_backend_valkey::fail_impl`.
529 //
530 // **Preserving the caller's raw category string.** The
531 // pre-Stage-1b code passed `error_category` straight to the
532 // Lua `failure_category` ARGV. Real callers use arbitrary
533 // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
534 // `"inference_error"`, …) that do not correspond to the 5
535 // named `FailureClass` variants. A lossy enum conversion
536 // would rewrite every non-canonical category to
537 // `Transient` — a real behavior change for
538 // ff-readiness-tests + the examples crates.
539 //
540 // Instead: pack the raw category bytes into
541 // `FailureReason.detail`; `fail_impl` reads them back and
542 // threads them to Lua verbatim. The `classification` enum
543 // is derived from the string for the few consumers who
544 // match on the trait return today (none in-workspace).
545 // Stage 1d (or issue #117) widens `FailureClass` with a
546 // `Custom(String)` arm; this stash carrier retires then.
547 let handle = self.synth_handle();
548 let failure_reason = ff_core::backend::FailureReason::with_detail(
549 reason.to_owned(),
550 error_category.as_bytes().to_vec(),
551 );
552 let classification = error_category_to_class(error_category);
553 let out = self.backend.fail(&handle, failure_reason, classification).await;
554 if fcall_landed(&out) {
555 self.stop_renewal();
556 }
557 out.map_err(SdkError::from)
558 }
559
560 /// Cancel the execution.
561 ///
562 /// Stops lease renewal, then calls `ff_cancel_execution` via FCALL.
563 /// Consumes self.
564 ///
565 /// # Connection errors and replay
566 ///
567 /// `cancel()` is replay-safe under the same conditions as `complete()`:
568 /// a retry by the same caller matching `lease_epoch` + `attempt_id`
569 /// returns `Ok(())` if the server's stored `terminal_outcome` is
570 /// `cancelled`. A retry that finds a different outcome (because a
571 /// concurrent `complete()` or `fail()` won the race) surfaces as
572 /// `ExecutionNotActive` with the populated `terminal_outcome` so the
573 /// caller can see that the cancel intent was NOT honored.
574 pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
575 // RFC-012 Stage 1b: forwards through `backend.cancel`.
576 // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
577 // and the `ExecutionNotActive` → outcome=="cancelled" replay
578 // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
579 let handle = self.synth_handle();
580 let out = self.backend.cancel(&handle, reason).await;
581 if fcall_landed(&out) {
582 self.stop_renewal();
583 }
584 out.map_err(SdkError::from)
585 }
586
587 // ── Non-terminal operations ──
588
589 /// Manually renew the lease.
590 ///
591 /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
592 /// trait. The background renewal task calls
593 /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
594 /// this method is the public entry point for workers that want
595 /// to force a renew out-of-band.
596 pub async fn renew_lease(&self) -> Result<(), SdkError> {
597 let handle = self.synth_handle();
598 self.backend
599 .renew(&handle)
600 .await
601 .map(|_renewal| ())
602 .map_err(SdkError::from)
603 }
604
605 /// Update progress (pct 0-100 and optional message).
606 ///
607 /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
608 /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
609 pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
610 let handle = self.synth_handle();
611 self.backend
612 .progress(&handle, Some(pct), Some(message.to_owned()))
613 .await
614 .map_err(SdkError::from)
615 }
616
617 /// Report usage against a budget and check limits.
618 ///
619 /// Non-consuming — the worker can report usage multiple times.
620 /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
621 /// `dedup_key` prevents double-counting on retries (auto-prefixed with budget hash tag).
622 // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b Tranche 3.
623 // `AdmissionDecision` (trait return) cannot losslessly encode
624 // `ReportUsageResult::{SoftBreach, HardBreach, AlreadyApplied}`
625 // — `SoftBreach` is a warning (not a rejection) and has no home
626 // in `Admitted / Throttled / Rejected`, and `HardBreach`'s
627 // structured dim/current/limit fields collapse to `Rejected { reason: String }`.
628 // Tracked in #117.
629 pub async fn report_usage(
630 &self,
631 budget_id: &BudgetId,
632 dimensions: &[(&str, u64)],
633 dedup_key: Option<&str>,
634 ) -> Result<ReportUsageResult, SdkError> {
635 let partition = budget_partition(budget_id, &self.partition_config);
636 let bctx = BudgetKeyContext::new(&partition, budget_id);
637
638 // KEYS (3): budget_usage, budget_limits, budget_def
639 let keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
640
641 // ARGV: dim_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]
642 let now = TimestampMs::now();
643 let dim_count = dimensions.len();
644 let mut argv: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
645 argv.push(dim_count.to_string());
646 for (dim, _) in dimensions {
647 argv.push((*dim).to_string());
648 }
649 for (_, delta) in dimensions {
650 argv.push(delta.to_string());
651 }
652 argv.push(now.to_string());
653 let dedup_key_val = dedup_key
654 .filter(|k| !k.is_empty())
655 .map(|k| usage_dedup_key(bctx.hash_tag(), k))
656 .unwrap_or_default();
657 argv.push(dedup_key_val);
658
659 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
660 let argv_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
661
662 let raw: Value = self
663 .client
664 .fcall("ff_report_usage_and_check", &key_refs, &argv_refs)
665 .await
666 .map_err(SdkError::Valkey)?;
667
668 parse_report_usage_result(&raw)
669 }
670
671 /// Create a pending waitpoint for future signal delivery.
672 ///
673 /// Non-consuming — the worker keeps the lease. Signals delivered to the
674 /// waitpoint are buffered. When the worker later calls `suspend()` with
675 /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
676 /// resume condition.
677 ///
678 /// Returns both the waitpoint_id AND the HMAC token required by external
679 /// callers to buffer signals against this pending waitpoint
680 /// (RFC-004 §Waitpoint Security).
681 // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. The
682 // `EngineBackend` trait has no `create_pending_waitpoint` slot —
683 // RFC-012 §3.1 inventory does not list this op today. Needs
684 // either a new trait method or a reshape of `suspend` that covers
685 // the lease-retaining flow. Tracked in #117.
686 pub async fn create_pending_waitpoint(
687 &self,
688 waitpoint_key: &str,
689 expires_in_ms: u64,
690 ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
691 let partition = execution_partition(&self.execution_id, &self.partition_config);
692 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
693 let idx = IndexKeys::new(&partition);
694
695 let waitpoint_id = WaitpointId::new();
696 let expires_at = TimestampMs::from_millis(TimestampMs::now().0 + expires_in_ms as i64);
697
698 // KEYS (4): exec_core, waitpoint_hash, pending_wp_expiry_zset, hmac_secrets
699 let keys: Vec<String> = vec![
700 ctx.core(),
701 ctx.waitpoint(&waitpoint_id),
702 idx.pending_waitpoint_expiry(),
703 idx.waitpoint_hmac_secrets(),
704 ];
705
706 // ARGV (5): execution_id, attempt_index, waitpoint_id, waitpoint_key, expires_at
707 let args: Vec<String> = vec![
708 self.execution_id.to_string(),
709 self.attempt_index.to_string(),
710 waitpoint_id.to_string(),
711 waitpoint_key.to_owned(),
712 expires_at.to_string(),
713 ];
714
715 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
716 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
717
718 let raw: Value = self
719 .client
720 .fcall("ff_create_pending_waitpoint", &key_refs, &arg_refs)
721 .await
722 .map_err(SdkError::Valkey)?;
723
724 // Response: {1, "OK", waitpoint_id, waitpoint_key, waitpoint_token}.
725 // Extract the token (the waitpoint_id is the one we generated locally).
726 let token = extract_pending_waitpoint_token(&raw)?;
727 Ok((waitpoint_id, token))
728 }
729
730 // ── Phase 4: Streaming ──
731
732 /// Append a frame to the current attempt's output stream.
733 ///
734 /// Non-consuming — the worker can append many frames during execution.
735 /// The stream is created lazily on the first append.
736 // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. Trait
737 // method `append_frame` returns `()`; this SDK method returns
738 // `AppendFrameOutcome { stream_id, frame_count }` — the return-type
739 // delta is a hard ABI break on the Stage-1a trait. Tracked in #117.
740 pub async fn append_frame(
741 &self,
742 frame_type: &str,
743 payload: &[u8],
744 metadata: Option<&str>,
745 ) -> Result<AppendFrameOutcome, SdkError> {
746 let partition = execution_partition(&self.execution_id, &self.partition_config);
747 let ctx = ExecKeyContext::new(&partition, &self.execution_id);
748
749 let now = TimestampMs::now();
750
751 // KEYS (3): exec_core, stream_key, stream_meta
752 let keys: Vec<String> = vec![
753 ctx.core(),
754 ctx.stream(self.attempt_index),
755 ctx.stream_meta(self.attempt_index),
756 ];
757
758 let payload_str = String::from_utf8_lossy(payload);
759
760 // ARGV (13): execution_id, attempt_index, lease_id, lease_epoch,
761 // frame_type, ts, payload, encoding, correlation_id,
762 // source, retention_maxlen, attempt_id, max_payload_bytes
763 let args: Vec<String> = vec![
764 self.execution_id.to_string(), // 1
765 self.attempt_index.to_string(), // 2
766 self.lease_id.to_string(), // 3
767 self.lease_epoch.to_string(), // 4
768 frame_type.to_owned(), // 5
769 now.to_string(), // 6 ts
770 payload_str.into_owned(), // 7
771 "utf8".to_owned(), // 8 encoding
772 metadata.unwrap_or("").to_owned(), // 9 correlation_id
773 "worker".to_owned(), // 10 source
774 "10000".to_owned(), // 11 retention_maxlen
775 self.attempt_id.to_string(), // 12
776 "65536".to_owned(), // 13 max_payload_bytes
777 ];
778
779 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
780 let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
781
782 let raw: Value = self
783 .client
784 .fcall("ff_append_frame", &key_refs, &arg_refs)
785 .await
786 .map_err(SdkError::Valkey)?;
787
788 parse_append_frame_result(&raw)
789 }
790
791 // ── Phase 3: Suspend ──
792
793 /// Suspend the execution, releasing the lease and creating a waitpoint.
794 ///
795 /// The execution transitions to `suspended` and the worker loses ownership.
796 /// An external signal matching the condition will resume the execution.
797 ///
798 /// If `condition_matchers` is empty, a wildcard matcher is created that
799 /// matches ANY signal name. To require an explicit operator resume with
800 /// no signal match, pass a sentinel name that no real signal will use.
801 ///
802 /// If buffered signals on a pending waitpoint already satisfy the condition,
803 /// returns `AlreadySatisfied` and the lease is NOT released.
804 ///
805 /// Consumes self — the task cannot be used after suspension.
806 // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. Trait
807 // method `suspend` returns `Handle` (a resumed-kind attempt
808 // cookie); this SDK method returns `SuspendOutcome { suspension_id,
809 // waitpoint_id, waitpoint_key, waitpoint_token }` — semantically
810 // distinct return shapes. Also: SDK takes `&[ConditionMatcher]` +
811 // `TimeoutBehavior` with no `WaitpointSpec` mapping. Tracked in #117.
    pub async fn suspend(
        self,
        reason_code: &str,
        condition_matchers: &[ConditionMatcher],
        timeout_ms: Option<u64>,
        timeout_behavior: TimeoutBehavior,
    ) -> Result<SuspendOutcome, SdkError> {
        // All keys below are derived from the same execution partition so
        // the single FCALL can address them together.
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        // IDs are minted client-side and echoed back by Lua (see
        // `parse_suspend_result`), so the values created here are
        // authoritative for the returned `SuspendOutcome`.
        let suspension_id = SuspensionId::new();
        let waitpoint_id = WaitpointId::new();
        // For now, use a simple opaque key (real implementation would use HMAC token)
        let waitpoint_key = format!("wpk:{}", waitpoint_id);

        // NOTE(review): `ms as i64` silently wraps for timeout_ms values
        // above i64::MAX — presumably unreachable for real timeouts, but
        // worth `try_into` if untrusted input can reach here. TODO confirm.
        let timeout_at = timeout_ms.map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));

        // Build resume condition JSON
        let required_signal_names: Vec<&str> = condition_matchers
            .iter()
            .map(|m| m.signal_name.as_str())
            .collect();
        // 0 or 1 matcher → "any" (the 0 case is the wildcard documented
        // above); 2+ matchers → every named signal is required ("all").
        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
        let resume_condition_json = serde_json::json!({
            "condition_type": "signal_set",
            "required_signal_names": required_signal_names,
            "signal_match_mode": match_mode,
            "minimum_signal_count": 1,
            "timeout_behavior": timeout_behavior.as_str(),
            "allow_operator_override": true,
        }).to_string();

        let resume_policy_json = serde_json::json!({
            "resume_target": "runnable",
            "close_waitpoint_on_resume": true,
            "consume_matched_signals": true,
            "retain_signal_buffer_until_closed": true,
        }).to_string();

        // KEYS (17): exec_core, attempt_record, lease_current, lease_history,
        // lease_expiry, worker_leases, suspension_current, waitpoint_hash,
        // waitpoint_signals, suspension_timeout, pending_wp_expiry,
        // active_index, suspended_index, waitpoint_history, wp_condition,
        // attempt_timeout, hmac_secrets
        // Positional protocol: order must match the Lua side exactly.
        let keys: Vec<String> = vec![
            ctx.core(), // 1
            ctx.attempt_hash(self.attempt_index), // 2
            ctx.lease_current(), // 3
            ctx.lease_history(), // 4
            idx.lease_expiry(), // 5
            idx.worker_leases(&self.worker_instance_id), // 6
            ctx.suspension_current(), // 7
            ctx.waitpoint(&waitpoint_id), // 8
            ctx.waitpoint_signals(&waitpoint_id), // 9
            idx.suspension_timeout(), // 10
            idx.pending_waitpoint_expiry(), // 11
            idx.lane_active(&self.lane_id), // 12
            idx.lane_suspended(&self.lane_id), // 13
            ctx.waitpoints(), // 14
            ctx.waitpoint_condition(&waitpoint_id), // 15
            idx.attempt_timeout(), // 16
            idx.waitpoint_hmac_secrets(), // 17
        ];

        // ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
        // lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
        // reason_code, requested_by, timeout_at, resume_condition_json,
        // resume_policy_json, continuation_metadata_pointer,
        // use_pending_waitpoint, timeout_behavior, lease_history_maxlen
        let args: Vec<String> = vec![
            self.execution_id.to_string(), // 1
            self.attempt_index.to_string(), // 2
            self.attempt_id.to_string(), // 3
            self.lease_id.to_string(), // 4
            self.lease_epoch.to_string(), // 5
            suspension_id.to_string(), // 6
            waitpoint_id.to_string(), // 7
            waitpoint_key.clone(), // 8
            reason_code.to_owned(), // 9
            "worker".to_owned(), // 10
            timeout_at.map_or(String::new(), |t| t.to_string()), // 11 ("" = no timeout)
            resume_condition_json, // 12
            resume_policy_json, // 13
            String::new(), // 14 continuation_metadata_ptr
            String::new(), // 15 use_pending_waitpoint
            timeout_behavior.as_str().to_owned(), // 16
            "1000".to_owned(), // 17 lease_history_maxlen
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::Valkey)?;

        // Transport errors short-circuit via `?` above WITHOUT stopping
        // renewal — the lease may still be live server-side, and the Drop
        // warning should fire. Once any response landed (success or typed
        // rejection), renewal must stop before self is consumed.
        self.stop_renewal();
        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
    }
914
915 /// Read the signals that satisfied the waitpoint and triggered this
916 /// resume.
917 ///
918 /// Non-consuming. Intended to be called immediately after re-claim via
919 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
920 /// subsequent `suspend()` (which replaces `suspension:current`).
921 ///
922 /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
923 ///
924 /// - No prior suspension on this execution.
925 /// - The prior suspension belonged to an earlier attempt (e.g. the
926 /// attempt was cancelled/failed and a retry is now claiming).
927 /// - The prior suspension was closed by timeout / cancel / operator
928 /// override rather than by a matched signal.
929 ///
930 /// Reads `suspension:current` once, filters by `attempt_index` to
931 /// guard against stale prior-attempt records, then fetches the matched
932 /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
933 /// fields and reads each signal's metadata + payload directly.
934 pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
935 // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
936 // Pre-migration body (HGETALL suspension_current + HMGET
937 // matchers + pipelined HGETALL signal_hash / GET
938 // signal_payload) lives in
939 // `ff_backend_valkey::observe_signals_impl`.
940 let handle = self.synth_handle();
941 self.backend
942 .observe_signals(&handle)
943 .await
944 .map_err(SdkError::from)
945 }
946
947 /// Signal the renewal task to stop. Called by every terminal op
948 /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
949 /// `move_to_waiting_children`) after the FCALL returns. Also marks
950 /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
951 /// consumption from a genuine drop-without-terminal-op.
    fn stop_renewal(&self) {
        // Set the flag first: the Release store pairs with the Acquire
        // load in `Drop`, which uses it to suppress the
        // dropped-without-terminal-op warning.
        self.terminal_op_called.store(true, Ordering::Release);
        // Wake the background renewal task so its select loop exits.
        self.renewal_stop.notify_one();
    }
956
957}
958
959/// True iff the backend's FCALL result represents a round-trip that
960/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
961/// contention, conflict, state, bug) all count as "landed" — the
962/// server either committed or rejected with a typed response. Only
963/// raw `Transport` errors (connection drops, request timeouts, parse
964/// failures) count as "did not land", which matches the pre-Stage-1b
965/// SDK's `fcall(...).await.map_err(SdkError::Valkey)?` short-circuit
966/// — those errors returned before `stop_renewal()` ran, preserving
967/// the `Drop` warning for genuine "lease will leak" cases.
968///
969/// Stage 1b terminal-op forwarders use this predicate to decide
970/// whether to call `stop_renewal()`: yes for landed responses, no
971/// for transport errors so the caller's retry path still sees a
972/// running renewal task.
973fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
974 match r {
975 Ok(_) => true,
976 Err(crate::EngineError::Transport { .. }) => false,
977 Err(_) => true,
978 }
979}
980
981/// Map the SDK's free-form `error_category: &str` to the typed
982/// `FailureClass` the trait's `fail` method takes. Unknown categories
983/// fall through to `Transient` — the Lua side already tolerated
984/// arbitrary strings, so the worst a category drift does under the
985/// Stage 1b forwarder is reclassify a novel category as transient.
986/// Stage 1d (or issue #117) widens `FailureClass` with a
987/// `Custom(String)` arm for exact round-trip.
988fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
989 use ff_core::backend::FailureClass;
990 match s {
991 "transient" => FailureClass::Transient,
992 "permanent" => FailureClass::Permanent,
993 "infra_crash" => FailureClass::InfraCrash,
994 "timeout" => FailureClass::Timeout,
995 "cancelled" => FailureClass::Cancelled,
996 _ => FailureClass::Transient,
997 }
998}
999
impl Drop for ClaimedTask {
    fn drop(&mut self) {
        // Safety net: terminal ops (complete/fail/cancel/suspend/...) stop
        // renewal via `stop_renewal()` before consuming self. If the task
        // is dropped without one (e.g. a panic unwound past it), aborting
        // here prevents a leaked background renewal loop.
        //
        // Why `terminal_op_called` and not `renewal_handle.is_finished()`:
        // on the happy path `stop_renewal()` fires `notify_one`
        // synchronously and Drop runs before the runtime has polled the
        // renewal task, so `is_finished()` would still read `false` and
        // this warning would fire on every successful terminal op. The
        // flag only certifies that a terminal-op path ran as far as
        // stopping renewal — it does NOT certify the Lua side succeeded
        // (see the field doc); any such error is surfaced through the
        // op's own return value, so no Drop warning is needed there.
        if !self.terminal_op_called.load(Ordering::Acquire) {
            tracing::warn!(
                execution_id = %self.execution_id,
                "ClaimedTask dropped without terminal operation — lease will expire"
            );
        }
        self.renewal_handle.abort();
    }
}
1026
1027// ── Lease renewal ──
1028
1029/// Per-tick renewal: single `backend.renew(&handle)` call wrapped in
1030/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
1031/// hooks still see one span per renewal (restores PR #119 Cursor
1032/// Bugbot finding — the top-level `spawn_renewal_task` fires once at
1033/// construction time, not per-tick).
1034///
1035/// See `benches/harness/src/bin/long_running.rs` for the bench
1036/// consumer that depends on this span naming.
1037#[tracing::instrument(
1038 name = "renew_lease",
1039 skip_all,
1040 fields(execution_id = %execution_id)
1041)]
1042async fn renew_once(
1043 backend: &dyn EngineBackend,
1044 handle: &Handle,
1045 execution_id: &ExecutionId,
1046) -> Result<(), crate::EngineError> {
1047 backend.renew(handle).await.map(|_| ())
1048}
1049
/// Spawn a background tokio task that renews the lease at `ttl / 3`
/// intervals.
///
/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
/// + the encoded `Handle` instead. Per-tick tracing lives on
/// [`renew_once`]; this function itself is sync + one-shot.
///
/// Stops when:
/// - `stop_signal` is notified (complete/fail/cancel called)
/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
/// - The task handle is aborted (ClaimedTask dropped)
///
/// `failure_counter` is reset to 0 on every success and incremented on
/// every failure (terminal or retryable), so observers see consecutive
/// failure counts.
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
    // panics if a caller (or a misconfigured test) passes a
    // lease_ttl_ms < 3. The SDK config validator already enforces
    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
    // is a cheap belt-and-suspenders (Copilot review finding on
    // PR #119).
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        // Skip (don't burst) if a tick was missed — late renewals should
        // not stack into back-to-back FCALLs.
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Skip the first immediate tick — the lease was just acquired.
        tick.tick().await;

        loop {
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        Ok(_renewal) => {
                            // NOTE(review): Relaxed ordering — presumably the
                            // counter is advisory diagnostics with no ordering
                            // dependency on other state; confirm at readers.
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        Err(e) if is_terminal_renewal_error(&e) => {
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        Err(e) => {
                            // fetch_add returns the PREVIOUS value; +1 gives
                            // the count including this failure.
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}
1128
1129/// Check if an engine error means renewal should stop permanently.
1130#[allow(dead_code)]
1131fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
1132 use crate::{ContentionKind, EngineError, StateKind};
1133 matches!(
1134 err,
1135 EngineError::State(
1136 StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
1137 ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
1138 | EngineError::NotFound { entity: "execution" }
1139 )
1140}
1141
1142// ── FCALL result parsing ──
1143
1144/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
1145/// function into a typed [`ReportUsageResult`].
1146///
1147/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
1148/// `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
1149/// Status code `!= 1` is parsed as a [`ScriptError`] via
1150/// [`ScriptError::from_code_with_detail`].
1151///
1152/// Exposed as `pub` so downstream SDKs that speak the same wire format
1153/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
1154/// call this directly instead of re-implementing the parse. Keeping one
1155/// parser paired with the producer (the Lua function registered at
1156/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
1157/// against silent format drift between producer and consumer.
1158pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1159 let arr = match raw {
1160 Value::Array(arr) => arr,
1161 _ => {
1162 return Err(SdkError::from(ScriptError::Parse {
1163 fcall: "parse_report_usage_result".into(),
1164 execution_id: None,
1165 message: "ff_report_usage_and_check: expected Array".into(),
1166 }));
1167 }
1168 };
1169 let status_code = match arr.first() {
1170 Some(Ok(Value::Int(n))) => *n,
1171 _ => {
1172 return Err(SdkError::from(ScriptError::Parse {
1173 fcall: "parse_report_usage_result".into(),
1174 execution_id: None,
1175 message: "ff_report_usage_and_check: expected Int status code".into(),
1176 }));
1177 }
1178 };
1179 if status_code != 1 {
1180 let error_code = usage_field_str(arr, 1);
1181 let detail = usage_field_str(arr, 2);
1182 return Err(SdkError::from(
1183 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1184 ScriptError::Parse {
1185 fcall: "parse_report_usage_result".into(),
1186 execution_id: None,
1187 message: format!("ff_report_usage_and_check: {error_code}"),
1188 }
1189 }),
1190 ));
1191 }
1192 let sub_status = usage_field_str(arr, 1);
1193 match sub_status.as_str() {
1194 "OK" => Ok(ReportUsageResult::Ok),
1195 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1196 "SOFT_BREACH" => {
1197 let dim = usage_field_str(arr, 2);
1198 let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1199 let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1200 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1201 }
1202 "HARD_BREACH" => {
1203 let dim = usage_field_str(arr, 2);
1204 let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1205 let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1206 Ok(ReportUsageResult::HardBreach {
1207 dimension: dim,
1208 current_usage: current,
1209 hard_limit: limit,
1210 })
1211 }
1212 _ => Err(SdkError::from(ScriptError::Parse {
1213 fcall: "parse_report_usage_result".into(),
1214 execution_id: None,
1215 message: format!(
1216 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1217 ),
1218 })),
1219 }
1220}
1221
1222fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1223 match arr.get(index) {
1224 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1225 Some(Ok(Value::SimpleString(s))) => s.clone(),
1226 Some(Ok(Value::Int(n))) => n.to_string(),
1227 _ => String::new(),
1228 }
1229}
1230
1231/// Parse a required numeric usage field (u64) from the wire array at
1232/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
1233/// holds a non-string/non-int value, or contains a string that does
1234/// not parse as u64.
1235///
1236/// Rationale: the Lua producer (`lua/budget.lua:99`,
1237/// `ff_report_usage_and_check`) always emits
1238/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1239/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1240/// non-numeric value here means the Lua and Rust sides drifted;
1241/// silently coercing to `0` would surface drift as "zero-usage breach"
1242/// — arithmetically correct but semantically nonsense. Fail loudly
1243/// instead so drift shows up as a parse error at the first call site.
1244fn parse_usage_u64(
1245 arr: &[Result<Value, ferriskey::Error>],
1246 index: usize,
1247 sub_status: &str,
1248 field_name: &str,
1249) -> Result<u64, SdkError> {
1250 match arr.get(index) {
1251 Some(Ok(Value::Int(n))) => {
1252 u64::try_from(*n).map_err(|_| {
1253 SdkError::from(ScriptError::Parse {
1254 fcall: "parse_usage_u64".into(),
1255 execution_id: None,
1256 message: format!(
1257 "ff_report_usage_and_check {sub_status}: {field_name} \
1258 (index {index}) negative int {n} cannot be u64"
1259 ),
1260 })
1261 })
1262 }
1263 Some(Ok(Value::BulkString(b))) => {
1264 let s = String::from_utf8_lossy(b);
1265 s.parse::<u64>().map_err(|_| {
1266 SdkError::from(ScriptError::Parse {
1267 fcall: "parse_usage_u64".into(),
1268 execution_id: None,
1269 message: format!(
1270 "ff_report_usage_and_check {sub_status}: {field_name} \
1271 (index {index}) not a u64 string: {s:?}"
1272 ),
1273 })
1274 })
1275 }
1276 Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1277 SdkError::from(ScriptError::Parse {
1278 fcall: "parse_usage_u64".into(),
1279 execution_id: None,
1280 message: format!(
1281 "ff_report_usage_and_check {sub_status}: {field_name} \
1282 (index {index}) not a u64 string: {s:?}"
1283 ),
1284 })
1285 }),
1286 Some(_) => Err(SdkError::from(ScriptError::Parse {
1287 fcall: "parse_usage_u64".into(),
1288 execution_id: None,
1289 message: format!(
1290 "ff_report_usage_and_check {sub_status}: {field_name} \
1291 (index {index}) wrong wire type (expected Int or String)"
1292 ),
1293 })),
1294 None => Err(SdkError::from(ScriptError::Parse {
1295 fcall: "parse_usage_u64".into(),
1296 execution_id: None,
1297 message: format!(
1298 "ff_report_usage_and_check {sub_status}: {field_name} \
1299 (index {index}) missing from response"
1300 ),
1301 })),
1302 }
1303}
1304
1305/// Parse a standard {1, "OK", ...} / {0, "error", ...} FCALL result.
1306/// Extract the waitpoint_token (field index 4 in the 0-indexed response array)
1307/// from an `ff_create_pending_waitpoint` reply. Runs `parse_success_result`
1308/// first so error cases produce the usual typed error, not a parse failure.
1309fn extract_pending_waitpoint_token(raw: &Value) -> Result<WaitpointToken, SdkError> {
1310 parse_success_result(raw, "ff_create_pending_waitpoint")?;
1311 let arr = match raw {
1312 Value::Array(arr) => arr,
1313 _ => unreachable!("parse_success_result would have rejected non-array"),
1314 };
1315 // Layout: [0]=1, [1]="OK", [2]=waitpoint_id, [3]=waitpoint_key, [4]=waitpoint_token
1316 let token_str = arr
1317 .get(4)
1318 .and_then(|v| match v {
1319 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1320 Ok(Value::SimpleString(s)) => Some(s.clone()),
1321 _ => None,
1322 })
1323 .ok_or_else(|| {
1324 SdkError::from(ScriptError::Parse {
1325 fcall: "extract_pending_waitpoint_token".into(),
1326 execution_id: None,
1327 message: "ff_create_pending_waitpoint: missing waitpoint_token in response".into(),
1328 })
1329 })?;
1330 Ok(WaitpointToken::new(token_str))
1331}
1332
1333/// Pure helper: decide whether `suspension:current` represents a
1334/// signal-driven resume for the currently-claimed attempt, and extract
1335/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1336/// (no record, stale prior-attempt, non-resumed close). Returns an error
1337/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1338/// bug rather than a missing-data case.
1339///
1340/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1341/// `EngineBackend::observe_signals`, which re-implements this invariant
1342/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1343/// tests so the parsing contract stays exercised at the SDK layer —
1344/// Stage 1d will consolidate (either promote the helper into ff-core
1345/// or drop these tests once the backend-side tests cover equivalent
1346/// ground).
1347#[allow(dead_code)]
1348fn resume_waitpoint_id_from_suspension(
1349 susp: &HashMap<String, String>,
1350 claimed_attempt: AttemptIndex,
1351) -> Result<Option<WaitpointId>, SdkError> {
1352 if susp.is_empty() {
1353 return Ok(None);
1354 }
1355 let susp_att: u32 = susp
1356 .get("attempt_index")
1357 .and_then(|s| s.parse().ok())
1358 .unwrap_or(u32::MAX);
1359 if susp_att != claimed_attempt.0 {
1360 return Ok(None);
1361 }
1362 let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1363 if close_reason != "resumed" {
1364 return Ok(None);
1365 }
1366 let wp_id_str = susp
1367 .get("waitpoint_id")
1368 .map(String::as_str)
1369 .unwrap_or_default();
1370 if wp_id_str.is_empty() {
1371 return Ok(None);
1372 }
1373 let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1374 SdkError::from(ScriptError::Parse {
1375 fcall: "resume_waitpoint_id_from_suspension".into(),
1376 execution_id: None,
1377 message: format!(
1378 "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1379 ),
1380 })
1381 })?;
1382 Ok(Some(waitpoint_id))
1383}
1384
1385pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
1386 let arr = match raw {
1387 Value::Array(arr) => arr,
1388 _ => {
1389 return Err(SdkError::from(ScriptError::Parse {
1390 fcall: "parse_success_result".into(),
1391 execution_id: None,
1392 message: format!(
1393 "{function_name}: expected Array, got non-array"
1394 ),
1395 }));
1396 }
1397 };
1398
1399 if arr.is_empty() {
1400 return Err(SdkError::from(ScriptError::Parse {
1401 fcall: "parse_success_result".into(),
1402 execution_id: None,
1403 message: format!(
1404 "{function_name}: empty result array"
1405 ),
1406 }));
1407 }
1408
1409 let status_code = match arr.first() {
1410 Some(Ok(Value::Int(n))) => *n,
1411 _ => {
1412 return Err(SdkError::from(ScriptError::Parse {
1413 fcall: "parse_success_result".into(),
1414 execution_id: None,
1415 message: format!(
1416 "{function_name}: expected Int at index 0"
1417 ),
1418 }));
1419 }
1420 };
1421
1422 if status_code == 1 {
1423 Ok(())
1424 } else {
1425 // Extract error code from index 1 and optional detail from index 2
1426 // (e.g. `capability_mismatch` ships missing tokens there). Variants
1427 // that carry a String payload pick the detail up via
1428 // `from_code_with_detail`; other variants ignore it.
1429 let field_str = |idx: usize| -> String {
1430 arr.get(idx)
1431 .and_then(|v| match v {
1432 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1433 Ok(Value::SimpleString(s)) => Some(s.clone()),
1434 _ => None,
1435 })
1436 .unwrap_or_default()
1437 };
1438 let error_code = {
1439 let s = field_str(1);
1440 if s.is_empty() { "unknown".to_owned() } else { s }
1441 };
1442 // Collect all detail slots (idx >= 2). Most variants only read
1443 // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
1444 // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
1445 // reconciliation after a network drop.
1446 let details: Vec<String> = (2..arr.len()).map(field_str).collect();
1447 let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1448
1449 let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
1450 .unwrap_or_else(|| {
1451 ScriptError::Parse {
1452 fcall: "parse_success_result".into(),
1453 execution_id: None,
1454 message: format!("{function_name}: unknown error: {error_code}"),
1455 }
1456 });
1457
1458 Err(SdkError::from(script_err))
1459 }
1460}
1461
1462/// Parse ff_suspend_execution result:
1463/// ok(suspension_id, waitpoint_id, waitpoint_key)
1464/// ok_already_satisfied(suspension_id, waitpoint_id, waitpoint_key)
1465fn parse_suspend_result(
1466 raw: &Value,
1467 suspension_id: SuspensionId,
1468 waitpoint_id: WaitpointId,
1469 waitpoint_key: String,
1470) -> Result<SuspendOutcome, SdkError> {
1471 let arr = match raw {
1472 Value::Array(arr) => arr,
1473 _ => {
1474 return Err(SdkError::from(ScriptError::Parse {
1475 fcall: "parse_suspend_result".into(),
1476 execution_id: None,
1477 message: "ff_suspend_execution: expected Array".into(),
1478 }));
1479 }
1480 };
1481
1482 let status_code = match arr.first() {
1483 Some(Ok(Value::Int(n))) => *n,
1484 _ => {
1485 return Err(SdkError::from(ScriptError::Parse {
1486 fcall: "parse_suspend_result".into(),
1487 execution_id: None,
1488 message: "ff_suspend_execution: bad status code".into(),
1489 }));
1490 }
1491 };
1492
1493 if status_code != 1 {
1494 let err_field_str = |idx: usize| -> String {
1495 arr.get(idx)
1496 .and_then(|v| match v {
1497 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1498 Ok(Value::SimpleString(s)) => Some(s.clone()),
1499 _ => None,
1500 })
1501 .unwrap_or_default()
1502 };
1503 let error_code = {
1504 let s = err_field_str(1);
1505 if s.is_empty() { "unknown".to_owned() } else { s }
1506 };
1507 let detail = err_field_str(2);
1508 return Err(SdkError::from(
1509 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1510 ScriptError::Parse {
1511 fcall: "parse_suspend_result".into(),
1512 execution_id: None,
1513 message: format!("ff_suspend_execution: {error_code}"),
1514 }
1515 }),
1516 ));
1517 }
1518
1519 // Check sub-status at index 1
1520 let sub_status = arr
1521 .get(1)
1522 .and_then(|v| match v {
1523 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1524 Ok(Value::SimpleString(s)) => Some(s.clone()),
1525 _ => None,
1526 })
1527 .unwrap_or_default();
1528
1529 // Lua returns: {1, status, suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
1530 // The suspension_id/waitpoint_id/waitpoint_key values the worker passed in are
1531 // authoritative (Lua echoes them); waitpoint_token however is MINTED by Lua and
1532 // must be read from the response.
1533 let waitpoint_token = arr
1534 .get(5)
1535 .and_then(|v| match v {
1536 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1537 Ok(Value::SimpleString(s)) => Some(s.clone()),
1538 _ => None,
1539 })
1540 .map(WaitpointToken::new)
1541 .ok_or_else(|| {
1542 SdkError::from(ScriptError::Parse {
1543 fcall: "parse_suspend_result".into(),
1544 execution_id: None,
1545 message: "ff_suspend_execution: missing waitpoint_token in response".into(),
1546 })
1547 })?;
1548
1549 if sub_status == "ALREADY_SATISFIED" {
1550 Ok(SuspendOutcome::AlreadySatisfied {
1551 suspension_id,
1552 waitpoint_id,
1553 waitpoint_key,
1554 waitpoint_token,
1555 })
1556 } else {
1557 Ok(SuspendOutcome::Suspended {
1558 suspension_id,
1559 waitpoint_id,
1560 waitpoint_key,
1561 waitpoint_token,
1562 })
1563 }
1564}
1565
1566/// Parse ff_deliver_signal result:
1567/// ok(signal_id, effect)
1568/// ok_duplicate(existing_signal_id)
1569pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
1570 let arr = match raw {
1571 Value::Array(arr) => arr,
1572 _ => {
1573 return Err(SdkError::from(ScriptError::Parse {
1574 fcall: "parse_signal_result".into(),
1575 execution_id: None,
1576 message: "ff_deliver_signal: expected Array".into(),
1577 }));
1578 }
1579 };
1580
1581 let status_code = match arr.first() {
1582 Some(Ok(Value::Int(n))) => *n,
1583 _ => {
1584 return Err(SdkError::from(ScriptError::Parse {
1585 fcall: "parse_signal_result".into(),
1586 execution_id: None,
1587 message: "ff_deliver_signal: bad status code".into(),
1588 }));
1589 }
1590 };
1591
1592 if status_code != 1 {
1593 let err_field_str = |idx: usize| -> String {
1594 arr.get(idx)
1595 .and_then(|v| match v {
1596 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1597 Ok(Value::SimpleString(s)) => Some(s.clone()),
1598 _ => None,
1599 })
1600 .unwrap_or_default()
1601 };
1602 let error_code = {
1603 let s = err_field_str(1);
1604 if s.is_empty() { "unknown".to_owned() } else { s }
1605 };
1606 let detail = err_field_str(2);
1607 return Err(SdkError::from(
1608 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1609 ScriptError::Parse {
1610 fcall: "parse_signal_result".into(),
1611 execution_id: None,
1612 message: format!("ff_deliver_signal: {error_code}"),
1613 }
1614 }),
1615 ));
1616 }
1617
1618 let sub_status = arr
1619 .get(1)
1620 .and_then(|v| match v {
1621 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1622 Ok(Value::SimpleString(s)) => Some(s.clone()),
1623 _ => None,
1624 })
1625 .unwrap_or_default();
1626
1627 if sub_status == "DUPLICATE" {
1628 let existing_id = arr
1629 .get(2)
1630 .and_then(|v| match v {
1631 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1632 Ok(Value::SimpleString(s)) => Some(s.clone()),
1633 _ => None,
1634 })
1635 .unwrap_or_default();
1636 return Ok(SignalOutcome::Duplicate {
1637 existing_signal_id: existing_id,
1638 });
1639 }
1640
1641 // Parse: {1, "OK", signal_id, effect}
1642 let signal_id_str = arr
1643 .get(2)
1644 .and_then(|v| match v {
1645 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1646 Ok(Value::SimpleString(s)) => Some(s.clone()),
1647 _ => None,
1648 })
1649 .unwrap_or_default();
1650
1651 let effect = arr
1652 .get(3)
1653 .and_then(|v| match v {
1654 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1655 Ok(Value::SimpleString(s)) => Some(s.clone()),
1656 _ => None,
1657 })
1658 .unwrap_or_default();
1659
1660 let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
1661 SdkError::from(ScriptError::Parse {
1662 fcall: "parse_signal_result".into(),
1663 execution_id: None,
1664 message: format!(
1665 "ff_deliver_signal: invalid signal_id from Lua: {e}"
1666 ),
1667 })
1668 })?;
1669
1670 if effect == "resume_condition_satisfied" {
1671 Ok(SignalOutcome::TriggeredResume { signal_id })
1672 } else {
1673 Ok(SignalOutcome::Accepted { signal_id, effect })
1674 }
1675}
1676
1677/// Parse ff_append_frame result: ok(entry_id, frame_count)
1678fn parse_append_frame_result(raw: &Value) -> Result<AppendFrameOutcome, SdkError> {
1679 let arr = match raw {
1680 Value::Array(arr) => arr,
1681 _ => {
1682 return Err(SdkError::from(ScriptError::Parse {
1683 fcall: "parse_append_frame_result".into(),
1684 execution_id: None,
1685 message: "ff_append_frame: expected Array".into(),
1686 }));
1687 }
1688 };
1689
1690 let status_code = match arr.first() {
1691 Some(Ok(Value::Int(n))) => *n,
1692 _ => {
1693 return Err(SdkError::from(ScriptError::Parse {
1694 fcall: "parse_append_frame_result".into(),
1695 execution_id: None,
1696 message: "ff_append_frame: bad status code".into(),
1697 }));
1698 }
1699 };
1700
1701 if status_code != 1 {
1702 let err_field_str = |idx: usize| -> String {
1703 arr.get(idx)
1704 .and_then(|v| match v {
1705 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1706 Ok(Value::SimpleString(s)) => Some(s.clone()),
1707 _ => None,
1708 })
1709 .unwrap_or_default()
1710 };
1711 let error_code = {
1712 let s = err_field_str(1);
1713 if s.is_empty() { "unknown".to_owned() } else { s }
1714 };
1715 let detail = err_field_str(2);
1716 return Err(SdkError::from(
1717 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1718 ScriptError::Parse {
1719 fcall: "parse_append_frame_result".into(),
1720 execution_id: None,
1721 message: format!("ff_append_frame: {error_code}"),
1722 }
1723 }),
1724 ));
1725 }
1726
1727 // ok(entry_id, frame_count) → arr[2] = entry_id, arr[3] = frame_count
1728 let stream_id = arr
1729 .get(2)
1730 .and_then(|v| match v {
1731 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1732 Ok(Value::SimpleString(s)) => Some(s.clone()),
1733 _ => None,
1734 })
1735 .unwrap_or_default();
1736
1737 let frame_count = arr
1738 .get(3)
1739 .and_then(|v| match v {
1740 Ok(Value::BulkString(b)) => String::from_utf8_lossy(b).parse::<u64>().ok(),
1741 Ok(Value::SimpleString(s)) => s.parse::<u64>().ok(),
1742 Ok(Value::Int(n)) => Some(*n as u64),
1743 _ => None,
1744 })
1745 .unwrap_or(0);
1746
1747 Ok(AppendFrameOutcome {
1748 stream_id,
1749 frame_count,
1750 })
1751}
1752
1753/// Parse ff_fail_execution result:
1754/// ok("retry_scheduled", delay_until)
1755/// ok("terminal_failed")
1756#[allow(dead_code)]
1757fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
1758 let arr = match raw {
1759 Value::Array(arr) => arr,
1760 _ => {
1761 return Err(SdkError::from(ScriptError::Parse {
1762 fcall: "parse_fail_result".into(),
1763 execution_id: None,
1764 message: "ff_fail_execution: expected Array".into(),
1765 }));
1766 }
1767 };
1768
1769 let status_code = match arr.first() {
1770 Some(Ok(Value::Int(n))) => *n,
1771 _ => {
1772 return Err(SdkError::from(ScriptError::Parse {
1773 fcall: "parse_fail_result".into(),
1774 execution_id: None,
1775 message: "ff_fail_execution: bad status code".into(),
1776 }));
1777 }
1778 };
1779
1780 if status_code != 1 {
1781 let err_field_str = |idx: usize| -> String {
1782 arr.get(idx)
1783 .and_then(|v| match v {
1784 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1785 Ok(Value::SimpleString(s)) => Some(s.clone()),
1786 _ => None,
1787 })
1788 .unwrap_or_default()
1789 };
1790 let error_code = {
1791 let s = err_field_str(1);
1792 if s.is_empty() { "unknown".to_owned() } else { s }
1793 };
1794 let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
1795 let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1796 return Err(SdkError::from(
1797 ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
1798 ScriptError::Parse {
1799 fcall: "parse_fail_result".into(),
1800 execution_id: None,
1801 message: format!("ff_fail_execution: {error_code}"),
1802 }
1803 }),
1804 ));
1805 }
1806
1807 // Parse sub-status from field[2] (index 2 = first field after status+OK)
1808 let sub_status = arr
1809 .get(2)
1810 .and_then(|v| match v {
1811 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1812 Ok(Value::SimpleString(s)) => Some(s.clone()),
1813 _ => None,
1814 })
1815 .unwrap_or_default();
1816
1817 match sub_status.as_str() {
1818 "retry_scheduled" => {
1819 // Lua returns: ok("retry_scheduled", tostring(delay_until))
1820 // arr[3] = delay_until
1821 let delay_str = arr
1822 .get(3)
1823 .and_then(|v| match v {
1824 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1825 Ok(Value::Int(n)) => Some(n.to_string()),
1826 _ => None,
1827 })
1828 .unwrap_or_default();
1829 let delay_until = delay_str.parse::<i64>().unwrap_or(0);
1830
1831 Ok(FailOutcome::RetryScheduled {
1832 delay_until: TimestampMs::from_millis(delay_until),
1833 })
1834 }
1835 "terminal_failed" => Ok(FailOutcome::TerminalFailed),
1836 _ => Err(SdkError::from(ScriptError::Parse {
1837 fcall: "parse_fail_result".into(),
1838 execution_id: None,
1839 message: format!(
1840 "ff_fail_execution: unexpected sub-status: {sub_status}"
1841 ),
1842 })),
1843 }
1844}
1845
// ── Stream read / tail (consumer API, RFC-006 #2) ──

/// Maximum tail block duration accepted by [`tail_stream`], in
/// milliseconds. Mirrors the REST endpoint ceiling so SDK callers can't
/// wedge a connection longer than the server would accept.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;

/// Maximum frames per read/tail call. Mirrors
/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
/// callers don't need to import ff-core just to read the bound.
/// Enforced by `validate_stream_read_count` below.
pub use ff_core::contracts::STREAM_READ_HARD_CAP;

/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
/// signal (`closed_at` / `closed_reason`) so polling consumers can exit
/// cleanly.
///
/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
pub use ff_core::contracts::StreamFrames;

/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
/// `StreamCursor::Start` / `StreamCursor::End` instead.
pub use ff_core::contracts::StreamCursor;
1869
1870/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
1871/// — XREAD does not accept the open markers. Pulled out as a bare
1872/// function so unit tests can exercise the guard without constructing a
1873/// live `ferriskey::Client`.
1874fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
1875 if !after.is_concrete() {
1876 return Err(SdkError::Config {
1877 context: "tail_stream".into(),
1878 field: Some("after".into()),
1879 message: "XREAD cursor must be a concrete entry id; pass \
1880 StreamCursor::from_beginning() to start from the \
1881 beginning"
1882 .into(),
1883 });
1884 }
1885 Ok(())
1886}
1887
1888fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
1889 if count_limit == 0 {
1890 return Err(SdkError::Config {
1891 context: "read_stream_frames".into(),
1892 field: Some("count_limit".into()),
1893 message: "count_limit must be >= 1".into(),
1894 });
1895 }
1896 if count_limit > STREAM_READ_HARD_CAP {
1897 return Err(SdkError::Config {
1898 context: "read_stream_frames".into(),
1899 field: Some("count_limit".into()),
1900 message: format!(
1901 "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
1902 ),
1903 });
1904 }
1905 Ok(())
1906}
1907
1908/// Read frames from a completed or in-flight attempt's stream.
1909///
1910/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
1911/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1912/// `StreamCursor::At("<id>")` reads from a concrete entry id.
1913/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
1914/// `0` returns [`SdkError::Config`].
1915///
1916/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
1917/// consumers know when the producer has finalized the stream. A
1918/// never-written attempt and an in-progress stream are indistinguishable
1919/// here — both present as `frames=[]`, `closed_at=None`.
1920///
1921/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
1922/// client but are not the lease-holding worker — no lease check is
1923/// performed.
1924///
1925/// # Head-of-line note
1926///
1927/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
1928/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
1929/// calling this on a `client` that is also serving FCALLs stalls those
1930/// FCALLs behind the reply. The REST server isolates reads on its
1931/// `tail_client`; direct SDK callers should either use a dedicated
1932/// client OR paginate through smaller `count_limit` slices.
1933pub async fn read_stream(
1934 client: &Client,
1935 partition_config: &PartitionConfig,
1936 execution_id: &ExecutionId,
1937 attempt_index: AttemptIndex,
1938 from: StreamCursor,
1939 to: StreamCursor,
1940 count_limit: u64,
1941) -> Result<StreamFrames, SdkError> {
1942 use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
1943 validate_stream_read_count(count_limit)?;
1944
1945 let partition = execution_partition(execution_id, partition_config);
1946 let ctx = ExecKeyContext::new(&partition, execution_id);
1947 let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
1948
1949 // Lower the opaque cursor to the Valkey XRANGE marker at the
1950 // adapter edge. `ReadFramesArgs` is string-typed — the Lua ABI is
1951 // untouched.
1952 let args = ReadFramesArgs {
1953 execution_id: execution_id.clone(),
1954 attempt_index,
1955 from_id: from.into_wire_string(),
1956 to_id: to.into_wire_string(),
1957 count_limit,
1958 };
1959
1960 let ReadFramesResult::Frames(f) =
1961 ff_script::functions::stream::ff_read_attempt_stream(client, &keys, &args)
1962 .await
1963 .map_err(SdkError::from)?;
1964 Ok(f)
1965}
1966
1967/// Tail a live attempt's stream.
1968///
1969/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
1970/// with id strictly greater than `after`. Pass
1971/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
1972/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
1973/// REJECTED at this boundary because XREAD does not accept `-` / `+`
1974/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
1975///
1976/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
1977/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
1978/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
1979/// SDK and REST ceilings aligned.
1980///
1981/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
1982/// polling consumers should loop until `result.is_closed()` is true, then
1983/// drain and exit. Timeout with no new frames presents as
1984/// `frames=[], closed_at=None`.
1985///
1986/// # Head-of-line warning — use a dedicated client
1987///
1988/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
1989/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
1990/// the read side until a frame arrives or the block elapses. If the
1991/// `client` you pass here is ALSO used for claims, completes, fails,
1992/// appends, or any other FCALL, a 30-second tail will stall all those
1993/// calls for up to 30 seconds.
1994///
1995/// **Strongly recommended**: build a separate `ferriskey::Client` for
1996/// tail callers — mirrors the `Server::tail_client` split that the REST
1997/// server uses internally (see `crates/ff-server/src/server.rs` and
1998/// RFC-006 Impl Notes §"Dedicated stream-op connection").
1999///
2000/// # Tail parallelism caveat (same mux)
2001///
2002/// Even a dedicated tail client is still one multiplexed TCP connection.
2003/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
2004/// ferriskey's per-call `request_timeout` starts at future-poll — so
2005/// two concurrent tails against the same client can time out spuriously:
2006/// the second call's BLOCK budget elapses while it waits for the first
2007/// BLOCK to return. The REST server handles this internally with a
2008/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
2009/// each call its full `block_ms` budget at the server.
2010///
2011/// **Direct SDK callers that need concurrent tails**: either
2012/// (1) build ONE `ferriskey::Client` per concurrent tail call (a small
2013/// pool of clients, rotated by the caller), OR
2014/// (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
2015/// only one BLOCK is in flight per client at a time.
2016/// If you need the REST-side backpressure (429 on contention) and the
2017/// built-in serializer, go through the
2018/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
2019/// than calling this directly.
2020///
2021/// This SDK does not enforce either pattern — the mutex belongs at the
2022/// application layer, and the connection pool belongs at the SDK
2023/// caller's DI layer; neither has a structured place inside this
2024/// helper.
2025///
2026/// # Timeout handling
2027///
2028/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
2029/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
2030/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
2031/// 500ms)`, overriding the client's default per-call. A tail with
2032/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
2033/// the client was built with a shorter `request_timeout`. No custom client
2034/// configuration is required for timeout reasons — only for head-of-line
2035/// isolation above.
2036pub async fn tail_stream(
2037 client: &Client,
2038 partition_config: &PartitionConfig,
2039 execution_id: &ExecutionId,
2040 attempt_index: AttemptIndex,
2041 after: StreamCursor,
2042 block_ms: u64,
2043 count_limit: u64,
2044) -> Result<StreamFrames, SdkError> {
2045 if block_ms > MAX_TAIL_BLOCK_MS {
2046 return Err(SdkError::Config {
2047 context: "tail_stream".into(),
2048 field: Some("block_ms".into()),
2049 message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
2050 });
2051 }
2052 validate_stream_read_count(count_limit)?;
2053 // XREAD does not accept `-` / `+` markers as cursors — reject at
2054 // the SDK boundary with `SdkError::Config` rather than forwarding
2055 // an invalid `-`/`+` into ferriskey (which would surface as an
2056 // opaque server error).
2057 validate_tail_cursor(&after)?;
2058
2059 let partition = execution_partition(execution_id, partition_config);
2060 let ctx = ExecKeyContext::new(&partition, execution_id);
2061 let stream_key = ctx.stream(attempt_index);
2062 let stream_meta_key = ctx.stream_meta(attempt_index);
2063
2064 ff_script::stream_tail::xread_block(
2065 client,
2066 &stream_key,
2067 &stream_meta_key,
2068 after.to_wire(),
2069 block_ms,
2070 count_limit,
2071 )
2072 .await
2073 .map_err(SdkError::from)
2074}
2075
#[cfg(test)]
mod tail_stream_boundary_tests {
    use super::*;

    // Guard-level tests for `validate_tail_cursor`: the open markers
    // (`Start` / `End`) must be refused before `tail_stream` ever
    // touches the client — same shape as the `count_limit` guard above.
    // The matching full-path rejection on the REST layer is covered by
    // `ff-server::api`.

    #[test]
    fn rejects_start_cursor() {
        let err = validate_tail_cursor(&StreamCursor::Start)
            .expect_err("Start must be rejected");
        match err {
            SdkError::Config { context, field, .. } => {
                assert_eq!(context, "tail_stream");
                assert_eq!(field.as_deref(), Some("after"));
            }
            other => panic!("expected SdkError::Config, got {other:?}"),
        }
    }

    #[test]
    fn rejects_end_cursor() {
        let result = validate_tail_cursor(&StreamCursor::End);
        let err = result.expect_err("End must be rejected");
        assert!(matches!(err, SdkError::Config { .. }));
    }

    #[test]
    fn accepts_at_cursor() {
        // Every concrete entry-id form passes the guard.
        for (cursor, why) in [
            (StreamCursor::At("0-0".into()), "At cursor must be accepted"),
            (StreamCursor::from_beginning(), "from_beginning() must be accepted"),
            (StreamCursor::At("123-0".into()), "concrete id must be accepted"),
        ] {
            validate_tail_cursor(&cursor).expect(why);
        }
    }
}
2116
#[cfg(test)]
mod parse_report_usage_result_tests {
    //! Unit tests for `parse_report_usage_result`, the parser for the
    //! ff_report_usage Lua reply. Positive cases cover each sub-status
    //! (OK / ALREADY_APPLIED / SOFT_BREACH / HARD_BREACH); negative
    //! cases pin the fail-loudly contract on wire-format drift.

    use super::*;

    /// `Value::SimpleString` from a `&str`. `usage_field_str` handles
    /// BulkString and SimpleString uniformly (see
    /// `usage_field_str` — `Value::BulkString(b)` → `String::from_utf8_lossy`,
    /// `Value::SimpleString(s)` → clone). SimpleString avoids a
    /// dev-dependency on `bytes` just for test construction.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }

    /// `Value::Int` wrapped in the `Result` shape array elements carry.
    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }

    /// Top-level reply array from pre-wrapped elements.
    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }

    // Happy path: [1, "OK"] → ReportUsageResult::Ok.
    #[test]
    fn ok_status() {
        let raw = arr(vec![int(1), s("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }

    // Idempotent replay: [1, "ALREADY_APPLIED"] → AlreadyApplied.
    #[test]
    fn already_applied_status() {
        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }

    // SOFT_BREACH carries [dimension, current_usage, soft_limit] as
    // string slots; all three must be threaded through.
    #[test]
    fn soft_breach_status() {
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }

    // HARD_BREACH mirrors SOFT_BREACH with a hard_limit slot.
    #[test]
    fn hard_breach_status() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    /// Negative case: non-Array input. Guards against a future Lua refactor
    /// that accidentally returns a bare string/int — the parser must fail
    /// loudly rather than silently succeed or panic.
    #[test]
    fn non_array_input_is_parse_error() {
        let raw = Value::SimpleString("OK".to_owned());
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    /// Negative case: Array whose first element isn't an Int status code.
    /// The Lua function's first return slot is always `status_code` (1 on
    /// success, an error code otherwise); a non-Int there is a wire-format
    /// break that must surface as a parse error.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = arr(vec![s("not_an_int"), s("OK")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
    /// field. Guards against the silent-coercion defect cross-review
    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
    /// which would have surfaced Lua-side wire-format drift as a
    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
    /// but semantically wrong (a "breach with zero usage" is nonsense
    /// and masks the real error).
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"), // current_usage — must fail, not coerce to 0
            s("100"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    /// Negative case: HARD_BREACH with the limit slot missing
    /// entirely. Same defence as the non-numeric test above: a
    /// truncated response must fail loudly rather than coerce to 0.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
            // no index 4 — hard_limit missing
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}
2260
2261
#[cfg(test)]
mod resume_signals_tests {
    //! Unit tests for `resume_waitpoint_id_from_suspension` — the pure
    //! helper that inspects a raw suspension-record hash (field → value
    //! strings) and decides whether the claimed attempt was woken by a
    //! resume, and if so which waitpoint to report. Ok(None) means "no
    //! resume signals to fetch"; Err is reserved for corrupt records.

    use super::*;

    /// Build a suspension-record map from `(field, value)` pairs.
    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(k, v)| ((*k).to_owned(), (*v).to_owned())).collect()
    }

    // No suspension hash at all → nothing to resume from.
    #[test]
    fn empty_suspension_returns_none() {
        let susp = m(&[]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    // A suspension left over from an earlier attempt must not leak its
    // waitpoint into the newly claimed attempt.
    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        // Claimed attempt is 1; suspension belongs to 0 → stale.
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }

    // Only close_reason == "resumed" yields a waitpoint; every other
    // close reason (including empty) is a non-resume wake.
    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = m(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp.to_string()),
            ]);
            let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    // Happy path: matching attempt + "resumed" → Some(waitpoint).
    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
        assert_eq!(out, Some(wp));
    }

    // A present-but-unparseable waitpoint_id is corruption, not a miss —
    // it must surface as an error.
    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        assert!(
            format!("{err}").contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: an empty waitpoint_id field (shouldn't happen on
        // resumed records, but guard against partial writes) is None, not an error.
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was
    // removed when the production path switched from unbounded
    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
    // feedback on unbounded condition-hash reply size). That loop
    // is exercised by the integration tests in `ff-test`.
}
2349
#[cfg(test)]
mod terminal_replay_parsing_tests {
    //! Unit tests for the SDK's parse path of the enriched
    //! `execution_not_active` error returned on a terminal-op replay.
    //! The integration test in ff-test/tests/e2e_lifecycle.rs proves
    //! the Lua side emits the 4-slot detail; these tests prove the
    //! Rust parser threads all 4 slots into the `ExecutionNotActive`
    //! variant so the reconciler in complete()/fail()/cancel() can
    //! match on them.
    //!
    //! Reply layout exercised here:
    //!   [status=0, "execution_not_active",
    //!    terminal_outcome, lease_epoch, lifecycle_phase, attempt_id]

    use super::*;
    use ferriskey::Value;

    // Use SimpleString rather than BulkString to avoid depending on bytes::Bytes
    // in the test harness. parse_success_result + parse_fail_result handle both.
    // (The name `bulk` is historical — it stands in for a wire string slot.)
    fn bulk(s: &str) -> Value {
        Value::SimpleString(s.to_owned())
    }

    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("success")),
            Ok(bulk("42")),
            Ok(bulk("terminal")),
            Ok(bulk("11111111-1111-1111-1111-111111111111")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        // Engine errors come boxed inside SdkError — unbox before matching.
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "success");
                assert_eq!(lease_epoch, "42");
                assert_eq!(lifecycle_phase, "terminal");
                assert_eq!(attempt_id, "11111111-1111-1111-1111-111111111111");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// parse_fail_result must extract all detail slots too so the
    /// reconciler in fail() can match lifecycle_phase = "runnable" for
    /// retry-scheduled replays.
    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("none")),
            Ok(bulk("7")),
            Ok(bulk("runnable")),
            Ok(bulk("22222222-2222-2222-2222-222222222222")),
        ]);
        let err = parse_fail_result(&raw).unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "none");
                assert_eq!(lease_epoch, "7");
                assert_eq!(lifecycle_phase, "runnable");
                assert_eq!(attempt_id, "22222222-2222-2222-2222-222222222222");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// Empty detail slots must default to "" (not panic) so older-Lua
    /// producers or malformed replies degrade to an unreconcilable
    /// variant rather than a Parse error.
    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        // Only the status code + error code — all 4 detail slots absent.
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "");
                assert_eq!(lease_epoch, "");
                assert_eq!(lifecycle_phase, "");
                assert_eq!(attempt_id, "");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }
}