// ff_sdk/task.rs
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, Value};
use ff_core::backend::{Handle, HandleKind};
use ff_core::contracts::ReportUsageResult;
use ff_core::engine_backend::EngineBackend;
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{execution_partition, PartitionConfig};
use ff_core::types::*;
use ff_script::error::ScriptError;
use tokio::sync::{Notify, OwnedSemaphorePermit};
use tokio::task::JoinHandle;

use crate::SdkError;
// ── Phase 3: Suspend/Signal types ──

/// Timeout behavior when a suspension deadline is reached.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutBehavior {
    /// Wire value `"fail"`.
    Fail,
    /// Wire value `"cancel"`.
    Cancel,
    /// Wire value `"expire"`.
    Expire,
    /// Wire value `"auto_resume_with_timeout_signal"` (short alias
    /// `"auto_resume"` is accepted on parse).
    AutoResume,
    /// Wire value `"escalate"`.
    Escalate,
}

impl TimeoutBehavior {
    /// Canonical wire string for this behavior; the inverse of `FromStr`.
    pub fn as_str(&self) -> &str {
        match *self {
            TimeoutBehavior::Fail => "fail",
            TimeoutBehavior::Cancel => "cancel",
            TimeoutBehavior::Expire => "expire",
            TimeoutBehavior::AutoResume => "auto_resume_with_timeout_signal",
            TimeoutBehavior::Escalate => "escalate",
        }
    }
}

impl std::fmt::Display for TimeoutBehavior {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}

impl std::str::FromStr for TimeoutBehavior {
    type Err = String;

    /// Parse a wire string. Accepts the short alias `"auto_resume"` in
    /// addition to the canonical `"auto_resume_with_timeout_signal"`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "fail" => Ok(TimeoutBehavior::Fail),
            "cancel" => Ok(TimeoutBehavior::Cancel),
            "expire" => Ok(TimeoutBehavior::Expire),
            "auto_resume_with_timeout_signal" | "auto_resume" => Ok(TimeoutBehavior::AutoResume),
            "escalate" => Ok(TimeoutBehavior::Escalate),
            other => Err(format!("unknown timeout behavior: {other}")),
        }
    }
}
63
/// A condition matcher for the resume condition.
///
/// Matchers are collected into the `required_signal_names` list of the
/// resume-condition JSON that `ClaimedTask::suspend` builds.
#[derive(Clone, Debug)]
pub struct ConditionMatcher {
    /// Signal name to match (empty = wildcard).
    pub signal_name: String,
}
70
/// Outcome of a `suspend()` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SuspendOutcome {
    /// Execution is now suspended, waitpoint active.
    Suspended {
        /// Identity of the suspension record created by this suspend.
        suspension_id: SuspensionId,
        /// Identity of the waitpoint now awaiting signals.
        waitpoint_id: WaitpointId,
        /// Opaque key naming the waitpoint (currently `wpk:<waitpoint_id>`).
        waitpoint_key: String,
        /// HMAC token that signal deliverers must supply to this waitpoint.
        waitpoint_token: WaitpointToken,
    },
    /// Buffered signals already satisfied the condition — suspension skipped.
    /// The caller still holds the lease.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// HMAC token for the (already satisfied) waitpoint.
        waitpoint_token: WaitpointToken,
    },
}
91
/// A signal to deliver to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    /// Name matched against the waitpoint's resume condition.
    pub signal_name: String,
    /// Free-form category tag carried alongside the name.
    pub signal_category: String,
    /// Optional opaque payload bytes delivered with the signal.
    pub payload: Option<Vec<u8>>,
    // NOTE(review): the semantics of source_type/source_identity are defined
    // by the delivery path, not visible in this file — presumably they
    // describe the sender; confirm against `deliver_signal`.
    pub source_type: String,
    pub source_identity: String,
    /// Deduplication key; a repeated key yields `SignalOutcome::Duplicate`.
    pub idempotency_key: Option<String>,
    /// HMAC token issued when the waitpoint was created. Required for
    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
    pub waitpoint_token: WaitpointToken,
}
105
/// Outcome of `deliver_signal()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
    Accepted {
        signal_id: SignalId,
        // Engine-reported effect string, passed through raw — its exact
        // vocabulary is defined by the Lua side, not visible here.
        effect: String,
    },
    /// Signal triggered resume — execution is now runnable.
    TriggeredResume { signal_id: SignalId },
    /// Duplicate signal (idempotency key matched).
    Duplicate {
        /// Raw id of the previously-accepted signal, as returned by the engine.
        existing_signal_id: String,
    },
}
116
impl SignalOutcome {
    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
    ///
    /// Consuming packages that call `ff_deliver_signal` directly can use this
    /// to interpret the Lua return value without depending on SDK internals.
    ///
    /// Thin delegation: the actual decoding of the `Value` shape lives in the
    /// module's `parse_signal_result` helper.
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}
126
/// A signal that triggered a resume, readable by a worker after re-claim.
///
/// **RFC-012 Stage 0:** the canonical definition has moved to
/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
/// scheduled for removal in 0.5.0.
pub use ff_core::backend::ResumeSignal;

/// Outcome of `append_frame()`.
///
/// **RFC-012 §R7.2.1:** canonical definition moved to
/// [`ff_core::backend::AppendFrameOutcome`]; this `pub use` shim
/// preserves the `ff_sdk::task::AppendFrameOutcome` path through the
/// 0.4.x window. Existing consumers construct + match exhaustively
/// without change. The derive set widened from `Clone, Debug` to
/// `Clone, Debug, PartialEq, Eq` matching the `FailOutcome` precedent.
pub use ff_core::backend::AppendFrameOutcome;

