ff_sdk/task.rs
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, Value};
use ff_core::backend::{Handle, HandleKind};
use ff_core::contracts::ReportUsageResult;
use ff_core::engine_backend::EngineBackend;
use ff_core::keys::{ExecKeyContext, IndexKeys};
use ff_core::partition::{execution_partition, PartitionConfig};
use ff_core::types::*;
use ff_script::error::ScriptError;
use tokio::sync::{Notify, OwnedSemaphorePermit};
use tokio::task::JoinHandle;

use crate::SdkError;

// ── Phase 3: Suspend/Signal types ──

/// Timeout behavior when a suspension deadline is reached.
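///
/// # Example (sketch)
///
/// A round-trip sketch (marked `ignore`; illustrative only):
///
/// ```ignore
/// use ff_sdk::task::TimeoutBehavior;
///
/// // The short alias "auto_resume" parses to the same variant...
/// let behavior: TimeoutBehavior = "auto_resume".parse().unwrap();
/// assert_eq!(behavior, TimeoutBehavior::AutoResume);
/// // ...but `as_str` always emits the canonical long form.
/// assert_eq!(behavior.as_str(), "auto_resume_with_timeout_signal");
/// ```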
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TimeoutBehavior {
    Fail,
    Cancel,
    Expire,
    AutoResume,
    Escalate,
}

impl TimeoutBehavior {
    pub fn as_str(&self) -> &str {
        match self {
            Self::Fail => "fail",
            Self::Cancel => "cancel",
            Self::Expire => "expire",
            Self::AutoResume => "auto_resume_with_timeout_signal",
            Self::Escalate => "escalate",
        }
    }
}

impl std::fmt::Display for TimeoutBehavior {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}

impl std::str::FromStr for TimeoutBehavior {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "fail" => Ok(Self::Fail),
            "cancel" => Ok(Self::Cancel),
            "expire" => Ok(Self::Expire),
            "auto_resume_with_timeout_signal" | "auto_resume" => Ok(Self::AutoResume),
            "escalate" => Ok(Self::Escalate),
            other => Err(format!("unknown timeout behavior: {other}")),
        }
    }
}

/// A condition matcher for the resume condition.
#[derive(Clone, Debug)]
pub struct ConditionMatcher {
    /// Signal name to match (empty = wildcard).
    pub signal_name: String,
}

/// Outcome of a `suspend()` call.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SuspendOutcome {
    /// Execution is now suspended, waitpoint active.
    Suspended {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        /// HMAC token that signal deliverers must supply to this waitpoint.
        waitpoint_token: WaitpointToken,
    },
    /// Buffered signals already satisfied the condition — suspension skipped.
    /// The caller still holds the lease.
    AlreadySatisfied {
        suspension_id: SuspensionId,
        waitpoint_id: WaitpointId,
        waitpoint_key: String,
        waitpoint_token: WaitpointToken,
    },
}

/// A signal to deliver to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    pub signal_name: String,
    pub signal_category: String,
    pub payload: Option<Vec<u8>>,
    pub source_type: String,
    pub source_identity: String,
    pub idempotency_key: Option<String>,
    /// HMAC token issued when the waitpoint was created. Required for
    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
    pub waitpoint_token: WaitpointToken,
}

/// Outcome of `deliver_signal()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal triggered resume — execution is now runnable.
    TriggeredResume { signal_id: SignalId },
    /// Duplicate signal (idempotency key matched).
    Duplicate { existing_signal_id: String },
}

impl SignalOutcome {
    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
    ///
    /// Consuming packages that call `ff_deliver_signal` directly can use this
    /// to interpret the Lua return value without depending on SDK internals.
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}

/// A signal that triggered a resume, readable by a worker after re-claim.
///
/// **RFC-012 Stage 0:** the canonical definition has moved to
/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
/// scheduled for removal in 0.5.0.
pub use ff_core::backend::ResumeSignal;

/// Outcome of `append_frame()`.
///
/// **RFC-012 §R7.2.1:** canonical definition moved to
/// [`ff_core::backend::AppendFrameOutcome`]; this `pub use` shim
/// preserves the `ff_sdk::task::AppendFrameOutcome` path through the
/// 0.4.x window. Existing consumers construct + match exhaustively
/// without change. The derive set widened from `Clone, Debug` to
/// `Clone, Debug, PartialEq, Eq`, matching the `FailOutcome` precedent.
pub use ff_core::backend::AppendFrameOutcome;

/// Outcome of a `fail()` call.
///
/// **RFC-012 Stage 1a:** canonical definition moved to
/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
/// re-export in `lib.rs`) through the 0.4.x window. Existing
/// consumers construct + match exhaustively without change.
pub use ff_core::backend::FailOutcome;

/// A claimed execution with an active lease. The worker processes this task
/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
///
/// The lease is automatically renewed in the background at `lease_ttl / 3`
/// intervals. Renewal stops when the task is consumed or dropped.
///
/// `complete`, `fail`, and `cancel` consume `self` — this prevents
/// double-complete bugs at the type level.
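///
/// # Example (sketch)
///
/// A minimal worker loop. `claim_next` is one of the claim entry points
/// noted in `new`'s invariant docs; its exact signature here is assumed,
/// and `do_work` is a hypothetical handler (block marked `ignore`):
///
/// ```ignore
/// while let Some(task) = worker.claim_next().await? {
///     // Guard expensive side effects behind lease health.
///     if !task.is_lease_healthy() {
///         task.fail("lease unhealthy", "lease_stale").await?;
///         continue;
///     }
///     match do_work(task.input_payload()).await {
///         Ok(output) => task.complete(Some(output)).await?,
///         Err(e) => {
///             task.fail(&e.to_string(), "transient").await?;
///         }
///     }
/// }
/// ```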
pub struct ClaimedTask {
    /// Shared Valkey client.
    client: Client,
    /// `EngineBackend` the trait-migrated ops forward through.
    ///
    /// **RFC-012 Stage 1b + Round-7.** Today this is always a
    /// `ValkeyBackend` wrapping `client`. Most per-task ops
    /// (`complete`/`fail`/`cancel`/`delay_execution`/
    /// `move_to_waiting_children`/`update_progress`/`resume_signals`/
    /// `create_pending_waitpoint`/`append_frame`/`report_usage`) route
    /// through `backend.*()`. `suspend` still uses
    /// `client.fcall("ff_suspend_execution", ...)` directly — this is
    /// the deferred suspend per RFC-012 §R7.6.1, pending Stage 1d
    /// input-shape work. Lease renewal goes through
    /// `backend.renew(&handle)`. Round-7 (#135/#145) closed the four
    /// trait-shape gaps tracked by #117.
    ///
    /// Stage 1c migrates the FlowFabricWorker hot paths (claim,
    /// deliver_signal) through the same trait surface; Stage 1d
    /// refactors this struct to carry a single `Handle` rather than
    /// synthesising one per op via `synth_handle`.
    backend: Arc<dyn EngineBackend>,
    /// Partition config for key construction.
    partition_config: PartitionConfig,
    /// Execution identity.
    execution_id: ExecutionId,
    /// Current attempt.
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    /// Lease identity.
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    /// Lease timing.
    lease_ttl_ms: u64,
    /// Lane used at claim time.
    lane_id: LaneId,
    /// Worker instance that holds this lease (for index cleanup keys).
    worker_instance_id: WorkerInstanceId,
    /// Execution data.
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    /// Background renewal task handle.
    renewal_handle: JoinHandle<()>,
    /// Signal to stop renewal (used before consuming self).
    renewal_stop: Arc<Notify>,
    /// Consecutive lease renewal failures. Shared with the background renewal
    /// task. Reset to 0 on each successful renewal. Workers should check
    /// `is_lease_healthy()` before committing expensive side effects.
    renewal_failures: Arc<AtomicU32>,
    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
    /// response is received, just before `self` is consumed into `Drop`.
    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
    /// suppress the false-positive "dropped without terminal operation"
    /// warning: after `notify_one`, the renewal task has not yet been
    /// polled by the runtime, so `is_finished()` is still `false` on the
    /// happy path when self is being consumed.
    ///
    /// Note: the flag is set for any terminal-op path that reaches
    /// `stop_renewal()`, which includes Lua-level script errors (the
    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
    /// the caller already receives the `Err` via the op's return value,
    /// so an additional `Drop` warning would be noise. The warning is
    /// reserved for genuine drop-without-terminal-op cases (panic, early
    /// return, transport failure before `stop_renewal` ran).
    terminal_op_called: AtomicBool,
    /// Concurrency permit from the worker's semaphore. Held for the lifetime
    /// of the task; released on complete/fail/cancel/drop.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}

impl ClaimedTask {
    /// Construct a `ClaimedTask` from the results of a successful
    /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
    ///
    /// # Arguments
    ///
    /// * `client` — shared Valkey client used for subsequent lease
    ///   renewals, signal delivery, and the final
    ///   complete/fail/cancel FCALL.
    /// * `partition_config` — partition topology snapshot read at
    ///   `FlowFabricWorker::connect`. Used for key construction for
    ///   the lifetime of this task.
    /// * `execution_id` — the claimed execution's UUID.
    /// * `attempt_index` / `attempt_id` — current attempt identity.
    ///   `attempt_index` is 0 on a fresh claim, preserved on a
    ///   resumed claim.
    /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
    ///   is returned by the Lua FCALL and bumped on each resumed
    ///   claim.
    /// * `lease_ttl_ms` — lease TTL used to schedule the background
    ///   renewal task (renews at `lease_ttl_ms / 3`).
    /// * `lane_id` — the lane the task was claimed on. Used for
    ///   index key construction (`lane_active`, `lease_expiry`,
    ///   etc.).
    /// * `worker_instance_id` — this worker's identity. Used to
    ///   resolve `worker_leases` index entries during renewal and
    ///   completion.
    /// * `input_payload` / `execution_kind` / `tags` — pre-read
    ///   execution metadata, exposed via getters so the worker
    ///   doesn't round-trip to Valkey to inspect what it just
    ///   claimed.
    ///
    /// # Invariant: constructor is `pub(crate)` on purpose
    ///
    /// **External callers cannot construct a `ClaimedTask`** — only
    /// the in-crate claim entry points (`claim_next`,
    /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
    /// load-bearing: those entry points are the ONLY sites that
    /// acquire a permit from the worker's concurrency semaphore
    /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
    /// it through `ClaimedTask::set_concurrency_permit` before
    /// returning the task. Promoting `new` to `pub` would let
    /// consumers build tasks that bypass the concurrency contract
    /// the worker's `max_concurrent_tasks` config advertises — the
    /// returned task would run alongside other in-flight work
    /// without debiting the permit bank, and the
    /// complete/fail/cancel/drop path would have nothing to
    /// release.
    ///
    /// If an external callsite genuinely needs to rehydrate a
    /// task from saved FCALL results, the right answer is a new
    /// `FlowFabricWorker` entry point that wraps `new` + acquires a
    /// permit — not promoting this constructor.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        client: Client,
        backend: Arc<dyn EngineBackend>,
        partition_config: PartitionConfig,
        execution_id: ExecutionId,
        attempt_index: AttemptIndex,
        attempt_id: AttemptId,
        lease_id: LeaseId,
        lease_epoch: LeaseEpoch,
        lease_ttl_ms: u64,
        lane_id: LaneId,
        worker_instance_id: WorkerInstanceId,
        input_payload: Vec<u8>,
        execution_kind: String,
        tags: HashMap<String, String>,
    ) -> Self {
        let renewal_stop = Arc::new(Notify::new());
        let renewal_failures = Arc::new(AtomicU32::new(0));

        // Stage 1b: the renewal task forwards through `backend.renew(&handle)`.
        // Build the handle once at construction time and hand both Arc clones
        // into the spawned task.
        let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
            execution_id.clone(),
            attempt_index,
            attempt_id.clone(),
            lease_id.clone(),
            lease_epoch,
            lease_ttl_ms,
            lane_id.clone(),
            worker_instance_id.clone(),
            HandleKind::Fresh,
        );
        let renewal_handle = spawn_renewal_task(
            backend.clone(),
            renewal_handle_cookie,
            execution_id.clone(),
            lease_ttl_ms,
            renewal_stop.clone(),
            renewal_failures.clone(),
        );

        Self {
            client,
            backend,
            partition_config,
            execution_id,
            attempt_index,
            attempt_id,
            lease_id,
            lease_epoch,
            lease_ttl_ms,
            lane_id,
            worker_instance_id,
            input_payload,
            execution_kind,
            tags,
            renewal_handle,
            renewal_stop,
            renewal_failures,
            terminal_op_called: AtomicBool::new(false),
            _concurrency_permit: None,
        }
    }

    /// Attach a concurrency permit from the worker's semaphore.
    /// The permit is held for the task's lifetime and released on drop.
    #[allow(dead_code)]
    pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
        self._concurrency_permit = Some(permit);
    }

    // ── Accessors ──

    pub fn execution_id(&self) -> &ExecutionId {
        &self.execution_id
    }

    pub fn attempt_index(&self) -> AttemptIndex {
        self.attempt_index
    }

    pub fn attempt_id(&self) -> &AttemptId {
        &self.attempt_id
    }

    pub fn lease_id(&self) -> &LeaseId {
        &self.lease_id
    }

    pub fn lease_epoch(&self) -> LeaseEpoch {
        self.lease_epoch
    }

    pub fn input_payload(&self) -> &[u8] {
        &self.input_payload
    }

    pub fn execution_kind(&self) -> &str {
        &self.execution_kind
    }

    pub fn tags(&self) -> &HashMap<String, String> {
        &self.tags
    }

    pub fn lane_id(&self) -> &LaneId {
        &self.lane_id
    }

    /// Read frames from this task's attempt stream.
    ///
    /// Forwards through the task's `Arc<dyn EngineBackend>` — consumers
    /// no longer need the `ferriskey::Client` leak (#87). See
    /// [`EngineBackend::read_stream`](ff_core::engine_backend::EngineBackend::read_stream)
    /// for the backend-level contract.
    ///
    /// Validation (count_limit bounds) runs at this boundary — out-of-range
    /// input surfaces as [`SdkError::Config`] before reaching the backend.
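    ///
    /// # Example (sketch)
    ///
    /// `from` / `to` are `StreamCursor` values obtained elsewhere (e.g. from
    /// a prior read); cursor construction is not shown (block marked
    /// `ignore`):
    ///
    /// ```ignore
    /// // Replay up to 100 frames between two previously-obtained cursors.
    /// let frames = task.read_stream(from, to, 100).await?;
    /// ```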
    pub async fn read_stream(
        &self,
        from: StreamCursor,
        to: StreamCursor,
        count_limit: u64,
    ) -> Result<StreamFrames, SdkError> {
        validate_stream_read_count(count_limit)?;
        Ok(self
            .backend
            .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
            .await?)
    }

    /// Tail this task's attempt stream for new frames.
    ///
    /// See [`EngineBackend::tail_stream`](ff_core::engine_backend::EngineBackend::tail_stream)
    /// for the head-of-line warning — the task's backend is shared
    /// with claim/complete/fail hot paths. Consumers that need
    /// isolation should build a dedicated `EngineBackend` for tail
    /// reads.
    pub async fn tail_stream(
        &self,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
    ) -> Result<StreamFrames, SdkError> {
        if block_ms > MAX_TAIL_BLOCK_MS {
            return Err(SdkError::Config {
                context: "tail_stream".into(),
                field: Some("block_ms".into()),
                message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
            });
        }
        validate_stream_read_count(count_limit)?;
        validate_tail_cursor(&after)?;
        Ok(self
            .backend
            .tail_stream(
                &self.execution_id,
                self.attempt_index,
                after,
                block_ms,
                count_limit,
            )
            .await?)
    }

    /// Synthesise a Valkey-backend `Handle` encoding this task's
    /// attempt-cookie fields. Private to the SDK — the trait
    /// forwarders call this per op to produce the `&Handle` argument
    /// the `EngineBackend` methods take.
    ///
    /// **RFC-012 Stage 1b option A.** Refactoring `ClaimedTask` to
    /// carry one cached `Handle` instead of synthesising per op is
    /// deferred to Stage 1d (issue #89 follow-up). The per-op cost is
    /// a single allocation of a small byte buffer — negligible
    /// compared to the FCALL round-trip that follows.
    ///
    /// Kind is always `HandleKind::Fresh` because today the 9
    /// Stage-1b ops all treat the backend tag + opaque bytes as
    /// authoritative; they do not dispatch on kind. Stage 1d routes
    /// resumed-claim callers through a `Resumed` synth.
    fn synth_handle(&self) -> Handle {
        ff_backend_valkey::ValkeyBackend::encode_handle(
            self.execution_id.clone(),
            self.attempt_index,
            self.attempt_id.clone(),
            self.lease_id.clone(),
            self.lease_epoch,
            self.lease_ttl_ms,
            self.lane_id.clone(),
            self.worker_instance_id.clone(),
            HandleKind::Fresh,
        )
    }

    /// Check if the lease is likely still valid based on renewal success.
    ///
    /// Returns `false` if 3 or more consecutive renewal attempts have failed.
    /// Workers should check this before committing expensive or irreversible
    /// side effects. A `false` return means Valkey may have already expired
    /// the lease and another worker could be processing this execution.
    pub fn is_lease_healthy(&self) -> bool {
        self.renewal_failures.load(Ordering::Relaxed) < 3
    }

    /// Number of consecutive lease renewal failures since the last success.
    ///
    /// Returns 0 when renewals are working normally. Useful for observability
    /// and custom health policies beyond the default threshold of 3.
    pub fn consecutive_renewal_failures(&self) -> u32 {
        self.renewal_failures.load(Ordering::Relaxed)
    }

    // ── Terminal operations (consume self) ──

    /// Delay the execution until `delay_until`.
    ///
    /// Releases the lease. The execution moves to `delayed` state.
    /// Consumes self — the task cannot be used after delay.
    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
        // the pre-migration `stop_renewal` contract: called only when
        // the FCALL round-tripped (Ok or a typed Lua/engine error);
        // raw transport errors bubble up without stopping renewal so
        // the `Drop` warning still fires if the caller later drops
        // `self` without retrying.
        let handle = self.synth_handle();
        let out = self.backend.delay(&handle, delay_until).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Move execution to waiting_children state.
    ///
    /// Releases the lease. The execution waits for child dependencies to complete.
    /// Consumes self.
    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.wait_children`.
        // See `delay_execution` for the stop_renewal contract.
        let handle = self.synth_handle();
        let out = self.backend.wait_children(&handle).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Complete the execution successfully.
    ///
    /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
    /// Renewal continues during the FCALL to prevent lease expiry under
    /// network latency — the Lua fences on lease_id+epoch atomically.
    /// Consumes self — the task cannot be used after completion.
    ///
    /// # Connection errors and replay
    ///
    /// If the Valkey connection drops after the Lua commit but before the
    /// client reads the response, retrying `complete()` with the same
    /// `ClaimedTask` (same `lease_epoch` + `attempt_id`) is safe: the SDK
    /// reconciles the "already terminal with matching outcome" response
    /// into `Ok(())`. A retry after a different terminal op has raced in
    /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
    /// populated `terminal_outcome` so the caller can see what actually
    /// happened.
    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.complete`.
        // The FCALL body + the `ExecutionNotActive` replay reconciliation
        // (match same epoch + attempt_id + outcome=="success" → Ok)
        // moved to `ff_backend_valkey::complete_impl`.
        let handle = self.synth_handle();
        let out = self.backend.complete(&handle, result_payload).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Fail the execution with a reason and error category.
    ///
    /// If the execution policy allows retries, the engine schedules a retry
    /// (delayed backoff). Otherwise, the execution becomes terminal failed.
    /// Returns [`FailOutcome`] so the caller knows what happened.
    ///
    /// # Connection errors and replay
    ///
    /// `fail()` is replay-safe under the same conditions as `complete()`:
    /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
    /// reconciled into the outcome the server actually committed
    /// (`TerminalFailed` if no retries left; `RetryScheduled` with
    /// `delay_until = 0` if a retry was scheduled — the exact delay is
    /// not recovered on the replay path).
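    ///
    /// # Example (sketch)
    ///
    /// The category string is free-form (`"inference_error"` here is
    /// illustrative); block marked `ignore`:
    ///
    /// ```ignore
    /// use ff_sdk::FailOutcome;
    ///
    /// if let FailOutcome::RetryScheduled { .. } = task
    ///     .fail("upstream returned 503", "inference_error")
    ///     .await?
    /// {
    ///     // The engine scheduled a delayed retry; a later claim re-runs it.
    /// }
    /// ```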
    pub async fn fail(
        self,
        reason: &str,
        error_category: &str,
    ) -> Result<FailOutcome, SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.fail`.
        // The FCALL body + retry-policy read + the two-shape replay
        // reconciliation (terminal/failed → TerminalFailed, runnable
        // → RetryScheduled{0}) moved to
        // `ff_backend_valkey::fail_impl`.
        //
        // **Preserving the caller's raw category string.** The
        // pre-Stage-1b code passed `error_category` straight to the
        // Lua `failure_category` ARGV. Real callers use arbitrary
        // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
        // `"inference_error"`, …) that do not correspond to the 5
        // named `FailureClass` variants. A lossy enum conversion
        // would rewrite every non-canonical category to
        // `Transient` — a real behavior change for
        // ff-readiness-tests + the examples crates.
        //
        // Instead: pack the raw category bytes into
        // `FailureReason.detail`; `fail_impl` reads them back and
        // threads them to Lua verbatim. The `classification` enum
        // is derived from the string for the few consumers who
        // match on the trait return today (none in-workspace).
        // Stage 1d (or issue #117) widens `FailureClass` with a
        // `Custom(String)` arm; this stash carrier retires then.
        let handle = self.synth_handle();
        let failure_reason = ff_core::backend::FailureReason::with_detail(
            reason.to_owned(),
            error_category.as_bytes().to_vec(),
        );
        let classification = error_category_to_class(error_category);
        let out = self.backend.fail(&handle, failure_reason, classification).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Cancel the execution.
    ///
    /// Calls `ff_cancel_execution` via FCALL, then stops lease renewal.
    /// Consumes self.
    ///
    /// # Connection errors and replay
    ///
    /// `cancel()` is replay-safe under the same conditions as `complete()`:
    /// a retry by the same caller matching `lease_epoch` + `attempt_id`
    /// returns `Ok(())` if the server's stored `terminal_outcome` is
    /// `cancelled`. A retry that finds a different outcome (because a
    /// concurrent `complete()` or `fail()` won the race) surfaces as
    /// `ExecutionNotActive` with the populated `terminal_outcome` so the
    /// caller can see that the cancel intent was NOT honored.
    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.cancel`.
        // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
        // and the `ExecutionNotActive` → outcome=="cancelled" replay
        // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
        let handle = self.synth_handle();
        let out = self.backend.cancel(&handle, reason).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    // ── Non-terminal operations ──

    /// Manually renew the lease.
    ///
    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
    /// trait. The background renewal task calls
    /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
    /// this method is the public entry point for workers that want
    /// to force a renew out-of-band.
    pub async fn renew_lease(&self) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        self.backend
            .renew(&handle)
            .await
            .map(|_renewal| ())
            .map_err(SdkError::from)
    }

    /// Update progress (pct 0-100 and optional message).
    ///
    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
    /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
    ///
    /// # Not for stream frames
    ///
    /// `update_progress` writes the `progress_percent` / `progress_message`
    /// fields on `exec_core` (the execution's state hash). It is a
    /// scalar heartbeat — each call overwrites the previous value and
    /// nothing is appended to the output stream. Producers emitting
    /// stream frames (arbitrary `frame_type` + payload, consumed via
    /// `ClaimedTask::read_stream` / `tail_stream` or the HTTP
    /// stream-tail routes) MUST use [`Self::append_frame`] instead;
    /// `update_progress` is invisible to stream consumers.
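    ///
    /// # Example (sketch)
    ///
    /// ```ignore
    /// // Scalar heartbeat: overwrites the previous value, appends nothing.
    /// task.update_progress(40, "embedded 4/10 chunks").await?;
    /// ```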
    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        self.backend
            .progress(&handle, Some(pct), Some(message.to_owned()))
            .await
            .map_err(SdkError::from)
    }

    /// Report usage against a budget and check limits.
    ///
    /// Non-consuming — the worker can report usage multiple times.
    /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
    /// `dedup_key` prevents double-counting on retries (auto-prefixed with
    /// the budget hash tag).
    ///
    /// **RFC-012 §R7.2.3:** forwards through
    /// [`EngineBackend::report_usage`](ff_core::engine_backend::EngineBackend::report_usage).
    /// The trait's `UsageDimensions.dedup_key` carries the raw key;
    /// the backend impl applies the `usage_dedup_key(hash_tag, k)`
    /// wrap so the dedup state co-locates with the budget partition
    /// (PR #108).
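    ///
    /// # Example (sketch)
    ///
    /// Dimension names are free-form; `"tokens"` / `"requests"` and the
    /// dedup key are illustrative (block marked `ignore`):
    ///
    /// ```ignore
    /// use ff_core::contracts::ReportUsageResult;
    ///
    /// let result = task
    ///     .report_usage(&budget_id, &[("tokens", 1_500), ("requests", 1)], Some("attempt-42"))
    ///     .await?;
    /// if let ReportUsageResult::HardBreach { dimension, .. } = result {
    ///     // Stop spending against this budget dimension.
    /// }
    /// ```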
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        dimensions: &[(&str, u64)],
        dedup_key: Option<&str>,
    ) -> Result<ReportUsageResult, SdkError> {
        let handle = self.synth_handle();
        let mut dims = ff_core::backend::UsageDimensions::default();
        for (name, delta) in dimensions {
            dims.custom.insert((*name).to_owned(), *delta);
        }
        dims.dedup_key = dedup_key
            .filter(|k| !k.is_empty())
            .map(|k| k.to_owned());
        self.backend
            .report_usage(&handle, budget_id, dims)
            .await
            .map_err(SdkError::from)
    }

    /// Create a pending waitpoint for future signal delivery.
    ///
    /// Non-consuming — the worker keeps the lease. Signals delivered to the
    /// waitpoint are buffered. When the worker later calls `suspend()` with
    /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
    /// resume condition.
    ///
    /// Returns both the waitpoint_id AND the HMAC token required by external
    /// callers to buffer signals against this pending waitpoint
    /// (RFC-004 §Waitpoint Security).
    ///
    /// **RFC-012 §R7.2.2:** forwards through
    /// [`EngineBackend::create_waitpoint`](ff_core::engine_backend::EngineBackend::create_waitpoint).
    /// The trait returns
    /// [`PendingWaitpoint`](ff_core::backend::PendingWaitpoint) whose
    /// `hmac_token` is the same wire HMAC this method has always
    /// produced; the SDK unwraps it back to the historical
    /// `(WaitpointId, WaitpointToken)` tuple for caller-shape parity.
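    ///
    /// # Example (sketch)
    ///
    /// The key string is illustrative (block marked `ignore`):
    ///
    /// ```ignore
    /// let (waitpoint_id, token) = task
    ///     .create_pending_waitpoint("approval:step-3", 60_000)
    ///     .await?;
    /// // Hand `waitpoint_id` + `token` to the external party; signals they
    /// // deliver before `suspend()` are buffered against this waitpoint.
    /// ```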
    pub async fn create_pending_waitpoint(
        &self,
        waitpoint_key: &str,
        expires_in_ms: u64,
    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
        let handle = self.synth_handle();
        let expires_in = Duration::from_millis(expires_in_ms);
        let pending = self
            .backend
            .create_waitpoint(&handle, waitpoint_key, expires_in)
            .await
            .map_err(SdkError::from)?;
        // `WaitpointHmac` wraps the canonical `WaitpointToken`; the
        // `.token()` accessor borrows the underlying `WaitpointToken`,
        // which is the caller's historical return shape.
        Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
    }

    // ── Phase 4: Streaming ──

    /// Append a frame to the current attempt's output stream.
    ///
    /// Non-consuming — the worker can append many frames during execution.
    /// The stream is created lazily on the first append.
    ///
    /// **RFC-012 §R7.2.1 / PR #146:** forwards through the
    /// `EngineBackend` trait. The free-form `frame_type` tag and
    /// optional `metadata` (wire `correlation_id`) travel on the
    /// extended [`ff_core::backend::Frame`] shape (`frame_type:
    /// String`, `correlation_id: Option<String>`), giving byte-for-byte
    /// wire parity with the pre-migration direct-FCALL path.
    ///
    /// # `append_frame` vs [`Self::update_progress`]
    ///
    /// Stream-frame producers (arbitrary `frame_type` + payload
    /// consumed via `ClaimedTask::read_stream` / `tail_stream` or the
    /// HTTP stream-tail routes) MUST use `append_frame`.
    /// `update_progress` writes scalar `progress_percent` /
    /// `progress_message` fields to `exec_core` and is invisible to
    /// stream consumers.
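    ///
    /// # Example (sketch)
    ///
    /// `frame_type` and the correlation id are free-form; the values here
    /// are illustrative (block marked `ignore`):
    ///
    /// ```ignore
    /// task.append_frame("token_delta", b"Hello, ", Some("req-7")).await?;
    /// task.append_frame("token_delta", b"world", Some("req-7")).await?;
    /// ```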
    pub async fn append_frame(
        &self,
        frame_type: &str,
        payload: &[u8],
        metadata: Option<&str>,
    ) -> Result<AppendFrameOutcome, SdkError> {
        let handle = self.synth_handle();
        let mut frame = ff_core::backend::Frame::new(
            payload.to_vec(),
            ff_core::backend::FrameKind::Event,
        )
        .with_frame_type(frame_type);
        if let Some(cid) = metadata {
            frame = frame.with_correlation_id(cid);
        }
        self.backend
            .append_frame(&handle, frame)
            .await
            .map_err(SdkError::from)
    }

    // ── Phase 3: Suspend ──

    /// Suspend the execution, releasing the lease and creating a waitpoint.
    ///
    /// The execution transitions to `suspended` and the worker loses ownership.
    /// An external signal matching the condition will resume the execution.
    ///
    /// If `condition_matchers` is empty, a wildcard matcher is created that
    /// matches ANY signal name. To require an explicit operator resume with
    /// no signal match, pass a sentinel name that no real signal will use.
    ///
    /// If buffered signals on a pending waitpoint already satisfy the condition,
    /// returns `AlreadySatisfied` and the lease is NOT released.
    ///
    /// Consumes self — the task cannot be used after suspension.
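    ///
    /// # Example (sketch)
    ///
    /// The reason code and signal name are free-form; the values here are
    /// illustrative (block marked `ignore`):
    ///
    /// ```ignore
    /// use ff_sdk::task::{ConditionMatcher, SuspendOutcome, TimeoutBehavior};
    ///
    /// let matcher = ConditionMatcher { signal_name: "human_approval".into() };
    /// let outcome = task
    ///     .suspend("await_approval", &[matcher], Some(3_600_000), TimeoutBehavior::Expire)
    ///     .await?;
    /// match outcome {
    ///     SuspendOutcome::Suspended { waitpoint_token, .. } => {
    ///         // Lease released; hand the token to whoever delivers the signal.
    ///     }
    ///     SuspendOutcome::AlreadySatisfied { .. } => {
    ///         // Buffered signals already matched; the lease is still held.
    ///     }
    /// }
    /// ```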
    // TODO(stage-1d-or-rfc-amendment): deferred from Stage 1b. Trait
    // method `suspend` returns `Handle` (a resumed-kind attempt
    // cookie); this SDK method returns `SuspendOutcome { suspension_id,
    // waitpoint_id, waitpoint_key, waitpoint_token }` — semantically
    // distinct return shapes. Also: SDK takes `&[ConditionMatcher]` +
    // `TimeoutBehavior` with no `WaitpointSpec` mapping. Tracked in #117.
    pub async fn suspend(
        self,
        reason_code: &str,
        condition_matchers: &[ConditionMatcher],
        timeout_ms: Option<u64>,
        timeout_behavior: TimeoutBehavior,
    ) -> Result<SuspendOutcome, SdkError> {
        let partition = execution_partition(&self.execution_id, &self.partition_config);
        let ctx = ExecKeyContext::new(&partition, &self.execution_id);
        let idx = IndexKeys::new(&partition);

        let suspension_id = SuspensionId::new();
        let waitpoint_id = WaitpointId::new();
        // For now, use a simple opaque key (real implementation would use an HMAC token).
        let waitpoint_key = format!("wpk:{}", waitpoint_id);

        let timeout_at = timeout_ms
            .map(|ms| TimestampMs::from_millis(TimestampMs::now().0 + ms as i64));

        // Build resume condition JSON.
        let required_signal_names: Vec<&str> = condition_matchers
            .iter()
            .map(|m| m.signal_name.as_str())
            .collect();
        let match_mode = if required_signal_names.len() <= 1 { "any" } else { "all" };
        let resume_condition_json = serde_json::json!({
            "condition_type": "signal_set",
            "required_signal_names": required_signal_names,
            "signal_match_mode": match_mode,
            "minimum_signal_count": 1,
            "timeout_behavior": timeout_behavior.as_str(),
            "allow_operator_override": true,
        })
        .to_string();

        let resume_policy_json = serde_json::json!({
            "resume_target": "runnable",
            "close_waitpoint_on_resume": true,
            "consume_matched_signals": true,
            "retain_signal_buffer_until_closed": true,
        })
        .to_string();

        // KEYS (17): exec_core, attempt_record, lease_current, lease_history,
        // lease_expiry, worker_leases, suspension_current, waitpoint_hash,
        // waitpoint_signals, suspension_timeout, pending_wp_expiry,
        // active_index, suspended_index, waitpoint_history, wp_condition,
        // attempt_timeout, hmac_secrets
        let keys: Vec<String> = vec![
            ctx.core(),                                  // 1
            ctx.attempt_hash(self.attempt_index),        // 2
            ctx.lease_current(),                         // 3
            ctx.lease_history(),                         // 4
            idx.lease_expiry(),                          // 5
            idx.worker_leases(&self.worker_instance_id), // 6
            ctx.suspension_current(),                    // 7
            ctx.waitpoint(&waitpoint_id),                // 8
            ctx.waitpoint_signals(&waitpoint_id),        // 9
            idx.suspension_timeout(),                    // 10
            idx.pending_waitpoint_expiry(),              // 11
            idx.lane_active(&self.lane_id),              // 12
            idx.lane_suspended(&self.lane_id),           // 13
            ctx.waitpoints(),                            // 14
            ctx.waitpoint_condition(&waitpoint_id),      // 15
            idx.attempt_timeout(),                       // 16
            idx.waitpoint_hmac_secrets(),                // 17
        ];

        // ARGV (17): execution_id, attempt_index, attempt_id, lease_id,
        // lease_epoch, suspension_id, waitpoint_id, waitpoint_key,
        // reason_code, requested_by, timeout_at, resume_condition_json,
        // resume_policy_json, continuation_metadata_pointer,
        // use_pending_waitpoint, timeout_behavior, lease_history_maxlen
        let args: Vec<String> = vec![
            self.execution_id.to_string(),                       // 1
            self.attempt_index.to_string(),                      // 2
            self.attempt_id.to_string(),                         // 3
            self.lease_id.to_string(),                           // 4
            self.lease_epoch.to_string(),                        // 5
            suspension_id.to_string(),                           // 6
            waitpoint_id.to_string(),                            // 7
            waitpoint_key.clone(),                               // 8
            reason_code.to_owned(),                              // 9
            "worker".to_owned(),                                 // 10
            timeout_at.map_or(String::new(), |t| t.to_string()), // 11
            resume_condition_json,                               // 12
            resume_policy_json,                                  // 13
            String::new(),                                       // 14 continuation_metadata_ptr
            String::new(),                                       // 15 use_pending_waitpoint
            timeout_behavior.as_str().to_owned(),                // 16
            "1000".to_owned(),                                   // 17 lease_history_maxlen
        ];

        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .client
            .fcall("ff_suspend_execution", &key_refs, &arg_refs)
            .await
            .map_err(SdkError::from)?;

        self.stop_renewal();
        parse_suspend_result(&raw, suspension_id, waitpoint_id, waitpoint_key)
    }

    /// Read the signals that satisfied the waitpoint and triggered this
    /// resume.
    ///
    /// Non-consuming. Intended to be called immediately after re-claim via
    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
    /// subsequent `suspend()` (which replaces `suspension:current`).
    ///
    /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
    ///
    /// - No prior suspension on this execution.
    /// - The prior suspension belonged to an earlier attempt (e.g. the
    ///   attempt was cancelled/failed and a retry is now claiming).
    /// - The prior suspension was closed by timeout / cancel / operator
    ///   override rather than by a matched signal.
    ///
    /// Reads `suspension:current` once, filters by `attempt_index` to
    /// guard against stale prior-attempt records, then fetches the matched
    /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
    /// fields and reads each signal's metadata + payload directly.
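    ///
    /// # Example (sketch)
    ///
    /// ```ignore
    /// // Immediately after re-claiming a resumed execution:
    /// for signal in task.resume_signals().await? {
    ///     // Inspect what woke the execution before doing more work.
    /// }
    /// ```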
    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
        // Pre-migration body (HGETALL suspension_current + HMGET
        // matchers + pipelined HGETALL signal_hash / GET
        // signal_payload) lives in
        // `ff_backend_valkey::observe_signals_impl`.
        let handle = self.synth_handle();
        self.backend
            .observe_signals(&handle)
            .await
            .map_err(SdkError::from)
    }

    /// Signal the renewal task to stop. Called by every terminal op
    /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
    /// `move_to_waiting_children`) after the FCALL returns. Also marks
    /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
    /// consumption from a genuine drop-without-terminal-op.
    fn stop_renewal(&self) {
        self.terminal_op_called.store(true, Ordering::Release);
        self.renewal_stop.notify_one();
    }
}

/// True iff the backend's FCALL result represents a round-trip that
/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
/// contention, conflict, state, bug) all count as "landed" — the
/// server either committed or rejected with a typed response. Only
/// raw `Transport` errors (connection drops, request timeouts, parse
/// failures) count as "did not land", which matches the pre-Stage-1b
/// SDK's `fcall(...).await.map_err(SdkError::from)?` short-circuit
/// — those errors returned before `stop_renewal()` ran, preserving
/// the `Drop` warning for genuine "lease will leak" cases.
///
/// Stage 1b terminal-op forwarders use this predicate to decide
/// whether to call `stop_renewal()`: yes for landed responses, no
/// for transport errors so the caller's retry path still sees a
/// running renewal task.
fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
    match r {
        Ok(_) => true,
        Err(crate::EngineError::Transport { .. }) => false,
        Err(_) => true,
    }
}

/// Map the SDK's free-form `error_category: &str` to the typed
/// `FailureClass` the trait's `fail` method takes. Unknown categories
/// fall through to `Transient` — the Lua side already tolerated
/// arbitrary strings, so the worst a category drift does under the
/// Stage 1b forwarder is reclassify a novel category as transient.
/// Stage 1d (or issue #117) widens `FailureClass` with a
/// `Custom(String)` arm for exact round-trip.
fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
    use ff_core::backend::FailureClass;
    match s {
        "transient" => FailureClass::Transient,
        "permanent" => FailureClass::Permanent,
        "infra_crash" => FailureClass::InfraCrash,
        "timeout" => FailureClass::Timeout,
        "cancelled" => FailureClass::Cancelled,
        _ => FailureClass::Transient,
    }
}

impl Drop for ClaimedTask {
    fn drop(&mut self) {
        // Abort the background renewal task on drop.
        // This is a safety net — complete/fail/cancel already stop renewal
        // via notify before consuming self. But if the task is dropped
        // without being consumed (e.g., panic), abort prevents leaked renewals.
        //
        // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
        // on the happy path, `stop_renewal()` fires `notify_one` synchronously
        // and then self is consumed into Drop immediately. The renewal task
        // has not yet been polled by the runtime, so `is_finished()` is still
        // `false` here — which previously fired the warning on every
        // complete/fail/cancel/suspend call. `terminal_op_called` is the
        // authoritative signal that a terminal-op path ran to the point of
        // stopping renewal; it does not by itself certify the Lua side
        // succeeded (see the field doc). The caller surfaces any error via
        // the op's return value, so a `Drop` warning is unneeded there.
        if !self.terminal_op_called.load(Ordering::Acquire) {
            tracing::warn!(
                execution_id = %self.execution_id,
                "ClaimedTask dropped without terminal operation — lease will expire"
            );
        }
        self.renewal_handle.abort();
    }
}

// ── Lease renewal ──

/// Per-tick renewal: a single `backend.renew(&handle)` call wrapped in
/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
/// hooks still see one span per renewal (restores the per-renewal span
/// flagged by the PR #119 Cursor Bugbot finding — the top-level
/// `spawn_renewal_task` span fires once at construction time, not per
/// tick).
///
/// See `benches/harness/src/bin/long_running.rs` for the bench
/// consumer that depends on this span naming.
#[tracing::instrument(
    name = "renew_lease",
    skip_all,
    fields(execution_id = %execution_id)
)]
async fn renew_once(
    backend: &dyn EngineBackend,
    handle: &Handle,
    execution_id: &ExecutionId,
) -> Result<(), crate::EngineError> {
    backend.renew(handle).await.map(|_| ())
}

/// Spawn a background tokio task that renews the lease at `ttl / 3`
/// intervals.
///
/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
/// + the encoded `Handle` instead. Per-tick tracing lives on
/// [`renew_once`]; this function itself is sync + one-shot.
///
/// Stops when:
/// - `stop_signal` is notified (complete/fail/cancel called)
/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
/// - The task handle is aborted (ClaimedTask dropped)
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
    // panics if a caller (or a misconfigured test) passes a
    // lease_ttl_ms < 3. The SDK config validator already enforces
    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
    // is cheap belt-and-suspenders (Copilot review finding on
    // PR #119).
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Skip the first immediate tick — the lease was just acquired.
        tick.tick().await;

        loop {
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        Ok(_renewal) => {
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        Err(e) if is_terminal_renewal_error(&e) => {
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}

/// Check if an engine error means renewal should stop permanently.
#[allow(dead_code)]
fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
    use crate::{ContentionKind, EngineError, StateKind};
    matches!(
        err,
        EngineError::State(
            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
            | EngineError::NotFound { entity: "execution" }
    )
}

// ── FCALL result parsing ──

/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
/// function into a typed [`ReportUsageResult`].
///
/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
/// `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
/// Status code `!= 1` is parsed as a [`ScriptError`] via
/// [`ScriptError::from_code_with_detail`].
///
/// Exposed as `pub` so downstream SDKs that speak the same wire format
/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
/// call this directly instead of re-implementing the parse. Keeping one
/// parser paired with the producer (the Lua function registered at
/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
/// against silent format drift between producer and consumer.
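///
/// # Example (sketch)
///
/// A hand-built wire value (marked `ignore`; assumes `Value` variants can
/// be constructed directly, as the parser below pattern-matches them):
///
/// ```ignore
/// use ferriskey::Value;
///
/// let raw = Value::Array(vec![
///     Ok(Value::Int(1)),
///     Ok(Value::BulkString(b"SOFT_BREACH".to_vec())),
///     Ok(Value::BulkString(b"tokens".to_vec())),
///     Ok(Value::BulkString(b"1200".to_vec())),
///     Ok(Value::BulkString(b"1000".to_vec())),
/// ]);
/// let parsed = parse_report_usage_result(&raw)?;
/// // parsed == ReportUsageResult::SoftBreach {
/// //     dimension: "tokens", current_usage: 1200, soft_limit: 1000 }
/// ```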
1169pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1170 let arr = match raw {
1171 Value::Array(arr) => arr,
1172 _ => {
1173 return Err(SdkError::from(ScriptError::Parse {
1174 fcall: "parse_report_usage_result".into(),
1175 execution_id: None,
1176 message: "ff_report_usage_and_check: expected Array".into(),
1177 }));
1178 }
1179 };
1180 let status_code = match arr.first() {
1181 Some(Ok(Value::Int(n))) => *n,
1182 _ => {
1183 return Err(SdkError::from(ScriptError::Parse {
1184 fcall: "parse_report_usage_result".into(),
1185 execution_id: None,
1186 message: "ff_report_usage_and_check: expected Int status code".into(),
1187 }));
1188 }
1189 };
1190 if status_code != 1 {
1191 let error_code = usage_field_str(arr, 1);
1192 let detail = usage_field_str(arr, 2);
1193 return Err(SdkError::from(
1194 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1195 ScriptError::Parse {
1196 fcall: "parse_report_usage_result".into(),
1197 execution_id: None,
1198 message: format!("ff_report_usage_and_check: {error_code}"),
1199 }
1200 }),
1201 ));
1202 }
1203 let sub_status = usage_field_str(arr, 1);
1204 match sub_status.as_str() {
1205 "OK" => Ok(ReportUsageResult::Ok),
1206 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1207 "SOFT_BREACH" => {
1208 let dim = usage_field_str(arr, 2);
1209 let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1210 let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1211 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1212 }
1213 "HARD_BREACH" => {
1214 let dim = usage_field_str(arr, 2);
1215 let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1216 let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1217 Ok(ReportUsageResult::HardBreach {
1218 dimension: dim,
1219 current_usage: current,
1220 hard_limit: limit,
1221 })
1222 }
1223 _ => Err(SdkError::from(ScriptError::Parse {
1224 fcall: "parse_report_usage_result".into(),
1225 execution_id: None,
1226 message: format!(
1227 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1228 ),
1229 })),
1230 }
1231}
1232
1233fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1234 match arr.get(index) {
1235 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1236 Some(Ok(Value::SimpleString(s))) => s.clone(),
1237 Some(Ok(Value::Int(n))) => n.to_string(),
1238 _ => String::new(),
1239 }
1240}
1241
1242/// Parse a required numeric usage field (u64) from the wire array at
1243/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
1244/// holds a non-string/non-int value, or contains a string that does
1245/// not parse as u64.
1246///
1247/// Rationale: the Lua producer (`lua/budget.lua:99`,
1248/// `ff_report_usage_and_check`) always emits
1249/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1250/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1251/// non-numeric value here means the Lua and Rust sides drifted;
1252/// silently coercing to `0` would surface drift as "zero-usage breach"
1253/// — arithmetically correct but semantically nonsense. Fail loudly
1254/// instead so drift shows up as a parse error at the first call site.
1255fn parse_usage_u64(
1256 arr: &[Result<Value, ferriskey::Error>],
1257 index: usize,
1258 sub_status: &str,
1259 field_name: &str,
1260) -> Result<u64, SdkError> {
1261 match arr.get(index) {
1262 Some(Ok(Value::Int(n))) => {
1263 u64::try_from(*n).map_err(|_| {
1264 SdkError::from(ScriptError::Parse {
1265 fcall: "parse_usage_u64".into(),
1266 execution_id: None,
1267 message: format!(
1268 "ff_report_usage_and_check {sub_status}: {field_name} \
1269 (index {index}) negative int {n} cannot be u64"
1270 ),
1271 })
1272 })
1273 }
1274 Some(Ok(Value::BulkString(b))) => {
1275 let s = String::from_utf8_lossy(b);
1276 s.parse::<u64>().map_err(|_| {
1277 SdkError::from(ScriptError::Parse {
1278 fcall: "parse_usage_u64".into(),
1279 execution_id: None,
1280 message: format!(
1281 "ff_report_usage_and_check {sub_status}: {field_name} \
1282 (index {index}) not a u64 string: {s:?}"
1283 ),
1284 })
1285 })
1286 }
1287 Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1288 SdkError::from(ScriptError::Parse {
1289 fcall: "parse_usage_u64".into(),
1290 execution_id: None,
1291 message: format!(
1292 "ff_report_usage_and_check {sub_status}: {field_name} \
1293 (index {index}) not a u64 string: {s:?}"
1294 ),
1295 })
1296 }),
1297 Some(_) => Err(SdkError::from(ScriptError::Parse {
1298 fcall: "parse_usage_u64".into(),
1299 execution_id: None,
1300 message: format!(
1301 "ff_report_usage_and_check {sub_status}: {field_name} \
1302 (index {index}) wrong wire type (expected Int or String)"
1303 ),
1304 })),
1305 None => Err(SdkError::from(ScriptError::Parse {
1306 fcall: "parse_usage_u64".into(),
1307 execution_id: None,
1308 message: format!(
1309 "ff_report_usage_and_check {sub_status}: {field_name} \
1310 (index {index}) missing from response"
1311 ),
1312 })),
1313 }
1314}
1315
1316/// Pure helper: decide whether `suspension:current` represents a
1317/// signal-driven resume for the currently-claimed attempt, and extract
1318/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1319/// (no record, stale prior-attempt, non-resumed close). Returns an error
1320/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1321/// bug rather than a missing-data case.
1322///
1323/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1324/// `EngineBackend::observe_signals`, which re-implements this invariant
1325/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1326/// tests so the parsing contract stays exercised at the SDK layer —
1327/// Stage 1d will consolidate (either promote the helper into ff-core
1328/// or drop these tests once the backend-side tests cover equivalent
1329/// ground).
1330#[allow(dead_code)]
1331fn resume_waitpoint_id_from_suspension(
1332 susp: &HashMap<String, String>,
1333 claimed_attempt: AttemptIndex,
1334) -> Result<Option<WaitpointId>, SdkError> {
1335 if susp.is_empty() {
1336 return Ok(None);
1337 }
1338 let susp_att: u32 = susp
1339 .get("attempt_index")
1340 .and_then(|s| s.parse().ok())
1341 .unwrap_or(u32::MAX);
1342 if susp_att != claimed_attempt.0 {
1343 return Ok(None);
1344 }
1345 let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1346 if close_reason != "resumed" {
1347 return Ok(None);
1348 }
1349 let wp_id_str = susp
1350 .get("waitpoint_id")
1351 .map(String::as_str)
1352 .unwrap_or_default();
1353 if wp_id_str.is_empty() {
1354 return Ok(None);
1355 }
1356 let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1357 SdkError::from(ScriptError::Parse {
1358 fcall: "resume_waitpoint_id_from_suspension".into(),
1359 execution_id: None,
1360 message: format!(
1361 "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1362 ),
1363 })
1364 })?;
1365 Ok(Some(waitpoint_id))
1366}
1367
1368pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
1369 let arr = match raw {
1370 Value::Array(arr) => arr,
1371 _ => {
1372 return Err(SdkError::from(ScriptError::Parse {
1373 fcall: "parse_success_result".into(),
1374 execution_id: None,
1375 message: format!(
1376 "{function_name}: expected Array, got non-array"
1377 ),
1378 }));
1379 }
1380 };
1381
1382 if arr.is_empty() {
1383 return Err(SdkError::from(ScriptError::Parse {
1384 fcall: "parse_success_result".into(),
1385 execution_id: None,
1386 message: format!(
1387 "{function_name}: empty result array"
1388 ),
1389 }));
1390 }
1391
1392 let status_code = match arr.first() {
1393 Some(Ok(Value::Int(n))) => *n,
1394 _ => {
1395 return Err(SdkError::from(ScriptError::Parse {
1396 fcall: "parse_success_result".into(),
1397 execution_id: None,
1398 message: format!(
1399 "{function_name}: expected Int at index 0"
1400 ),
1401 }));
1402 }
1403 };
1404
1405 if status_code == 1 {
1406 Ok(())
1407 } else {
1408 // Extract error code from index 1 and optional detail from index 2
1409 // (e.g. `capability_mismatch` ships missing tokens there). Variants
1410 // that carry a String payload pick the detail up via
1411 // `from_code_with_detail`; other variants ignore it.
1412 let field_str = |idx: usize| -> String {
1413 arr.get(idx)
1414 .and_then(|v| match v {
1415 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
1416 Ok(Value::SimpleString(s)) => Some(s.clone()),
1417 _ => None,
1418 })
1419 .unwrap_or_default()
1420 };
1421 let error_code = {
1422 let s = field_str(1);
1423 if s.is_empty() { "unknown".to_owned() } else { s }
1424 };
1425 // Collect all detail slots (idx >= 2). Most variants only read
1426 // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
1427 // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
1428 // reconciliation after a network drop.
1429 let details: Vec<String> = (2..arr.len()).map(field_str).collect();
1430 let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
1431
1432 let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
1433 .unwrap_or_else(|| {
1434 ScriptError::Parse {
1435 fcall: "parse_success_result".into(),
1436 execution_id: None,
1437 message: format!("{function_name}: unknown error: {error_code}"),
1438 }
1439 });
1440
1441 Err(SdkError::from(script_err))
1442 }
1443}
1444
/// Parse ff_suspend_execution result:
/// ok(suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)
/// ok_already_satisfied(suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)
fn parse_suspend_result(
    raw: &Value,
    suspension_id: SuspensionId,
    waitpoint_id: WaitpointId,
    waitpoint_key: String,
) -> Result<SuspendOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_suspend_result".into(),
                execution_id: None,
                message: "ff_suspend_execution: expected Array".into(),
            }));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_suspend_result".into(),
                execution_id: None,
                message: "ff_suspend_execution: bad status code".into(),
            }));
        }
    };

    if status_code != 1 {
        let err_field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = err_field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = err_field_str(2);
        return Err(SdkError::from(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_suspend_result".into(),
                    execution_id: None,
                    message: format!("ff_suspend_execution: {error_code}"),
                }
            }),
        ));
    }

    // Check sub-status at index 1
    let sub_status = arr
        .get(1)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    // Lua returns: {1, status, suspension_id, waitpoint_id, waitpoint_key, waitpoint_token}
    // The suspension_id/waitpoint_id/waitpoint_key values the worker passed in are
    // authoritative (Lua echoes them); waitpoint_token however is MINTED by Lua and
    // must be read from the response.
    let waitpoint_token = arr
        .get(5)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .map(WaitpointToken::new)
        .ok_or_else(|| {
            SdkError::from(ScriptError::Parse {
                fcall: "parse_suspend_result".into(),
                execution_id: None,
                message: "ff_suspend_execution: missing waitpoint_token in response".into(),
            })
        })?;

    if sub_status == "ALREADY_SATISFIED" {
        Ok(SuspendOutcome::AlreadySatisfied {
            suspension_id,
            waitpoint_id,
            waitpoint_key,
            waitpoint_token,
        })
    } else {
        Ok(SuspendOutcome::Suspended {
            suspension_id,
            waitpoint_id,
            waitpoint_key,
            waitpoint_token,
        })
    }
}

/// Parse ff_deliver_signal result:
/// ok(signal_id, effect)
/// ok_duplicate(existing_signal_id)
pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_signal_result".into(),
                execution_id: None,
                message: "ff_deliver_signal: expected Array".into(),
            }));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_signal_result".into(),
                execution_id: None,
                message: "ff_deliver_signal: bad status code".into(),
            }));
        }
    };

    if status_code != 1 {
        let err_field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = err_field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = err_field_str(2);
        return Err(SdkError::from(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_signal_result".into(),
                    execution_id: None,
                    message: format!("ff_deliver_signal: {error_code}"),
                }
            }),
        ));
    }

    let sub_status = arr
        .get(1)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    if sub_status == "DUPLICATE" {
        let existing_id = arr
            .get(2)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_default();
        return Ok(SignalOutcome::Duplicate {
            existing_signal_id: existing_id,
        });
    }

    // Parse: {1, "OK", signal_id, effect}
    let signal_id_str = arr
        .get(2)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    let effect = arr
        .get(3)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_signal_result".into(),
            execution_id: None,
            message: format!("ff_deliver_signal: invalid signal_id from Lua: {e}"),
        })
    })?;

    if effect == "resume_condition_satisfied" {
        Ok(SignalOutcome::TriggeredResume { signal_id })
    } else {
        Ok(SignalOutcome::Accepted { signal_id, effect })
    }
}

/// Parse ff_fail_execution result:
/// ok("retry_scheduled", delay_until)
/// ok("terminal_failed")
#[allow(dead_code)]
fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_fail_result".into(),
                execution_id: None,
                message: "ff_fail_execution: expected Array".into(),
            }));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_fail_result".into(),
                execution_id: None,
                message: "ff_fail_execution: bad status code".into(),
            }));
        }
    };

    if status_code != 1 {
        let err_field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = err_field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
        return Err(SdkError::from(
            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_fail_result".into(),
                    execution_id: None,
                    message: format!("ff_fail_execution: {error_code}"),
                }
            }),
        ));
    }

    // Parse sub-status from index 2 (the first field after status + "OK").
    let sub_status = arr
        .get(2)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    match sub_status.as_str() {
        "retry_scheduled" => {
            // Lua returns: ok("retry_scheduled", tostring(delay_until))
            // arr[3] = delay_until
            let delay_str = arr
                .get(3)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default();
            // Fail loudly on a non-numeric delay rather than coercing to 0;
            // the same silent-coercion defence documented in
            // parse_report_usage_result_tests below.
            let delay_until = delay_str.parse::<i64>().map_err(|e| {
                SdkError::from(ScriptError::Parse {
                    fcall: "parse_fail_result".into(),
                    execution_id: None,
                    message: format!(
                        "ff_fail_execution: retry_scheduled delay_until is not an i64 ({delay_str:?}): {e}"
                    ),
                })
            })?;

            Ok(FailOutcome::RetryScheduled {
                delay_until: TimestampMs::from_millis(delay_until),
            })
        }
        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
        _ => Err(SdkError::from(ScriptError::Parse {
            fcall: "parse_fail_result".into(),
            execution_id: None,
            message: format!("ff_fail_execution: unexpected sub-status: {sub_status}"),
        })),
    }
}

// ── Stream read / tail (consumer API, RFC-006 #2) ──

/// Maximum tail block duration accepted by [`tail_stream`]. Mirrors the REST
/// endpoint ceiling so SDK callers can't wedge a connection longer than the
/// server would accept.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;

/// Maximum frames per read/tail call. Mirrors
/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
/// callers don't need to import ff-core just to read the bound.
pub use ff_core::contracts::STREAM_READ_HARD_CAP;

/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
/// signal so polling consumers can exit cleanly.
///
/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
pub use ff_core::contracts::StreamFrames;

/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
/// `StreamCursor::Start` / `StreamCursor::End` instead.
pub use ff_core::contracts::StreamCursor;

/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
/// — XREAD does not accept the open markers. Pulled out as a bare
/// function so unit tests can exercise the guard without constructing a
/// live `ferriskey::Client`.
fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
    if !after.is_concrete() {
        return Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("after".into()),
            message: "XREAD cursor must be a concrete entry id; pass \
                      StreamCursor::from_beginning() to start from the \
                      beginning"
                .into(),
        });
    }
    Ok(())
}

fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
    if count_limit == 0 {
        return Err(SdkError::Config {
            context: "read_stream_frames".into(),
            field: Some("count_limit".into()),
            message: "count_limit must be >= 1".into(),
        });
    }
    if count_limit > STREAM_READ_HARD_CAP {
        return Err(SdkError::Config {
            context: "read_stream_frames".into(),
            field: Some("count_limit".into()),
            message: format!("count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"),
        });
    }
    Ok(())
}

/// Read frames from a completed or in-flight attempt's stream.
///
/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
/// `StreamCursor::At("<id>")` reads from a concrete entry id.
/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` — `0` and values
/// above the cap both return [`SdkError::Config`].
///
/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
/// consumers know when the producer has finalized the stream. A
/// never-written attempt and an in-progress stream are indistinguishable
/// here — both present as `frames=[]`, `closed_at=None`.
///
/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
/// client but are not the lease-holding worker — no lease check is
/// performed.
///
/// # Head-of-line note
///
/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
/// calling this on a `client` that is also serving FCALLs stalls those
/// FCALLs behind the reply. The REST server isolates reads on its
/// `tail_client`; direct SDK callers should either use a dedicated
/// client OR paginate through smaller `count_limit` slices.
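///
/// # Example: one-shot read of a finished attempt (sketch)
///
/// A minimal sketch, assuming only the `StreamFrames` fields named in
/// this doc (`frames`, `closed_at`); `backend`, `eid`, and `attempt`
/// are caller-supplied placeholders.
///
/// ```ignore
/// let page = read_stream(
///     backend, &eid, attempt,
///     StreamCursor::Start, StreamCursor::End,
///     1_000, // well under STREAM_READ_HARD_CAP; paginate for more
/// )
/// .await?;
/// if page.closed_at.is_some() {
///     // Producer finalized the stream; `page.frames` holds up to
///     // `count_limit` frames from the start of the stream.
/// } else {
///     // Still in flight, or never written (indistinguishable here).
/// }
/// ```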
pub async fn read_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    validate_stream_read_count(count_limit)?;
    Ok(backend
        .read_stream(execution_id, attempt_index, from, to, count_limit)
        .await?)
}

/// Tail a live attempt's stream.
///
/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
/// with id strictly greater than `after`. Pass
/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
/// REJECTED at this boundary because XREAD does not accept `-` / `+`
/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
///
/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
/// SDK and REST ceilings aligned.
///
/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
/// polling consumers should loop until `result.is_closed()` is true, then
/// drain and exit. Timeout with no new frames presents as
/// `frames=[], closed_at=None`.
///
/// # Head-of-line warning — use a dedicated client
///
/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
/// the read side until a frame arrives or the block elapses. If the
/// `client` you pass here is ALSO used for claims, completes, fails,
/// appends, or any other FCALL, a 30-second tail will stall all those
/// calls for up to 30 seconds.
///
/// **Strongly recommended**: build a separate `ferriskey::Client` for
/// tail callers — mirrors the `Server::tail_client` split that the REST
/// server uses internally (see `crates/ff-server/src/server.rs` and
/// RFC-006 Impl Notes §"Dedicated stream-op connection").
///
/// # Tail parallelism caveat (same mux)
///
/// Even a dedicated tail client is still one multiplexed TCP connection.
/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
/// ferriskey's per-call `request_timeout` starts at future-poll — so
/// two concurrent tails against the same client can time out spuriously:
/// the second call's BLOCK budget elapses while it waits for the first
/// BLOCK to return. The REST server handles this internally with a
/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
/// each call its full `block_ms` budget at the server.
///
/// **Direct SDK callers that need concurrent tails**: either
/// (1) build ONE `ferriskey::Client` per concurrent tail call (a small
/// pool of clients, rotated by the caller), OR
/// (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
/// only one BLOCK is in flight per client at a time (sketched below).
/// If you need the REST-side backpressure (429 on contention) and the
/// built-in serializer, go through the
/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
/// than calling this directly.
///
/// This SDK does not enforce either pattern — the mutex belongs at the
/// application layer, and the connection pool belongs at the SDK
/// caller's DI layer; neither has a structured place inside this
/// helper.
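///
/// A sketch of pattern (2), a caller-side turnstile; `tail_gate` and
/// the surrounding bindings are hypothetical:
///
/// ```ignore
/// // One turnstile per dedicated tail client.
/// let tail_gate = std::sync::Arc::new(tokio::sync::Mutex::new(()));
///
/// // Hold the gate across the BLOCK call so at most one XREAD BLOCK
/// // is in flight on this client and each call keeps its full
/// // block_ms budget at the server.
/// let _turn = tail_gate.lock().await;
/// let batch = tail_stream(backend, &eid, attempt, after, 5_000, 256).await?;
/// drop(_turn); // release before processing frames
/// ```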
///
/// # Timeout handling
///
/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
/// 500ms)`, overriding the client's default per-call. A tail with
/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
/// the client was built with a shorter `request_timeout`. No custom client
/// configuration is required for timeout reasons — only for head-of-line
/// isolation above.
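///
/// # Example: polling tail loop (sketch)
///
/// A minimal consumer sketch. It assumes `StreamFrames` exposes its
/// frame list as `frames` and that each frame's entry id is reachable
/// through a hypothetical `entry_id()` accessor; check the real field
/// names in `ff_core::contracts` before lifting this.
///
/// ```ignore
/// let mut after = StreamCursor::from_beginning();
/// loop {
///     let batch = tail_stream(backend, &eid, attempt, after.clone(), 5_000, 256).await?;
///     for frame in &batch.frames {
///         consume(frame); // hypothetical consumer hook
///     }
///     // `after` is exclusive, so the last entry id we saw is a safe
///     // resume point for the next call.
///     if let Some(last) = batch.frames.last() {
///         after = StreamCursor::At(last.entry_id().to_string());
///     }
///     if batch.is_closed() {
///         break; // finalized; a fuller consumer drains remaining frames first
///     }
/// }
/// ```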
pub async fn tail_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    if block_ms > MAX_TAIL_BLOCK_MS {
        return Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("block_ms".into()),
            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
        });
    }
    validate_stream_read_count(count_limit)?;
    // XREAD does not accept `-` / `+` markers as cursors — reject at
    // the SDK boundary with `SdkError::Config` rather than forwarding
    // an invalid `-`/`+` into the backend (which would surface as an
    // opaque `EngineError::Transport`).
    validate_tail_cursor(&after)?;

    Ok(backend
        .tail_stream(execution_id, attempt_index, after, block_ms, count_limit)
        .await?)
}

#[cfg(test)]
mod tail_stream_boundary_tests {
    use super::*;

    // `validate_tail_cursor` rejects `StreamCursor::Start` and
    // `StreamCursor::End` before `tail_stream` touches the client —
    // same shape as the `count_limit` guard above. The matching
    // full-path rejection on the REST layer is covered by
    // `ff-server::api`.

    #[test]
    fn rejects_start_cursor() {
        let err = validate_tail_cursor(&StreamCursor::Start).expect_err("Start must be rejected");
        match err {
            SdkError::Config { field, context, .. } => {
                assert_eq!(field.as_deref(), Some("after"));
                assert_eq!(context, "tail_stream");
            }
            other => panic!("expected SdkError::Config, got {other:?}"),
        }
    }

    #[test]
    fn rejects_end_cursor() {
        let err = validate_tail_cursor(&StreamCursor::End).expect_err("End must be rejected");
        assert!(matches!(err, SdkError::Config { .. }));
    }

    #[test]
    fn accepts_at_cursor() {
        validate_tail_cursor(&StreamCursor::At("0-0".into())).expect("At cursor must be accepted");
        validate_tail_cursor(&StreamCursor::from_beginning())
            .expect("from_beginning() must be accepted");
        validate_tail_cursor(&StreamCursor::At("123-0".into()))
            .expect("concrete id must be accepted");
    }
}

#[cfg(test)]
mod parse_report_usage_result_tests {
    use super::*;

    /// Build a `Value::SimpleString` from a `&str`. The production
    /// field reader (`usage_field_str`) handles BulkString and
    /// SimpleString uniformly (`BulkString` is read via
    /// `String::from_utf8_lossy`, `SimpleString` is cloned), so
    /// SimpleString is sufficient here and avoids a dev-dependency on
    /// `bytes` just for test construction.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }

    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }

    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }

    #[test]
    fn ok_status() {
        let raw = arr(vec![int(1), s("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }

    #[test]
    fn soft_breach_status() {
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }

    #[test]
    fn hard_breach_status() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    /// Negative case: non-Array input. Guards against a future Lua refactor
    /// that accidentally returns a bare string/int — the parser must fail
    /// loudly rather than silently succeed or panic.
    #[test]
    fn non_array_input_is_parse_error() {
        let raw = Value::SimpleString("OK".to_owned());
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    /// Negative case: Array whose first element isn't an Int status code.
    /// The Lua function's first return slot is always `status_code` (1 on
    /// success, an error code otherwise); a non-Int there is a wire-format
    /// break that must surface as a parse error.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = arr(vec![s("not_an_int"), s("OK")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
    /// field. Guards against the silent-coercion defect cross-review
    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
    /// which would have surfaced Lua-side wire-format drift as a
    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
    /// but semantically wrong (a "breach with zero usage" is nonsense
    /// and masks the real error).
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"), // current_usage — must fail, not coerce to 0
            s("100"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    /// Negative case: HARD_BREACH with the limit slot missing
    /// entirely. Same defence as the non-numeric test above: a
    /// truncated response must fail loudly rather than coerce to 0.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
            // no index 4 — hard_limit missing
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}

#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(k, v)| ((*k).to_owned(), (*v).to_owned())).collect()
    }

    #[test]
    fn empty_suspension_returns_none() {
        let susp = m(&[]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        // Claimed attempt is 1; suspension belongs to 0 → stale.
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = m(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp.to_string()),
            ]);
            let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
        assert_eq!(out, Some(wp));
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        assert!(
            format!("{err}").contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: an empty waitpoint_id field (shouldn't happen on
        // resumed records, but guard against partial writes) is None, not an error.
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was
    // removed when the production path switched from unbounded
    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
    // feedback on unbounded condition-hash reply size). That loop
    // is exercised by the integration tests in `ff-test`.
}

#[cfg(test)]
mod terminal_replay_parsing_tests {
    //! Unit tests for the SDK's parse path of the enriched
    //! `execution_not_active` error returned on a terminal-op replay.
    //! The integration test in ff-test/tests/e2e_lifecycle.rs proves
    //! the Lua side emits the 4-slot detail; these tests prove the
    //! Rust parser threads all 4 slots into the `ExecutionNotActive`
    //! variant so the reconciler in complete()/fail()/cancel() can
    //! match on them.

    use super::*;
    use ferriskey::Value;

    // Use SimpleString rather than BulkString to avoid depending on bytes::Bytes
    // in the test harness. parse_success_result + parse_fail_result handle both.
    fn bulk(s: &str) -> Value {
        Value::SimpleString(s.to_owned())
    }

    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("success")),
            Ok(bulk("42")),
            Ok(bulk("terminal")),
            Ok(bulk("11111111-1111-1111-1111-111111111111")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected SdkError::Engine, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "success");
                assert_eq!(lease_epoch, "42");
                assert_eq!(lifecycle_phase, "terminal");
                assert_eq!(attempt_id, "11111111-1111-1111-1111-111111111111");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// parse_fail_result must extract all detail slots too so the
    /// reconciler in fail() can match lifecycle_phase = "runnable" for
    /// retry-scheduled replays.
    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("none")),
            Ok(bulk("7")),
            Ok(bulk("runnable")),
            Ok(bulk("22222222-2222-2222-2222-222222222222")),
        ]);
        let err = parse_fail_result(&raw).unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected SdkError::Engine, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "none");
                assert_eq!(lease_epoch, "7");
                assert_eq!(lifecycle_phase, "runnable");
                assert_eq!(attempt_id, "22222222-2222-2222-2222-222222222222");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// Empty detail slots must default to "" (not panic) so older-Lua
    /// producers or malformed replies degrade to an unreconcilable
    /// variant rather than a Parse error.
    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected SdkError::Engine, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "");
                assert_eq!(lease_epoch, "");
                assert_eq!(lifecycle_phase, "");
                assert_eq!(attempt_id, "");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }
}