/// Outcome of a `fail()` call.
///
/// **RFC-012 Stage 1a:** canonical definition moved to
/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
/// re-export in `lib.rs`) through the 0.4.x window. Existing
/// consumers construct + match exhaustively without change.
pub use ff_core::backend::FailOutcome;
154
/// A claimed execution with an active lease. The worker processes this task
/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
///
/// The lease is automatically renewed in the background at `lease_ttl / 3`
/// intervals. Renewal stops when the task is consumed or dropped.
///
/// `complete`, `fail`, and `cancel` consume `self` — this prevents
/// double-complete bugs at the type level.
pub struct ClaimedTask {
    /// Shared Valkey client. Still needed because `suspend` calls
    /// `client.fcall("ff_suspend_execution", ...)` directly (see the
    /// `backend` note below).
    client: Client,
    /// `EngineBackend` the trait-migrated ops forward through.
    ///
    /// **RFC-012 Stage 1b + Round-7.** Today this is always a
    /// `ValkeyBackend` wrapping `client`. Most per-task ops
    /// (`complete`/`fail`/`cancel`/`delay_execution`/
    /// `move_to_waiting_children`/`update_progress`/`resume_signals`/
    /// `create_pending_waitpoint`/`append_frame`/`report_usage`) route
    /// through `backend.*()`. `suspend` still uses
    /// `client.fcall("ff_suspend_execution", ...)` directly — this is
    /// the deferred suspend per RFC-012 §R7.6.1, pending Stage 1d
    /// input-shape work. Lease renewal goes through
    /// `backend.renew(&handle)`. Round-7 (#135/#145) closed the four
    /// trait-shape gaps tracked by #117.
    ///
    /// Stage 1c migrates the FlowFabricWorker hot paths (claim,
    /// deliver_signal) through the same trait surface; Stage 1d
    /// refactors this struct to carry a single `Handle` rather than
    /// synthesising one per op via `synth_handle`.
    backend: Arc<dyn EngineBackend>,
    /// Partition config for key construction.
    partition_config: PartitionConfig,
    /// Execution identity.
    execution_id: ExecutionId,
    /// Current attempt.
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    /// Lease identity.
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    /// Lease timing. Drives the background renewal cadence
    /// (`lease_ttl_ms / 3`) and is encoded into every synthesised handle.
    lease_ttl_ms: u64,
    /// Lane used at claim time.
    lane_id: LaneId,
    /// Worker instance that holds this lease (for index cleanup keys).
    worker_instance_id: WorkerInstanceId,
    /// Execution data, pre-read at claim time and exposed via accessors.
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    /// Background renewal task handle.
    renewal_handle: JoinHandle<()>,
    /// Signal to stop renewal (used before consuming self).
    renewal_stop: Arc<Notify>,
    /// Consecutive lease renewal failures. Shared with the background renewal
    /// task. Reset to 0 on each successful renewal. Workers should check
    /// `is_lease_healthy()` before committing expensive side effects.
    renewal_failures: Arc<AtomicU32>,
    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
    /// response is received, just before `self` is consumed into `Drop`.
    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
    /// suppress the false-positive "dropped without terminal operation"
    /// warning: after `notify_one`, the renewal task has not yet been
    /// polled by the runtime, so `is_finished()` is still `false` on the
    /// happy path when self is being consumed.
    ///
    /// Note: the flag is set for any terminal-op path that reaches
    /// `stop_renewal()`, which includes Lua-level script errors (the
    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
    /// the caller already receives the `Err` via the op's return value,
    /// so an additional `Drop` warning would be noise. The warning is
    /// reserved for genuine drop-without-terminal-op cases (panic, early
    /// return, transport failure before stop_renewal ran).
    terminal_op_called: AtomicBool,
    /// Concurrency permit from the worker's semaphore. Held for the lifetime
    /// of the task; released on complete/fail/cancel/drop.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
233
234impl ClaimedTask {
235 /// Construct a `ClaimedTask` from the results of a successful
236 /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
237 ///
238 /// # Arguments
239 ///
240 /// * `client` — shared Valkey client used for subsequent lease
241 /// renewals, signal delivery, and the final
242 /// complete/fail/cancel FCALL.
243 /// * `partition_config` — partition topology snapshot read at
244 /// `FlowFabricWorker::connect`. Used for key construction on
245 /// the lifetime of this task.
246 /// * `execution_id` — the claimed execution's UUID.
247 /// * `attempt_index` / `attempt_id` — current attempt identity.
248 /// `attempt_index` is 0 on a fresh claim, preserved on a
249 /// resumed claim.
250 /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
251 /// is returned by the Lua FCALL and bumped on each resumed
252 /// claim.
253 /// * `lease_ttl_ms` — lease TTL used to schedule the background
254 /// renewal task (renews at `lease_ttl_ms / 3`).
255 /// * `lane_id` — the lane the task was claimed on. Used for
256 /// index key construction (`lane_active`, `lease_expiry`,
257 /// etc.).
258 /// * `worker_instance_id` — this worker's identity. Used to
259 /// resolve `worker_leases` index entries during renewal and
260 /// completion.
261 /// * `input_payload` / `execution_kind` / `tags` — pre-read
262 /// execution metadata, exposed via getters so the worker
263 /// doesn't round-trip to Valkey to inspect what it just
264 /// claimed.
265 ///
266 /// # Invariant: constructor is `pub(crate)` on purpose
267 ///
268 /// **External callers cannot construct a `ClaimedTask`** — only
269 /// the in-crate claim entry points (`claim_next`,
270 /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
271 /// load-bearing: those entry points are the ONLY sites that
272 /// acquire a permit from the worker's concurrency semaphore
273 /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
274 /// it through `ClaimedTask::set_concurrency_permit` before
275 /// returning the task. Promoting `new` to `pub` would let
276 /// consumers build tasks that bypass the concurrency contract
277 /// the worker's `max_concurrent_tasks` config advertises — the
278 /// returned task would run alongside other in-flight work
279 /// without debiting the permit bank, and the
280 /// complete/fail/cancel/drop path would have nothing to
281 /// release.
282 ///
283 /// If an external callsite genuinely needs to rehydrate a
284 /// task from saved FCALL results, the right answer is a new
285 /// `FlowFabricWorker` entry point that wraps `new` + acquires a
286 /// permit — not promoting this constructor.
287 #[allow(clippy::too_many_arguments)]
288 pub(crate) fn new(
289 client: Client,
290 backend: Arc<dyn EngineBackend>,
291 partition_config: PartitionConfig,
292 execution_id: ExecutionId,
293 attempt_index: AttemptIndex,
294 attempt_id: AttemptId,
295 lease_id: LeaseId,
296 lease_epoch: LeaseEpoch,
297 lease_ttl_ms: u64,
298 lane_id: LaneId,
299 worker_instance_id: WorkerInstanceId,
300 input_payload: Vec<u8>,
301 execution_kind: String,
302 tags: HashMap<String, String>,
303 ) -> Self {
304 let renewal_stop = Arc::new(Notify::new());
305 let renewal_failures = Arc::new(AtomicU32::new(0));
306
307 // Stage 1b: the renewal task forwards through `backend.renew(&handle)`.
308 // Build the handle once at construction time and hand both Arc clones
309 // into the spawned task.
310 let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
311 execution_id.clone(),
312 attempt_index,
313 attempt_id.clone(),
314 lease_id.clone(),
315 lease_epoch,
316 lease_ttl_ms,
317 lane_id.clone(),
318 worker_instance_id.clone(),
319 HandleKind::Fresh,
320 );
321 let renewal_handle = spawn_renewal_task(
322 backend.clone(),
323 renewal_handle_cookie,
324 execution_id.clone(),
325 lease_ttl_ms,
326 renewal_stop.clone(),
327 renewal_failures.clone(),
328 );
329
330 Self {
331 client,
332 backend,
333 partition_config,
334 execution_id,
335 attempt_index,
336 attempt_id,
337 lease_id,
338 lease_epoch,
339 lease_ttl_ms,
340 lane_id,
341 worker_instance_id,
342 input_payload,
343 execution_kind,
344 tags,
345 renewal_handle,
346 renewal_stop,
347 renewal_failures,
348 terminal_op_called: AtomicBool::new(false),
349 _concurrency_permit: None,
350 }
351 }
352
353 /// Attach a concurrency permit from the worker's semaphore.
354 /// The permit is held for the task's lifetime and released on drop.
355 #[allow(dead_code)]
356 pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
357 self._concurrency_permit = Some(permit);
358 }
359
    // ── Accessors ──

    /// UUID of the claimed execution.
    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    /// Index of the current attempt (0 on a fresh claim, preserved on a
    /// resumed claim).
    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }

    /// Identity of the current attempt.
    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }

    /// Identity of the lease this task holds.
    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }

    /// Lease epoch returned at claim time; bumped on each resumed claim.
    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }

    /// Raw input payload, pre-read at claim time.
    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }

    /// Execution kind string, pre-read at claim time.
    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }

    /// Execution tags, pre-read at claim time.
    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }

    /// Lane the task was claimed on.
    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }
397
398 /// Read frames from this task's attempt stream.
399 ///
400 /// Forwards through the task's `Arc<dyn EngineBackend>` — consumers
401 /// no longer need the `ferriskey::Client` leak (#87). See
402 /// [`EngineBackend::read_stream`](ff_core::engine_backend::EngineBackend::read_stream)
403 /// for the backend-level contract.
404 ///
405 /// Validation (count_limit bounds) runs at this boundary — out-of-range
406 /// input surfaces as [`SdkError::Config`] before reaching the backend.
407 pub async fn read_stream(
408 &self,
409 from: StreamCursor,
410 to: StreamCursor,
411 count_limit: u64,
412 ) -> Result<StreamFrames, SdkError> {
413 validate_stream_read_count(count_limit)?;
414 Ok(self
415 .backend
416 .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
417 .await?)
418 }
419
420 /// Tail this task's attempt stream for new frames.
421 ///
422 /// See [`EngineBackend::tail_stream`](ff_core::engine_backend::EngineBackend::tail_stream)
423 /// for the head-of-line warning — the task's backend is shared
424 /// with claim/complete/fail hot paths. Consumers that need
425 /// isolation should build a dedicated `EngineBackend` for tail
426 /// reads.
427 pub async fn tail_stream(
428 &self,
429 after: StreamCursor,
430 block_ms: u64,
431 count_limit: u64,
432 ) -> Result<StreamFrames, SdkError> {
433 if block_ms > MAX_TAIL_BLOCK_MS {
434 return Err(SdkError::Config {
435 context: "tail_stream".into(),
436 field: Some("block_ms".into()),
437 message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
438 });
439 }
440 validate_stream_read_count(count_limit)?;
441 validate_tail_cursor(&after)?;
442 Ok(self
443 .backend
444 .tail_stream(
445 &self.execution_id,
446 self.attempt_index,
447 after,
448 block_ms,
449 count_limit,
450 )
451 .await?)
452 }
453
454 /// Synthesise a Valkey-backend `Handle` encoding this task's
455 /// attempt-cookie fields. Private to the SDK — the trait
456 /// forwarders call this per op to produce the `&Handle` argument
457 /// the `EngineBackend` methods take.
458 ///
459 /// **RFC-012 Stage 1b option A.** Refactoring `ClaimedTask` to
460 /// carry one cached `Handle` instead of synthesising per op is
461 /// deferred to Stage 1d (issue #89 follow-up). The per-op cost is
462 /// a single allocation of a small byte buffer — negligible
463 /// compared to the FCALL round-trip that follows.
464 ///
465 /// Kind is always `HandleKind::Fresh` because today the 9
466 /// Stage-1b ops all treat the backend tag + opaque bytes as
467 /// authoritative; they do not dispatch on kind. Stage 1d routes
468 /// resumed-claim callers through a `Resumed` synth.
469 fn synth_handle(&self) -> Handle {
470 ff_backend_valkey::ValkeyBackend::encode_handle(
471 self.execution_id.clone(),
472 self.attempt_index,
473 self.attempt_id.clone(),
474 self.lease_id.clone(),
475 self.lease_epoch,
476 self.lease_ttl_ms,
477 self.lane_id.clone(),
478 self.worker_instance_id.clone(),
479 HandleKind::Fresh,
480 )
481 }
482
483 /// Check if the lease is likely still valid based on renewal success.
484 ///
485 /// Returns `false` if 3 or more consecutive renewal attempts have failed.
486 /// Workers should check this before committing expensive or irreversible
487 /// side effects. A `false` return means Valkey may have already expired
488 /// the lease and another worker could be processing this execution.
489 pub fn is_lease_healthy(&self) -> bool {
490 self.renewal_failures.load(Ordering::Relaxed) < 3
491 }
492
493 /// Number of consecutive lease renewal failures since the last success.
494 ///
495 /// Returns 0 when renewals are working normally. Useful for observability
496 /// and custom health policies beyond the default threshold of 3.
497 pub fn consecutive_renewal_failures(&self) -> u32 {
498 self.renewal_failures.load(Ordering::Relaxed)
499 }
500
501 // ── Terminal operations (consume self) ──
502
503 /// Delay the execution until `delay_until`.
504 ///
505 /// Releases the lease. The execution moves to `delayed` state.
506 /// Consumes self — the task cannot be used after delay.
507 pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
508 // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
509 // the pre-migration `stop_renewal` contract: called only when
510 // the FCALL round-tripped (Ok or a typed Lua/engine error);
511 // raw transport errors bubble up without stopping renewal so
512 // the `Drop` warning still fires if the caller later drops
513 // `self` without retrying.
514 let handle = self.synth_handle();
515 let out = self.backend.delay(&handle, delay_until).await;
516 if fcall_landed(&out) {
517 self.stop_renewal();
518 }
519 out.map_err(SdkError::from)
520 }
521
522 /// Move execution to waiting_children state.
523 ///
524 /// Releases the lease. The execution waits for child dependencies to complete.
525 /// Consumes self.
526 pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
527 // RFC-012 Stage 1b: forwards through `backend.wait_children`.
528 // See `delay_execution` for the stop_renewal contract.
529 let handle = self.synth_handle();
530 let out = self.backend.wait_children(&handle).await;
531 if fcall_landed(&out) {
532 self.stop_renewal();
533 }
534 out.map_err(SdkError::from)
535 }
536
537 /// Complete the execution successfully.
538 ///
539 /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
540 /// Renewal continues during the FCALL to prevent lease expiry under
541 /// network latency — the Lua fences on lease_id+epoch atomically.
542 /// Consumes self — the task cannot be used after completion.
543 ///
544 /// # Connection errors and replay
545 ///
546 /// If the Valkey connection drops after the Lua commit but before the
547 /// client reads the response, retrying `complete()` with the same
548 /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
549 /// reconciles the "already terminal with matching outcome" response
550 /// into `Ok(())`. A retry after a different terminal op has raced in
551 /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
552 /// populated `terminal_outcome` so the caller can see what actually
553 /// happened.
554 pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
555 // RFC-012 Stage 1b: forwards through `backend.complete`.
556 // The FCALL body + the `ExecutionNotActive` replay reconciliation
557 // (match same epoch + attempt_id + outcome=="success" → Ok)
558 // moved to `ff_backend_valkey::complete_impl`.
559 let handle = self.synth_handle();
560 let out = self.backend.complete(&handle, result_payload).await;
561 if fcall_landed(&out) {
562 self.stop_renewal();
563 }
564 out.map_err(SdkError::from)
565 }
566
567 /// Fail the execution with a reason and error category.
568 ///
569 /// If the execution policy allows retries, the engine schedules a retry
570 /// (delayed backoff). Otherwise, the execution becomes terminal failed.
571 /// Returns [`FailOutcome`] so the caller knows what happened.
572 ///
573 /// # Connection errors and replay
574 ///
575 /// `fail()` is replay-safe under the same conditions as `complete()`:
576 /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
577 /// reconciled into the outcome the server actually committed
578 /// (`TerminalFailed` if no retries left; `RetryScheduled` with
579 /// `delay_until = 0` if a retry was scheduled — the exact delay is
580 /// not recovered on the replay path).
581 pub async fn fail(
582 self,
583 reason: &str,
584 error_category: &str,
585 ) -> Result<FailOutcome, SdkError> {
586 // RFC-012 Stage 1b: forwards through `backend.fail`.
587 // The FCALL body + retry-policy read + the two-shape replay
588 // reconciliation (terminal/failed → TerminalFailed, runnable
589 // → RetryScheduled{0}) moved to
590 // `ff_backend_valkey::fail_impl`.
591 //
592 // **Preserving the caller's raw category string.** The
593 // pre-Stage-1b code passed `error_category` straight to the
594 // Lua `failure_category` ARGV. Real callers use arbitrary
595 // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
596 // `"inference_error"`, …) that do not correspond to the 5
597 // named `FailureClass` variants. A lossy enum conversion
598 // would rewrite every non-canonical category to
599 // `Transient` — a real behavior change for
600 // ff-readiness-tests + the examples crates.
601 //
602 // Instead: pack the raw category bytes into
603 // `FailureReason.detail`; `fail_impl` reads them back and
604 // threads them to Lua verbatim. The `classification` enum
605 // is derived from the string for the few consumers who
606 // match on the trait return today (none in-workspace).
607 // Stage 1d (or issue #117) widens `FailureClass` with a
608 // `Custom(String)` arm; this stash carrier retires then.
609 let handle = self.synth_handle();
610 let failure_reason = ff_core::backend::FailureReason::with_detail(
611 reason.to_owned(),
612 error_category.as_bytes().to_vec(),
613 );
614 let classification = error_category_to_class(error_category);
615 let out = self.backend.fail(&handle, failure_reason, classification).await;
616 if fcall_landed(&out) {
617 self.stop_renewal();
618 }
619 out.map_err(SdkError::from)
620 }
621
622 /// Cancel the execution.
623 ///
624 /// Stops lease renewal, then calls `ff_cancel_execution` via FCALL.
625 /// Consumes self.
626 ///
627 /// # Connection errors and replay
628 ///
629 /// `cancel()` is replay-safe under the same conditions as `complete()`:
630 /// a retry by the same caller matching `lease_epoch` + `attempt_id`
631 /// returns `Ok(())` if the server's stored `terminal_outcome` is
632 /// `cancelled`. A retry that finds a different outcome (because a
633 /// concurrent `complete()` or `fail()` won the race) surfaces as
634 /// `ExecutionNotActive` with the populated `terminal_outcome` so the
635 /// caller can see that the cancel intent was NOT honored.
636 pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
637 // RFC-012 Stage 1b: forwards through `backend.cancel`.
638 // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
639 // and the `ExecutionNotActive` → outcome=="cancelled" replay
640 // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
641 let handle = self.synth_handle();
642 let out = self.backend.cancel(&handle, reason).await;
643 if fcall_landed(&out) {
644 self.stop_renewal();
645 }
646 out.map_err(SdkError::from)
647 }
648
649 // ── Non-terminal operations ──
650
651 /// Manually renew the lease.
652 ///
653 /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
654 /// trait. The background renewal task calls
655 /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
656 /// this method is the public entry point for workers that want
657 /// to force a renew out-of-band.
658 pub async fn renew_lease(&self) -> Result<(), SdkError> {
659 let handle = self.synth_handle();
660 self.backend
661 .renew(&handle)
662 .await
663 .map(|_renewal| ())
664 .map_err(SdkError::from)
665 }
666
667 /// Update progress (pct 0-100 and optional message).
668 ///
669 /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
670 /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
671 pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
672 let handle = self.synth_handle();
673 self.backend
674 .progress(&handle, Some(pct), Some(message.to_owned()))
675 .await
676 .map_err(SdkError::from)
677 }
678
679 /// Report usage against a budget and check limits.
680 ///
681 /// Non-consuming — the worker can report usage multiple times.
682 /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
683 /// `dedup_key` prevents double-counting on retries (auto-prefixed with budget hash tag).
684 ///
685 /// **RFC-012 §R7.2.3:** forwards through
686 /// [`EngineBackend::report_usage`](ff_core::engine_backend::EngineBackend::report_usage).
687 /// The trait's `UsageDimensions.dedup_key` carries the raw key;
688 /// the backend impl applies the `usage_dedup_key(hash_tag, k)`
689 /// wrap so the dedup state co-locates with the budget partition
690 /// (PR #108).
691 pub async fn report_usage(
692 &self,
693 budget_id: &BudgetId,
694 dimensions: &[(&str, u64)],
695 dedup_key: Option<&str>,
696 ) -> Result<ReportUsageResult, SdkError> {
697 let handle = self.synth_handle();
698 let mut dims = ff_core::backend::UsageDimensions::default();
699 for (name, delta) in dimensions {
700 dims.custom.insert((*name).to_owned(), *delta);
701 }
702 dims.dedup_key = dedup_key
703 .filter(|k| !k.is_empty())
704 .map(|k| k.to_owned());
705 self.backend
706 .report_usage(&handle, budget_id, dims)
707 .await
708 .map_err(SdkError::from)
709 }
710
711 /// Create a pending waitpoint for future signal delivery.
712 ///
713 /// Non-consuming — the worker keeps the lease. Signals delivered to the
714 /// waitpoint are buffered. When the worker later calls `suspend()` with
715 /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
716 /// resume condition.
717 ///
718 /// Returns both the waitpoint_id AND the HMAC token required by external
719 /// callers to buffer signals against this pending waitpoint
720 /// (RFC-004 §Waitpoint Security).
721 ///
722 /// **RFC-012 §R7.2.2:** forwards through
723 /// [`EngineBackend::create_waitpoint`](ff_core::engine_backend::EngineBackend::create_waitpoint).
724 /// The trait returns
725 /// [`PendingWaitpoint`](ff_core::backend::PendingWaitpoint) whose
726 /// `hmac_token` is the same wire HMAC this method has always
727 /// produced; the SDK unwraps it back to the historical
728 /// `(WaitpointId, WaitpointToken)` tuple for caller-shape parity.
729 pub async fn create_pending_waitpoint(
730 &self,
731 waitpoint_key: &str,
732 expires_in_ms: u64,
733 ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
734 let handle = self.synth_handle();
735 let expires_in = std::time::Duration::from_millis(expires_in_ms);
736 let pending = self
737 .backend
738 .create_waitpoint(&handle, waitpoint_key, expires_in)
739 .await
740 .map_err(SdkError::from)?;
741 // `WaitpointHmac` wraps the canonical `WaitpointToken`; the
742 // `.token()` accessor borrows the underlying
743 // `WaitpointToken`, which is the caller's historical return
744 // shape.
745 Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
746 }
747
748 // ── Phase 4: Streaming ──
749
750 /// Append a frame to the current attempt's output stream.
751 ///
752 /// Non-consuming — the worker can append many frames during execution.
753 /// The stream is created lazily on the first append.
754 ///
755 /// **RFC-012 §R7.2.1 / PR #146:** forwards through the
756 /// `EngineBackend` trait. The free-form `frame_type` tag and
757 /// optional `metadata` (wire `correlation_id`) travel on the
758 /// extended [`ff_core::backend::Frame`] shape (`frame_type:
759 /// String`, `correlation_id: Option<String>`), giving byte-for-byte
760 /// wire parity with the pre-migration direct-FCALL path.
761 pub async fn append_frame(
762 &self,
763 frame_type: &str,
764 payload: &[u8],
765 metadata: Option<&str>,
766 ) -> Result<AppendFrameOutcome, SdkError> {
767 let handle = self.synth_handle();
768 let mut frame = ff_core::backend::Frame::new(
769 payload.to_vec(),
770 ff_core::backend::FrameKind::Event,
771 )
772 .with_frame_type(frame_type);
773 if let Some(cid) = metadata {
774 frame = frame.with_correlation_id(cid);
775 }
776 self.backend
777 .append_frame(&handle, frame)
778 .await
779 .map_err(SdkError::from)
780 }
781
782 // ── Phase 3: Suspend ──
783
784 /// Suspend the execution, releasing the lease and creating a waitpoint.
785 ///
786 /// The execution transitions to `suspended` and the worker loses ownership.
787 /// An external signal matching the condition will resume the execution.
788 ///
789 /// If `condition_matchers` is empty, a wildcard matcher is created that
790 /// matches ANY signal name. To require an explicit operator resume with
791 /// no signal match, pass a sentinel name that no real signal will use.
792 ///
793 /// If buffered signals on a pending waitpoint already satisfy the condition,
794 /// returns `AlreadySatisfied` and the lease is NOT released.
795 ///
796 /// Consumes self — the task cannot be used after suspension.
797 // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. Trait
798 // method `suspend` returns `Handle` (a resumed-kind attempt
799 // cookie); this SDK method returns `SuspendOutcome { suspension_id,
800 // waitpoint_id, waitpoint_key, waitpoint_token }` — semantically
801 // distinct return shapes. Also: SDK takes `&[ConditionMatcher]` +
802 // `TimeoutBehavior` with no `WaitpointSpec` mapping. Tracked in #117.
    pub async fn suspend(
        self,
        reason_code: &str,
        condition_matchers: &[ConditionMatcher],
        timeout_ms: Option<u64>,
        timeout_behavior: TimeoutBehavior,
    ) -> Result<SuspendOutcome, SdkError> {
        // Derive the key namespace for this execution's partition slot.
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        // IDs are minted client-side and echoed back by Lua (see
        // `parse_suspend_result`); only the waitpoint_token is minted
        // server-side.
        let suspension_id = SuspensionId::new();
        let waitpoint_id = WaitpointId::new();
        // For now, use a simple opaque key (real implementation would use HMAC token)
        let waitpoint_key = format!("wpk:{}", waitpoint_id);

        // NOTE(review): `now + ms as i64` can overflow for absurd timeout_ms
        // values; presumably config validation bounds this upstream — confirm.
        let timeout_at = timeout_ms.map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));

        // Build resume condition JSON
        let required_signal_names: Vec<&str> = condition_matchers
            .iter()
            .map(|m| m.signal_name.as_str())
            .collect();
        // 0 or 1 matchers -> "any" (wildcard / single signal); 2+ -> "all"
        // (every listed signal must arrive before the condition is met).
        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
        let resume_condition_json = serde_json::json!({
            "condition_type": "signal_set",
            "required_signal_names": required_signal_names,
            "signal_match_mode": match_mode,
            "minimum_signal_count": 1,
            "timeout_behavior": timeout_behavior.as_str(),
            "allow_operator_override": true,
        }).to_string();

        let resume_policy_json = serde_json::json!({
            "resume_target": "runnable",
            "close_waitpoint_on_resume": true,
            "consume_matched_signals": true,
            "retain_signal_buffer_until_closed": true,
        }).to_string();

        // KEYS (17): exec_core, attempt_record, lease_current, lease_history,
        // lease_expiry, worker_leases, suspension_current, waitpoint_hash,
        // waitpoint_signals, suspension_timeout, pending_wp_expiry,
        // active_index, suspended_index, waitpoint_history, wp_condition,
        // attempt_timeout, hmac_secrets
        // (Positions must match the Lua function's KEYS[] indexing exactly.)
        let keys: Vec<String> = vec![
            ctx.core(),                              // 1
            ctx.attempt_hash(self.attempt_index),    // 2
            ctx.lease_current(),                     // 3
            ctx.lease_history(),                     // 4
            idx.lease_expiry(),                      // 5
            idx.worker_leases(&self.worker_instance_id), // 6
            ctx.suspension_current(),                // 7
            ctx.waitpoint(&waitpoint_id),            // 8
            ctx.waitpoint_signals(&waitpoint_id),    // 9
            idx.suspension_timeout(),                // 10
            idx.pending_waitpoint_expiry(),          // 11
            idx.lane_active(&self.lane_id),          // 12
            idx.lane_suspended(&self.lane_id),       // 13
            ctx.waitpoints(),                        // 14
            ctx.waitpoint_condition(&waitpoint_id),  // 15
            idx.attempt_timeout(),                   // 16
            idx.waitpoint_hmac_secrets(),            // 17
        ];

        // ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
        // lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
        // reason_code, requested_by, timeout_at, resume_condition_json,
        // resume_policy_json, continuation_metadata_pointer,
        // use_pending_waitpoint, timeout_behavior, lease_history_maxlen
        let args: Vec<String> = vec![
            self.execution_id.to_string(),           // 1
            self.attempt_index.to_string(),          // 2
            self.attempt_id.to_string(),             // 3
            self.lease_id.to_string(),               // 4
            self.lease_epoch.to_string(),            // 5
            suspension_id.to_string(),               // 6
            waitpoint_id.to_string(),                // 7
            waitpoint_key.clone(),                   // 8
            reason_code.to_owned(),                  // 9
            "worker".to_owned(),                     // 10
            // Empty string = no timeout; Lua treats it as "never".
            timeout_at.map_or(String::new(), |t| t.to_string()), // 11
            resume_condition_json,                   // 12
            resume_policy_json,                      // 13
            String::new(),                           // 14 continuation_metadata_ptr
            String::new(),                           // 15 use_pending_waitpoint
            timeout_behavior.as_str().to_owned(),    // 16
            "1000".to_owned(),                       // 17 lease_history_maxlen
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;

        // Only stop renewal after the round-trip returned: on a transport
        // error the `?` above short-circuits first, leaving the renewal task
        // running so the caller's retry path still holds the lease.
        self.stop_renewal();
        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
    }
905
906 /// Read the signals that satisfied the waitpoint and triggered this
907 /// resume.
908 ///
909 /// Non-consuming. Intended to be called immediately after re-claim via
910 /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
911 /// subsequent `suspend()` (which replaces `suspension:current`).
912 ///
913 /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
914 ///
915 /// - No prior suspension on this execution.
916 /// - The prior suspension belonged to an earlier attempt (e.g. the
917 /// attempt was cancelled/failed and a retry is now claiming).
918 /// - The prior suspension was closed by timeout / cancel / operator
919 /// override rather than by a matched signal.
920 ///
921 /// Reads `suspension:current` once, filters by `attempt_index` to
922 /// guard against stale prior-attempt records, then fetches the matched
923 /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
924 /// fields and reads each signal's metadata + payload directly.
925 pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
926 // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
927 // Pre-migration body (HGETALL suspension_current + HMGET
928 // matchers + pipelined HGETALL signal_hash / GET
929 // signal_payload) lives in
930 // `ff_backend_valkey::observe_signals_impl`.
931 let handle = self.synth_handle();
932 self.backend
933 .observe_signals(&handle)
934 .await
935 .map_err(SdkError::from)
936 }
937
938 /// Signal the renewal task to stop. Called by every terminal op
939 /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
940 /// `move_to_waiting_children`) after the FCALL returns. Also marks
941 /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
942 /// consumption from a genuine drop-without-terminal-op.
    fn stop_renewal(&self) {
        // Release-store pairs with the Acquire load in `Drop`: the flag must
        // be observable by the time `self` is dropped on the terminal-op path.
        self.terminal_op_called.store(true, Ordering::Release);
        // Wake the renewal loop's `stop_signal.notified()` select branch so
        // the background task exits promptly.
        self.renewal_stop.notify_one();
    }
947
948}
949
950/// True iff the backend's FCALL result represents a round-trip that
951/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
952/// contention, conflict, state, bug) all count as "landed" — the
953/// server either committed or rejected with a typed response. Only
954/// raw `Transport` errors (connection drops, request timeouts, parse
955/// failures) count as "did not land", which matches the pre-Stage-1b
956/// SDK's `fcall(...).await.map_err(SdkError::from)?` short-circuit
957/// — those errors returned before `stop_renewal()` ran, preserving
958/// the `Drop` warning for genuine "lease will leak" cases.
959///
960/// Stage 1b terminal-op forwarders use this predicate to decide
961/// whether to call `stop_renewal()`: yes for landed responses, no
962/// for transport errors so the caller's retry path still sees a
963/// running renewal task.
964fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
965 match r {
966 Ok(_) => true,
967 Err(crate::EngineError::Transport { .. }) => false,
968 Err(_) => true,
969 }
970}
971
972/// Map the SDK's free-form `error_category: &str` to the typed
973/// `FailureClass` the trait's `fail` method takes. Unknown categories
974/// fall through to `Transient` — the Lua side already tolerated
975/// arbitrary strings, so the worst a category drift does under the
976/// Stage 1b forwarder is reclassify a novel category as transient.
977/// Stage 1d (or issue #117) widens `FailureClass` with a
978/// `Custom(String)` arm for exact round-trip.
979fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
980 use ff_core::backend::FailureClass;
981 match s {
982 "transient" => FailureClass::Transient,
983 "permanent" => FailureClass::Permanent,
984 "infra_crash" => FailureClass::InfraCrash,
985 "timeout" => FailureClass::Timeout,
986 "cancelled" => FailureClass::Cancelled,
987 _ => FailureClass::Transient,
988 }
989}
990
991impl Drop for ClaimedTask {
992 fn drop(&mut self) {
993 // Abort the background renewal task on drop.
994 // This is a safety net — complete/fail/cancel already stop renewal
995 // via notify before consuming self. But if the task is dropped
996 // without being consumed (e.g., panic), abort prevents leaked renewals.
997 //
998 // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
999 // on the happy path, `stop_renewal()` fires `notify_one` synchronously
1000 // and then self is consumed into Drop immediately. The renewal task
1001 // has not yet been polled by the runtime, so `is_finished()` is still
1002 // `false` here — which previously fired the warning on every
1003 // complete/fail/cancel/suspend call. `terminal_op_called` is the
1004 // authoritative signal that a terminal-op path ran to the point of
1005 // stopping renewal; it does not by itself certify the Lua side
1006 // succeeded (see the field doc). The caller surfaces any error via
1007 // the op's return value, so a `Drop` warning is unneeded there.
1008 if !self.terminal_op_called.load(Ordering::Acquire) {
1009 tracing::warn!(
1010 execution_id = %self.execution_id,
1011 "ClaimedTask dropped without terminal operation — lease will expire"
1012 );
1013 }
1014 self.renewal_handle.abort();
1015 }
1016}
1017
1018// ── Lease renewal ──
1019
1020/// Per-tick renewal: single `backend.renew(&handle)` call wrapped in
1021/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
1022/// hooks still see one span per renewal (restores PR #119 Cursor
1023/// Bugbot finding — the top-level `spawn_renewal_task` fires once at
1024/// construction time, not per-tick).
1025///
1026/// See `benches/harness/src/bin/long_running.rs` for the bench
1027/// consumer that depends on this span naming.
1028#[tracing::instrument(
1029 name = "renew_lease",
1030 skip_all,
1031 fields(execution_id = %execution_id)
1032)]
1033async fn renew_once(
1034 backend: &dyn EngineBackend,
1035 handle: &Handle,
1036 execution_id: &ExecutionId,
1037) -> Result<(), crate::EngineError> {
1038 backend.renew(handle).await.map(|_| ())
1039}
1040
1041/// Spawn a background tokio task that renews the lease at `ttl / 3`
1042/// intervals.
1043///
1044/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
1045/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
1046/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
1047/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
1048/// + the encoded `Handle` instead. Per-tick tracing lives on
1049/// [`renew_once`]; this function itself is sync + one-shot.
1050///
1051/// Stops when:
1052/// - `stop_signal` is notified (complete/fail/cancel called)
1053/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
1054/// - The task handle is aborted (ClaimedTask dropped)
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
    // panics if a caller (or a misconfigured test) passes a
    // lease_ttl_ms < 3. The SDK config validator already enforces
    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
    // is a cheap belt-and-suspenders (Copilot review finding on
    // PR #119).
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        // Skip (not the default Burst) so a slow renew call does not trigger
        // a rapid catch-up barrage of extra renewals afterwards.
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Skip the first immediate tick — the lease was just acquired.
        tick.tick().await;

        loop {
            tokio::select! {
                // Terminal op fired `stop_renewal()` — exit cleanly.
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        Ok(_renewal) => {
                            // Any success resets the consecutive-failure count.
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        // Lease identity gone / execution inactive: further
                        // renewals cannot succeed — stop the loop for good.
                        Err(e) if is_terminal_renewal_error(&e) => {
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        // Transient failure: stay alive and retry next tick.
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}
1119
1120/// Check if an engine error means renewal should stop permanently.
1121#[allow(dead_code)]
1122fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
1123 use crate::{ContentionKind, EngineError, StateKind};
1124 matches!(
1125 err,
1126 EngineError::State(
1127 StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
1128 ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
1129 | EngineError::NotFound { entity: "execution" }
1130 )
1131}
1132
1133// ── FCALL result parsing ──
1134
1135/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
1136/// function into a typed [`ReportUsageResult`].
1137///
1138/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
1139/// `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
1140/// Status code `!= 1` is parsed as a [`ScriptError`] via
1141/// [`ScriptError::from_code_with_detail`].
1142///
1143/// Exposed as `pub` so downstream SDKs that speak the same wire format
1144/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
1145/// call this directly instead of re-implementing the parse. Keeping one
1146/// parser paired with the producer (the Lua function registered at
1147/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
1148/// against silent format drift between producer and consumer.
pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
    // Top level must be a RESP array; anything else is wire-format drift.
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_report_usage_result".into(),
                execution_id: None,
                message: "ff_report_usage_and_check: expected Array".into(),
            }));
        }
    };
    // Slot 0: integer status code (1 = success; anything else = typed error).
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_report_usage_result".into(),
                execution_id: None,
                message: "ff_report_usage_and_check: expected Int status code".into(),
            }));
        }
    };
    if status_code != 1 {
        // Error path: slot 1 = error code, slot 2 = optional detail.
        let error_code = usage_field_str(arr, 1);
        let detail = usage_field_str(arr, 2);
        return Err(SdkError::from(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_report_usage_result".into(),
                    execution_id: None,
                    message: format!("ff_report_usage_and_check: {error_code}"),
                }
            }),
        ));
    }
    // Success path: slot 1 carries the sub-status discriminator.
    let sub_status = usage_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        // {1, "SOFT_BREACH", dim, current, limit}
        "SOFT_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
        }
        // {1, "HARD_BREACH", dim, current, limit}
        "HARD_BREACH" => {
            let dim = usage_field_str(arr, 2);
            let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
            let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
            Ok(ReportUsageResult::HardBreach {
                dimension: dim,
                current_usage: current,
                hard_limit: limit,
            })
        }
        // Unknown sub-status = producer/consumer drift; fail loudly.
        _ => Err(SdkError::from(ScriptError::Parse {
            fcall: "parse_report_usage_result".into(),
            execution_id: None,
            message: format!(
                "ff_report_usage_and_check: unknown sub-status: {sub_status}"
            ),
        })),
    }
}
1212
1213fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1214 match arr.get(index) {
1215 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1216 Some(Ok(Value::SimpleString(s))) => s.clone(),
1217 Some(Ok(Value::Int(n))) => n.to_string(),
1218 _ => String::new(),
1219 }
1220}
1221
1222/// Parse a required numeric usage field (u64) from the wire array at
1223/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
1224/// holds a non-string/non-int value, or contains a string that does
1225/// not parse as u64.
1226///
1227/// Rationale: the Lua producer (`lua/budget.lua:99`,
1228/// `ff_report_usage_and_check`) always emits
1229/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1230/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1231/// non-numeric value here means the Lua and Rust sides drifted;
1232/// silently coercing to `0` would surface drift as "zero-usage breach"
1233/// — arithmetically correct but semantically nonsense. Fail loudly
1234/// instead so drift shows up as a parse error at the first call site.
1235fn parse_usage_u64(
1236 arr: &[Result<Value, ferriskey::Error>],
1237 index: usize,
1238 sub_status: &str,
1239 field_name: &str,
1240) -> Result<u64, SdkError> {
1241 match arr.get(index) {
1242 Some(Ok(Value::Int(n))) => {
1243 u64::try_from(*n).map_err(|_| {
1244 SdkError::from(ScriptError::Parse {
1245 fcall: "parse_usage_u64".into(),
1246 execution_id: None,
1247 message: format!(
1248 "ff_report_usage_and_check {sub_status}: {field_name} \
1249 (index {index}) negative int {n} cannot be u64"
1250 ),
1251 })
1252 })
1253 }
1254 Some(Ok(Value::BulkString(b))) => {
1255 let s = String::from_utf8_lossy(b);
1256 s.parse::<u64>().map_err(|_| {
1257 SdkError::from(ScriptError::Parse {
1258 fcall: "parse_usage_u64".into(),
1259 execution_id: None,
1260 message: format!(
1261 "ff_report_usage_and_check {sub_status}: {field_name} \
1262 (index {index}) not a u64 string: {s:?}"
1263 ),
1264 })
1265 })
1266 }
1267 Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1268 SdkError::from(ScriptError::Parse {
1269 fcall: "parse_usage_u64".into(),
1270 execution_id: None,
1271 message: format!(
1272 "ff_report_usage_and_check {sub_status}: {field_name} \
1273 (index {index}) not a u64 string: {s:?}"
1274 ),
1275 })
1276 }),
1277 Some(_) => Err(SdkError::from(ScriptError::Parse {
1278 fcall: "parse_usage_u64".into(),
1279 execution_id: None,
1280 message: format!(
1281 "ff_report_usage_and_check {sub_status}: {field_name} \
1282 (index {index}) wrong wire type (expected Int or String)"
1283 ),
1284 })),
1285 None => Err(SdkError::from(ScriptError::Parse {
1286 fcall: "parse_usage_u64".into(),
1287 execution_id: None,
1288 message: format!(
1289 "ff_report_usage_and_check {sub_status}: {field_name} \
1290 (index {index}) missing from response"
1291 ),
1292 })),
1293 }
1294}
1295
1296/// Pure helper: decide whether `suspension:current` represents a
1297/// signal-driven resume for the currently-claimed attempt, and extract
1298/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1299/// (no record, stale prior-attempt, non-resumed close). Returns an error
1300/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1301/// bug rather than a missing-data case.
1302///
1303/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1304/// `EngineBackend::observe_signals`, which re-implements this invariant
1305/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1306/// tests so the parsing contract stays exercised at the SDK layer —
1307/// Stage 1d will consolidate (either promote the helper into ff-core
1308/// or drop these tests once the backend-side tests cover equivalent
1309/// ground).
1310#[allow(dead_code)]
1311fn resume_waitpoint_id_from_suspension(
1312 susp: &HashMap<String, String>,
1313 claimed_attempt: AttemptIndex,
1314) -> Result<Option<WaitpointId>, SdkError> {
1315 if susp.is_empty() {
1316 return Ok(None);
1317 }
1318 let susp_att: u32 = susp
1319 .get("attempt_index")
1320 .and_then(|s| s.parse().ok())
1321 .unwrap_or(u32::MAX);
1322 if susp_att != claimed_attempt.0 {
1323 return Ok(None);
1324 }
1325 let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1326 if close_reason != "resumed" {
1327 return Ok(None);
1328 }
1329 let wp_id_str = susp
1330 .get("waitpoint_id")
1331 .map(String::as_str)
1332 .unwrap_or_default();
1333 if wp_id_str.is_empty() {
1334 return Ok(None);
1335 }
1336 let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1337 SdkError::from(ScriptError::Parse {
1338 fcall: "resume_waitpoint_id_from_suspension".into(),
1339 execution_id: None,
1340 message: format!(
1341 "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1342 ),
1343 })
1344 })?;
1345 Ok(Some(waitpoint_id))
1346}
1347
/// Parse a plain success/failure FCALL result: `{1, ...}` → `Ok(())`;
/// any other status code → a typed [`ScriptError`] built from the error
/// code at index 1 and detail slots at index 2+ via
/// `ScriptError::from_code_with_details`. `function_name` is used only
/// to label error messages with the originating Lua function.
pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
    // Top level must be a RESP array.
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_success_result".into(),
                execution_id: None,
                message: format!(
                    "{function_name}: expected Array, got non-array"
                ),
            }));
        }
    };

    if arr.is_empty() {
        return Err(SdkError::from(ScriptError::Parse {
            fcall: "parse_success_result".into(),
            execution_id: None,
            message: format!(
                "{function_name}: empty result array"
            ),
        }));
    }

    // Slot 0: integer status code (1 = success).
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_success_result".into(),
                execution_id: None,
                message: format!(
                    "{function_name}: expected Int at index 0"
                ),
            }));
        }
    };

    if status_code == 1 {
        Ok(())
    } else {
        // Extract error code from index 1 and optional detail from index 2
        // (e.g. `capability_mismatch` ships missing tokens there). Variants
        // that carry a String payload pick the detail up via
        // `from_code_with_detail`; other variants ignore it.
        let field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        // Collect all detail slots (idx >= 2). Most variants only read
        // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
        // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
        // reconciliation after a network drop.
        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();

        let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
            .unwrap_or_else(|| {
                // Unrecognized code: surface it verbatim as a parse error.
                ScriptError::Parse {
                    fcall: "parse_success_result".into(),
                    execution_id: None,
                    message: format!("{function_name}: unknown error: {error_code}"),
                }
            });

        Err(SdkError::from(script_err))
    }
}
1424
1425/// Parse ff_suspend_execution result:
1426/// ok(suspension_id, waitpoint_id, waitpoint_key)
1427/// ok_already_satisfied(suspension_id, waitpoint_id, waitpoint_key)
1428fn parse_suspend_result(
1429 raw: &Value,
1430 suspension_id: SuspensionId,
1431 waitpoint_id: WaitpointId,
1432 waitpoint_key: String,
1433) -> Result<SuspendOutcome, SdkError> {
1434 let arr = match raw {
1435 Value::Array(arr) => arr,
1436 _ => {
1437 return Err(SdkError::from(ScriptError::Parse {
1438 fcall: "parse_suspend_result".into(),
1439 execution_id: None,
1440 message: "ff_suspend_execution: expected Array".into(),
1441 }));
1442 }
1443 };
1444
1445 let status_code = match arr.first() {
1446 Some(Ok(Value::Int(n))) => *n,
1447 _ => {
1448 return Err(SdkError::from(ScriptError::Parse {
1449 fcall: "parse_suspend_result".into(),
1450 execution_id: None,
1451 message: "ff_suspend_execution: bad status code".into(),
1452 }));
1453 }
1454 };
1455
1456 if status_code != 1 {
1457 let err_field_str = |idx: usize| -> String {
1458 arr.get(idx)
1459 .and_then(|v| match v {
1460 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1461 Ok(Value::SimpleString(s)) => Some(s.clone()),
1462 _ => None,
1463 })
1464 .unwrap_or_default()
1465 };
1466 let error_code = {
1467 let s = err_field_str(1);
1468 if s.is_empty() { "unknown".to_owned() } else { s }
1469 };
1470 let detail = err_field_str(2);
1471 return Err(SdkError::from(
1472 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1473 ScriptError::Parse {
1474 fcall: "parse_suspend_result".into(),
1475 execution_id: None,
1476 message: format!("ff_suspend_execution: {error_code}"),
1477 }
1478 }),
1479 ));
1480 }
1481
1482 // Check sub-status at index 1
1483 let sub_status = arr
1484 .get(1)
1485 .and_then(|v| match v {
1486 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1487 Ok(Value::SimpleString(s)) => Some(s.clone()),
1488 _ => None,
1489 })
1490 .unwrap_or_default();
1491
1492 // Lua returns: {1, status, suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
1493 // The suspension_id/waitpoint_id/waitpoint_key values the worker passed in are
1494 // authoritative (Lua echoes them); waitpoint_token however is MINTED by Lua and
1495 // must be read from the response.
1496 let waitpoint_token = arr
1497 .get(5)
1498 .and_then(|v| match v {
1499 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1500 Ok(Value::SimpleString(s)) => Some(s.clone()),
1501 _ => None,
1502 })
1503 .map(WaitpointToken::new)
1504 .ok_or_else(|| {
1505 SdkError::from(ScriptError::Parse {
1506 fcall: "parse_suspend_result".into(),
1507 execution_id: None,
1508 message: "ff_suspend_execution: missing waitpoint_token in response".into(),
1509 })
1510 })?;
1511
1512 if sub_status == "ALREADY_SATISFIED" {
1513 Ok(SuspendOutcome::AlreadySatisfied {
1514 suspension_id,
1515 waitpoint_id,
1516 waitpoint_key,
1517 waitpoint_token,
1518 })
1519 } else {
1520 Ok(SuspendOutcome::Suspended {
1521 suspension_id,
1522 waitpoint_id,
1523 waitpoint_key,
1524 waitpoint_token,
1525 })
1526 }
1527}
1528
1529/// Parse ff_deliver_signal result:
1530/// ok(signal_id, effect)
1531/// ok_duplicate(existing_signal_id)
1532pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
1533 let arr = match raw {
1534 Value::Array(arr) => arr,
1535 _ => {
1536 return Err(SdkError::from(ScriptError::Parse {
1537 fcall: "parse_signal_result".into(),
1538 execution_id: None,
1539 message: "ff_deliver_signal: expected Array".into(),
1540 }));
1541 }
1542 };
1543
1544 let status_code = match arr.first() {
1545 Some(Ok(Value::Int(n))) => *n,
1546 _ => {
1547 return Err(SdkError::from(ScriptError::Parse {
1548 fcall: "parse_signal_result".into(),
1549 execution_id: None,
1550 message: "ff_deliver_signal: bad status code".into(),
1551 }));
1552 }
1553 };
1554
1555 if status_code != 1 {
1556 let err_field_str = |idx: usize| -> String {
1557 arr.get(idx)
1558 .and_then(|v| match v {
1559 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1560 Ok(Value::SimpleString(s)) => Some(s.clone()),
1561 _ => None,
1562 })
1563 .unwrap_or_default()
1564 };
1565 let error_code = {
1566 let s = err_field_str(1);
1567 if s.is_empty() { "unknown".to_owned() } else { s }
1568 };
1569 let detail = err_field_str(2);
1570 return Err(SdkError::from(
1571 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1572 ScriptError::Parse {
1573 fcall: "parse_signal_result".into(),
1574 execution_id: None,
1575 message: format!("ff_deliver_signal: {error_code}"),
1576 }
1577 }),
1578 ));
1579 }
1580
1581 let sub_status = arr
1582 .get(1)
1583 .and_then(|v| match v {
1584 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1585 Ok(Value::SimpleString(s)) => Some(s.clone()),
1586 _ => None,
1587 })
1588 .unwrap_or_default();
1589
1590 if sub_status == "DUPLICATE" {
1591 let existing_id = arr
1592 .get(2)
1593 .and_then(|v| match v {
1594 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1595 Ok(Value::SimpleString(s)) => Some(s.clone()),
1596 _ => None,
1597 })
1598 .unwrap_or_default();
1599 return Ok(SignalOutcome::Duplicate {
1600 existing_signal_id: existing_id,
1601 });
1602 }
1603
1604 // Parse: {1, "OK", signal_id, effect}
1605 let signal_id_str = arr
1606 .get(2)
1607 .and_then(|v| match v {
1608 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1609 Ok(Value::SimpleString(s)) => Some(s.clone()),
1610 _ => None,
1611 })
1612 .unwrap_or_default();
1613
1614 let effect = arr
1615 .get(3)
1616 .and_then(|v| match v {
1617 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1618 Ok(Value::SimpleString(s)) => Some(s.clone()),
1619 _ => None,
1620 })
1621 .unwrap_or_default();
1622
1623 let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
1624 SdkError::from(ScriptError::Parse {
1625 fcall: "parse_signal_result".into(),
1626 execution_id: None,
1627 message: format!(
1628 "ff_deliver_signal: invalid signal_id from Lua: {e}"
1629 ),
1630 })
1631 })?;
1632
1633 if effect == "resume_condition_satisfied" {
1634 Ok(SignalOutcome::TriggeredResume { signal_id })
1635 } else {
1636 Ok(SignalOutcome::Accepted { signal_id, effect })
1637 }
1638}
1639
1640/// Parse ff_fail_execution result:
1641/// ok("retry_scheduled", delay_until)
1642/// ok("terminal_failed")
1643#[allow(dead_code)]
1644fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
1645 let arr = match raw {
1646 Value::Array(arr) => arr,
1647 _ => {
1648 return Err(SdkError::from(ScriptError::Parse {
1649 fcall: "parse_fail_result".into(),
1650 execution_id: None,
1651 message: "ff_fail_execution: expected Array".into(),
1652 }));
1653 }
1654 };
1655
1656 let status_code = match arr.first() {
1657 Some(Ok(Value::Int(n))) => *n,
1658 _ => {
1659 return Err(SdkError::from(ScriptError::Parse {
1660 fcall: "parse_fail_result".into(),
1661 execution_id: None,
1662 message: "ff_fail_execution: bad status code".into(),
1663 }));
1664 }
1665 };
1666
1667 if status_code != 1 {
1668 let err_field_str = |idx: usize| -> String {
1669 arr.get(idx)
1670 .and_then(|v| match v {
1671 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1672 Ok(Value::SimpleString(s)) => Some(s.clone()),
1673 _ => None,
1674 })
1675 .unwrap_or_default()
1676 };
1677 let error_code = {
1678 let s = err_field_str(1);
1679 if s.is_empty() { "unknown".to_owned() } else { s }
1680 };
1681 let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
1682 let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1683 return Err(SdkError::from(
1684 ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
1685 ScriptError::Parse {
1686 fcall: "parse_fail_result".into(),
1687 execution_id: None,
1688 message: format!("ff_fail_execution: {error_code}"),
1689 }
1690 }),
1691 ));
1692 }
1693
1694 // Parse sub-status from field[2] (index 2 = first field after status+OK)
1695 let sub_status = arr
1696 .get(2)
1697 .and_then(|v| match v {
1698 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1699 Ok(Value::SimpleString(s)) => Some(s.clone()),
1700 _ => None,
1701 })
1702 .unwrap_or_default();
1703
1704 match sub_status.as_str() {
1705 "retry_scheduled" => {
1706 // Lua returns: ok("retry_scheduled", tostring(delay_until))
1707 // arr[3] = delay_until
1708 let delay_str = arr
1709 .get(3)
1710 .and_then(|v| match v {
1711 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1712 Ok(Value::Int(n)) => Some(n.to_string()),
1713 _ => None,
1714 })
1715 .unwrap_or_default();
1716 let delay_until = delay_str.parse::<i64>().unwrap_or(0);
1717
1718 Ok(FailOutcome::RetryScheduled {
1719 delay_until: TimestampMs::from_millis(delay_until),
1720 })
1721 }
1722 "terminal_failed" => Ok(FailOutcome::TerminalFailed),
1723 _ => Err(SdkError::from(ScriptError::Parse {
1724 fcall: "parse_fail_result".into(),
1725 execution_id: None,
1726 message: format!(
1727 "ff_fail_execution: unexpected sub-status: {sub_status}"
1728 ),
1729 })),
1730 }
1731}
1732
1733// ── Stream read / tail (consumer API, RFC-006 #2) ──
1734
1735/// Maximum tail block duration accepted by [`tail_stream`]. Mirrors the REST
1736/// endpoint ceiling so SDK callers can't wedge a connection longer than the
1737/// server would accept.
1738pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;
1739
1740/// Maximum frames per read/tail call. Mirrors
1741/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
1742/// callers don't need to import ff-core just to read the bound.
1743pub use ff_core::contracts::STREAM_READ_HARD_CAP;
1744
1745/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
1746/// signal so polling consumers can exit cleanly.
1747///
1748/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
1749pub use ff_core::contracts::StreamFrames;
1750
1751/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
1752/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
1753/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
1754/// `StreamCursor::Start` / `StreamCursor::End` instead.
1755pub use ff_core::contracts::StreamCursor;
1756
1757/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
1758/// — XREAD does not accept the open markers. Pulled out as a bare
1759/// function so unit tests can exercise the guard without constructing a
1760/// live `ferriskey::Client`.
1761fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
1762 if !after.is_concrete() {
1763 return Err(SdkError::Config {
1764 context: "tail_stream".into(),
1765 field: Some("after".into()),
1766 message: "XREAD cursor must be a concrete entry id; pass \
1767 StreamCursor::from_beginning() to start from the \
1768 beginning"
1769 .into(),
1770 });
1771 }
1772 Ok(())
1773}
1774
1775fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
1776 if count_limit == 0 {
1777 return Err(SdkError::Config {
1778 context: "read_stream_frames".into(),
1779 field: Some("count_limit".into()),
1780 message: "count_limit must be >= 1".into(),
1781 });
1782 }
1783 if count_limit > STREAM_READ_HARD_CAP {
1784 return Err(SdkError::Config {
1785 context: "read_stream_frames".into(),
1786 field: Some("count_limit".into()),
1787 message: format!(
1788 "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
1789 ),
1790 });
1791 }
1792 Ok(())
1793}
1794
1795/// Read frames from a completed or in-flight attempt's stream.
1796///
1797/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
1798/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
1799/// `StreamCursor::At("<id>")` reads from a concrete entry id.
1800/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
1801/// `0` returns [`SdkError::Config`].
1802///
1803/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
1804/// consumers know when the producer has finalized the stream. A
1805/// never-written attempt and an in-progress stream are indistinguishable
1806/// here — both present as `frames=[]`, `closed_at=None`.
1807///
1808/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
1809/// client but are not the lease-holding worker — no lease check is
1810/// performed.
1811///
1812/// # Head-of-line note
1813///
1814/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
1815/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
1816/// calling this on a `client` that is also serving FCALLs stalls those
1817/// FCALLs behind the reply. The REST server isolates reads on its
1818/// `tail_client`; direct SDK callers should either use a dedicated
1819/// client OR paginate through smaller `count_limit` slices.
1820pub async fn read_stream(
1821 backend: &dyn EngineBackend,
1822 execution_id: &ExecutionId,
1823 attempt_index: AttemptIndex,
1824 from: StreamCursor,
1825 to: StreamCursor,
1826 count_limit: u64,
1827) -> Result<StreamFrames, SdkError> {
1828 validate_stream_read_count(count_limit)?;
1829 Ok(backend
1830 .read_stream(execution_id, attempt_index, from, to, count_limit)
1831 .await?)
1832}
1833
1834/// Tail a live attempt's stream.
1835///
1836/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
1837/// with id strictly greater than `after`. Pass
1838/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
1839/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
1840/// REJECTED at this boundary because XREAD does not accept `-` / `+`
1841/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
1842///
1843/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
1844/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
1845/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
1846/// SDK and REST ceilings aligned.
1847///
1848/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
1849/// polling consumers should loop until `result.is_closed()` is true, then
1850/// drain and exit. Timeout with no new frames presents as
1851/// `frames=[], closed_at=None`.
1852///
1853/// # Head-of-line warning — use a dedicated client
1854///
1855/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
1856/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
1857/// the read side until a frame arrives or the block elapses. If the
1858/// `client` you pass here is ALSO used for claims, completes, fails,
1859/// appends, or any other FCALL, a 30-second tail will stall all those
1860/// calls for up to 30 seconds.
1861///
1862/// **Strongly recommended**: build a separate `ferriskey::Client` for
1863/// tail callers — mirrors the `Server::tail_client` split that the REST
1864/// server uses internally (see `crates/ff-server/src/server.rs` and
1865/// RFC-006 Impl Notes §"Dedicated stream-op connection").
1866///
1867/// # Tail parallelism caveat (same mux)
1868///
1869/// Even a dedicated tail client is still one multiplexed TCP connection.
1870/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
1871/// ferriskey's per-call `request_timeout` starts at future-poll — so
1872/// two concurrent tails against the same client can time out spuriously:
1873/// the second call's BLOCK budget elapses while it waits for the first
1874/// BLOCK to return. The REST server handles this internally with a
1875/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
1876/// each call its full `block_ms` budget at the server.
1877///
1878/// **Direct SDK callers that need concurrent tails**: either
1879/// (1) build ONE `ferriskey::Client` per concurrent tail call (a small
1880/// pool of clients, rotated by the caller), OR
1881/// (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
1882/// only one BLOCK is in flight per client at a time.
1883/// If you need the REST-side backpressure (429 on contention) and the
1884/// built-in serializer, go through the
1885/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
1886/// than calling this directly.
1887///
1888/// This SDK does not enforce either pattern — the mutex belongs at the
1889/// application layer, and the connection pool belongs at the SDK
1890/// caller's DI layer; neither has a structured place inside this
1891/// helper.
1892///
1893/// # Timeout handling
1894///
1895/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
1896/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
1897/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
1898/// 500ms)`, overriding the client's default per-call. A tail with
1899/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
1900/// the client was built with a shorter `request_timeout`. No custom client
1901/// configuration is required for timeout reasons — only for head-of-line
1902/// isolation above.
1903pub async fn tail_stream(
1904 backend: &dyn EngineBackend,
1905 execution_id: &ExecutionId,
1906 attempt_index: AttemptIndex,
1907 after: StreamCursor,
1908 block_ms: u64,
1909 count_limit: u64,
1910) -> Result<StreamFrames, SdkError> {
1911 if block_ms > MAX_TAIL_BLOCK_MS {
1912 return Err(SdkError::Config {
1913 context: "tail_stream".into(),
1914 field: Some("block_ms".into()),
1915 message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
1916 });
1917 }
1918 validate_stream_read_count(count_limit)?;
1919 // XREAD does not accept `-` / `+` markers as cursors — reject at
1920 // the SDK boundary with `SdkError::Config` rather than forwarding
1921 // an invalid `-`/`+` into the backend (which would surface as an
1922 // opaque `EngineError::Transport`).
1923 validate_tail_cursor(&after)?;
1924
1925 Ok(backend
1926 .tail_stream(execution_id, attempt_index, after, block_ms, count_limit)
1927 .await?)
1928}
1929
#[cfg(test)]
mod tail_stream_boundary_tests {
    use super::*;

    // Boundary-guard coverage: `validate_tail_cursor` must bounce the
    // open markers (`Start` / `End`) before `tail_stream` touches the
    // client — the same shape as the `count_limit` guard above. The
    // matching full-path rejection on the REST layer is covered by
    // `ff-server::api`.

    #[test]
    fn rejects_start_cursor() {
        match validate_tail_cursor(&StreamCursor::Start) {
            Err(SdkError::Config { field, context, .. }) => {
                assert_eq!(field.as_deref(), Some("after"));
                assert_eq!(context, "tail_stream");
            }
            Err(other) => panic!("expected SdkError::Config, got {other:?}"),
            Ok(()) => panic!("Start must be rejected"),
        }
    }

    #[test]
    fn rejects_end_cursor() {
        let result = validate_tail_cursor(&StreamCursor::End);
        let err = result.expect_err("End must be rejected");
        assert!(matches!(err, SdkError::Config { .. }));
    }

    #[test]
    fn accepts_at_cursor() {
        // Every concrete cursor form must pass the guard.
        for (cursor, why) in [
            (StreamCursor::At("0-0".into()), "At cursor must be accepted"),
            (StreamCursor::from_beginning(), "from_beginning() must be accepted"),
            (StreamCursor::At("123-0".into()), "concrete id must be accepted"),
        ] {
            validate_tail_cursor(&cursor).expect(why);
        }
    }
}
1970
#[cfg(test)]
mod parse_report_usage_result_tests {
    //! Decode-path tests for `parse_report_usage_result`. The reply
    //! under test is an Array of `[status_code, sub_status, ...fields]`
    //! — layouts are spelled out per test below.

    use super::*;

    /// `Value::SimpleString` from a `&str`. `usage_field_str` handles
    /// BulkString and SimpleString uniformly (see
    /// `usage_field_str` — `Value::BulkString(b)` → `String::from_utf8_lossy`,
    /// `Value::SimpleString(s)` → clone). SimpleString avoids a
    /// dev-dependency on `bytes` just for test construction.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }

    /// `Value::Int` wrapped in the per-element `Result` shape `arr` takes.
    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }

    /// Top-level reply Array from already-wrapped element results.
    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }

    #[test]
    fn ok_status() {
        let raw = arr(vec![int(1), s("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }

    #[test]
    fn soft_breach_status() {
        // Layout: [1, "SOFT_BREACH", dimension, current_usage, soft_limit].
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }

    #[test]
    fn hard_breach_status() {
        // Layout: [1, "HARD_BREACH", dimension, current_usage, hard_limit].
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    /// Negative case: non-Array input. Guards against a future Lua refactor
    /// that accidentally returns a bare string/int — the parser must fail
    /// loudly rather than silently succeed or panic.
    #[test]
    fn non_array_input_is_parse_error() {
        let raw = Value::SimpleString("OK".to_owned());
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    /// Negative case: Array whose first element isn't an Int status code.
    /// The Lua function's first return slot is always `status_code` (1 on
    /// success, an error code otherwise); a non-Int there is a wire-format
    /// break that must surface as a parse error.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = arr(vec![s("not_an_int"), s("OK")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
    /// field. Guards against the silent-coercion defect cross-review
    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
    /// which would have surfaced Lua-side wire-format drift as a
    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
    /// but semantically wrong (a "breach with zero usage" is nonsense
    /// and masks the real error).
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"), // current_usage — must fail, not coerce to 0
            s("100"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    /// Negative case: HARD_BREACH with the limit slot missing
    /// entirely. Same defence as the non-numeric test above: a
    /// truncated response must fail loudly rather than coerce to 0.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
            // no index 4 — hard_limit missing
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}
2114
2115
#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    /// Assemble a suspension-record hash from literal key/value pairs.
    fn record(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut out = HashMap::new();
        for (key, value) in pairs {
            out.insert((*key).to_owned(), (*value).to_owned());
        }
        out
    }

    #[test]
    fn empty_suspension_returns_none() {
        let out =
            resume_waitpoint_id_from_suspension(&record(&[]), AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let wp_str = wp.to_string();
        let susp = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp_str),
        ]);
        // Claimed attempt is 1; suspension belongs to 0 → stale.
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        let wp_str = wp.to_string();
        // Any close_reason other than "resumed" must not yield signals.
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = record(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp_str),
            ]);
            let out =
                resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let wp_str = wp.to_string();
        let susp = record(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp_str),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
        assert_eq!(out, Some(wp));
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err =
            resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        let rendered = format!("{err}");
        assert!(
            rendered.contains("not a valid UUID"),
            "error should mention invalid UUID, got: {rendered}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: an empty waitpoint_id field (shouldn't happen on
        // resumed records, but guard against partial writes) is None, not an error.
        let susp = record(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was
    // removed when the production path switched from unbounded
    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
    // feedback on unbounded condition-hash reply size). That loop
    // is exercised by the integration tests in `ff-test`.
}
2203
#[cfg(test)]
mod terminal_replay_parsing_tests {
    //! Unit tests for the SDK's parse path of the enriched
    //! `execution_not_active` error returned on a terminal-op replay.
    //! The integration test in ff-test/tests/e2e_lifecycle.rs proves
    //! the Lua side emits the 4-slot detail; these tests prove the
    //! Rust parser threads all 4 slots into the `ExecutionNotActive`
    //! variant so the reconciler in complete()/fail()/cancel() can
    //! match on them.

    use super::*;
    use ferriskey::Value;

    // Use SimpleString rather than BulkString to avoid depending on bytes::Bytes
    // in the test harness. parse_success_result + parse_fail_result handle both.
    fn bulk(s: &str) -> Value {
        Value::SimpleString(s.to_owned())
    }

    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        // Reply layout: [status=0, "execution_not_active",
        //   terminal_outcome, lease_epoch, lifecycle_phase, attempt_id].
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("success")),
            Ok(bulk("42")),
            Ok(bulk("terminal")),
            Ok(bulk("11111111-1111-1111-1111-111111111111")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        // `ExecutionNotActive` arrives boxed inside `SdkError::Engine` —
        // unbox before matching on the contention kind.
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "success");
                assert_eq!(lease_epoch, "42");
                assert_eq!(lifecycle_phase, "terminal");
                assert_eq!(attempt_id, "11111111-1111-1111-1111-111111111111");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// parse_fail_result must extract all detail slots too so the
    /// reconciler in fail() can match lifecycle_phase = "runnable" for
    /// retry-scheduled replays.
    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        // Same 6-slot layout as the success-path test above, with
        // "none"/"runnable" detail values for the retry-scheduled case.
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("none")),
            Ok(bulk("7")),
            Ok(bulk("runnable")),
            Ok(bulk("22222222-2222-2222-2222-222222222222")),
        ]);
        let err = parse_fail_result(&raw).unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "none");
                assert_eq!(lease_epoch, "7");
                assert_eq!(lifecycle_phase, "runnable");
                assert_eq!(attempt_id, "22222222-2222-2222-2222-222222222222");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// Empty detail slots must default to "" (not panic) so older-Lua
    /// producers or malformed replies degrade to an unreconcilable
    /// variant rather than a Parse error.
    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        // Only status + error code present — all four detail slots absent.
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "");
                assert_eq!(lease_epoch, "");
                assert_eq!(lifecycle_phase, "");
                assert_eq!(attempt_id, "");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }
}