ff_sdk/task.rs
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;

use ferriskey::{Client, Value};
use ff_core::backend::{Handle, HandleKind, PendingWaitpoint};
use ff_core::contracts::ReportUsageResult;
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::StateKind;
use ff_core::partition::PartitionConfig;
use ff_core::types::*;
use ff_script::error::ScriptError;
use tokio::sync::{Notify, OwnedSemaphorePermit};
use tokio::task::JoinHandle;

use crate::SdkError;

// ── Phase 3: Suspend/Signal types ──

// RFC-013 Stage 1d — `TimeoutBehavior`, `SuspendOutcome`,
// `ResumeCondition`, `SignalMatcher`, `ResumePolicy`,
// `SuspensionReasonCode`, `SuspensionRequester`, `WaitpointBinding`,
// `SuspendArgs`, `SuspendOutcomeDetails`, `IdempotencyKey`,
// `CompositeBody` all live in `ff_core::contracts`. The `SuspendOutcome`
// re-export preserves the `ff_sdk::SuspendOutcome` path; the other
// paths are new.
pub use ff_core::contracts::{
    CompositeBody, CountKind, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget,
    SignalMatcher, SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
    SuspensionRequester, TimeoutBehavior, WaitpointBinding,
};

/// RFC-013 Stage 1d — cookie returned by the strict `suspend` wrapper.
///
/// The returned `handle` is a `HandleKind::Suspended` cookie the caller
/// uses for `observe_signals` and the eventual re-claim via
/// `claim_resumed_execution`. Consuming `ClaimedTask::suspend`
/// invalidates the pre-suspend `ClaimedTask` (already moved by the
/// `self: ClaimedTask` receiver); this handle is the only live cookie
/// left on the caller side.
#[derive(Debug)]
pub struct SuspendedHandle {
    pub handle: Handle,
    pub details: SuspendOutcomeDetails,
}

/// RFC-013 Stage 1d — outcome of the fallible `try_suspend` wrapper.
///
/// Unlike the strict wrapper, `AlreadySatisfied` hands the `ClaimedTask`
/// back to the caller so work continues on the retained lease.
#[allow(clippy::large_enum_variant)]
pub enum TrySuspendOutcome {
    Suspended(SuspendedHandle),
    AlreadySatisfied {
        task: ClaimedTask,
        details: SuspendOutcomeDetails,
    },
}

impl std::fmt::Debug for TrySuspendOutcome {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Suspended(h) => f.debug_tuple("Suspended").field(h).finish(),
            Self::AlreadySatisfied { details, .. } => f
                .debug_struct("AlreadySatisfied")
                .field("details", details)
                .finish_non_exhaustive(),
        }
    }
}

/// A signal to deliver to a suspended execution's waitpoint.
#[derive(Clone, Debug)]
pub struct Signal {
    pub signal_name: String,
    pub signal_category: String,
    pub payload: Option<Vec<u8>>,
    pub source_type: String,
    pub source_identity: String,
    pub idempotency_key: Option<String>,
    /// HMAC token issued when the waitpoint was created. Required for
    /// authenticated signal delivery (RFC-004 §Waitpoint Security).
    pub waitpoint_token: WaitpointToken,
}

/// Outcome of `deliver_signal()`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalOutcome {
    /// Signal accepted, appended to waitpoint but condition not yet satisfied.
    Accepted { signal_id: SignalId, effect: String },
    /// Signal triggered resume — execution is now runnable.
    TriggeredResume { signal_id: SignalId },
    /// Duplicate signal (idempotency key matched).
    Duplicate { existing_signal_id: String },
}

impl SignalOutcome {
    /// Parse a raw `ff_deliver_signal` FCALL result into a `SignalOutcome`.
    ///
    /// Consuming packages that call `ff_deliver_signal` directly can use this
    /// to interpret the Lua return value without depending on SDK internals.
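    ///
    /// A decode-and-match sketch (doctest-ignored; `raw` stands in for
    /// whatever `Value` the caller's own FCALL returned, and the `%`
    /// captures assume `Display` on the id types):
    ///
    /// ```ignore
    /// match SignalOutcome::from_fcall_value(&raw)? {
    ///     SignalOutcome::TriggeredResume { signal_id } => {
    ///         // The execution is runnable again; a worker can re-claim it.
    ///         tracing::info!(%signal_id, "resume triggered");
    ///     }
    ///     SignalOutcome::Accepted { effect, .. } => {
    ///         tracing::debug!(%effect, "signal buffered; condition not yet satisfied");
    ///     }
    ///     SignalOutcome::Duplicate { existing_signal_id } => {
    ///         tracing::debug!(%existing_signal_id, "idempotent replay, already applied");
    ///     }
    /// }
    /// ```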
    pub fn from_fcall_value(raw: &Value) -> Result<Self, SdkError> {
        parse_signal_result(raw)
    }
}

/// A signal that triggered a resume, readable by a worker after re-claim.
///
/// **RFC-012 Stage 0:** the canonical definition has moved to
/// [`ff_core::backend::ResumeSignal`]. This `pub use` shim preserves the
/// `ff_sdk::task::ResumeSignal` path (and, via `ff_sdk::lib`'s re-export,
/// `ff_sdk::ResumeSignal`) through the 0.4.x window. The shim is
/// scheduled for removal in 0.5.0.
pub use ff_core::backend::ResumeSignal;

/// Outcome of `append_frame()`.
///
/// **RFC-012 §R7.2.1:** canonical definition moved to
/// [`ff_core::backend::AppendFrameOutcome`]; this `pub use` shim
/// preserves the `ff_sdk::task::AppendFrameOutcome` path through the
/// 0.4.x window. Existing consumers construct + match exhaustively
/// without change. The derive set widened from `Clone, Debug` to
/// `Clone, Debug, PartialEq, Eq`, matching the `FailOutcome` precedent.
pub use ff_core::backend::AppendFrameOutcome;

/// Outcome of a `fail()` call.
///
/// **RFC-012 Stage 1a:** canonical definition moved to
/// [`ff_core::backend::FailOutcome`]; this `pub use` shim preserves
/// the `ff_sdk::task::FailOutcome` path (and the `ff_sdk::FailOutcome`
/// re-export in `lib.rs`) through the 0.4.x window. Existing
/// consumers construct + match exhaustively without change.
pub use ff_core::backend::FailOutcome;

/// A claimed execution with an active lease. The worker processes this task
/// and must call one of `complete()`, `fail()`, or `cancel()` when done.
///
/// The lease is automatically renewed in the background at `lease_ttl / 3`
/// intervals. Renewal stops when the task is consumed or dropped.
///
/// `complete`, `fail`, and `cancel` consume `self` — this prevents
/// double-complete bugs at the type level.
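///
/// A happy-path sketch (doctest-ignored; `worker` stands in for a
/// connected `FlowFabricWorker`, and `claim_next` / `run` are
/// illustrative names rather than verified SDK exports):
///
/// ```ignore
/// if let Some(task) = worker.claim_next().await? {
///     let input = task.input_payload().to_vec();
///     match run(&input).await {
///         // Consuming `task` makes a second terminal call a type error.
///         Ok(result) => task.complete(Some(result)).await?,
///         Err(e) => {
///             let outcome = task.fail(&e.to_string(), "transient").await?;
///             tracing::warn!(?outcome, "task failed");
///         }
///     }
/// }
/// ```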
pub struct ClaimedTask {
    /// Shared Valkey client.
    #[allow(dead_code)]
    client: Client,
    /// `EngineBackend` the trait-migrated ops forward through.
    ///
    /// **RFC-012 Stage 1b + Round-7.** Today this is always a
    /// `ValkeyBackend` wrapping `client`. The per-task ops
    /// (`complete`/`fail`/`cancel`/`delay_execution`/
    /// `move_to_waiting_children`/`update_progress`/`resume_signals`/
    /// `create_pending_waitpoint`/`append_frame`/`report_usage`) route
    /// through `backend.*()`. `suspend` — the deferred op per
    /// RFC-012 §R7.6.1 — now also forwards through
    /// `backend.suspend(&handle, args)` after the RFC-013 Stage 1d
    /// input-shape work (it previously called
    /// `client.fcall("ff_suspend_execution", ...)` directly). Lease
    /// renewal goes through `backend.renew(&handle)`. Round-7
    /// (#135/#145) closed the four trait-shape gaps tracked by #117.
    ///
    /// Stage 1c migrates the FlowFabricWorker hot paths (claim,
    /// deliver_signal) through the same trait surface; Stage 1d
    /// refactors this struct to carry a single `Handle` rather than
    /// synthesising one per op via `synth_handle`.
    backend: Arc<dyn EngineBackend>,
    /// Partition config for key construction.
    #[allow(dead_code)]
    partition_config: PartitionConfig,
    /// Execution identity.
    execution_id: ExecutionId,
    /// Current attempt.
    attempt_index: AttemptIndex,
    attempt_id: AttemptId,
    /// Lease identity.
    lease_id: LeaseId,
    lease_epoch: LeaseEpoch,
    /// Lease timing.
    lease_ttl_ms: u64,
    /// Lane used at claim time.
    lane_id: LaneId,
    /// Worker instance that holds this lease (for index cleanup keys).
    worker_instance_id: WorkerInstanceId,
    /// Execution data.
    input_payload: Vec<u8>,
    execution_kind: String,
    tags: HashMap<String, String>,
    /// Background renewal task handle.
    renewal_handle: JoinHandle<()>,
    /// Signal to stop renewal (used before consuming self).
    renewal_stop: Arc<Notify>,
    /// Consecutive lease renewal failures. Shared with the background renewal
    /// task. Reset to 0 on each successful renewal. Workers should check
    /// `is_lease_healthy()` before committing expensive side effects.
    renewal_failures: Arc<AtomicU32>,
    /// Set to `true` by `stop_renewal()` after a terminal op's FCALL
    /// response is received, just before `self` is consumed into `Drop`.
    /// `Drop` reads this instead of `renewal_handle.is_finished()` to
    /// suppress the false-positive "dropped without terminal operation"
    /// warning: after `notify_one`, the renewal task has not yet been
    /// polled by the runtime, so `is_finished()` is still `false` on the
    /// happy path when self is being consumed.
    ///
    /// Note: the flag is set for any terminal-op path that reaches
    /// `stop_renewal()`, which includes Lua-level script errors (the
    /// FCALL returned a `{0, "error", ...}` payload). That is intentional:
    /// the caller already receives the `Err` via the op's return value,
    /// so an additional `Drop` warning would be noise. The warning is
    /// reserved for genuine drop-without-terminal-op cases (panic, early
    /// return, transport failure before `stop_renewal` ran).
    terminal_op_called: AtomicBool,
    /// Concurrency permit from the worker's semaphore. Held for the lifetime
    /// of the task; released on complete/fail/cancel/drop.
    _concurrency_permit: Option<OwnedSemaphorePermit>,
}
216
217impl ClaimedTask {
218 /// Construct a `ClaimedTask` from the results of a successful
219 /// `ff_claim_execution` or `ff_claim_resumed_execution` FCALL.
220 ///
221 /// # Arguments
222 ///
223 /// * `client` — shared Valkey client used for subsequent lease
224 /// renewals, signal delivery, and the final
225 /// complete/fail/cancel FCALL.
226 /// * `partition_config` — partition topology snapshot read at
227 /// `FlowFabricWorker::connect`. Used for key construction on
228 /// the lifetime of this task.
229 /// * `execution_id` — the claimed execution's UUID.
230 /// * `attempt_index` / `attempt_id` — current attempt identity.
231 /// `attempt_index` is 0 on a fresh claim, preserved on a
232 /// resumed claim.
233 /// * `lease_id` / `lease_epoch` — lease identity. `lease_epoch`
234 /// is returned by the Lua FCALL and bumped on each resumed
235 /// claim.
236 /// * `lease_ttl_ms` — lease TTL used to schedule the background
237 /// renewal task (renews at `lease_ttl_ms / 3`).
238 /// * `lane_id` — the lane the task was claimed on. Used for
239 /// index key construction (`lane_active`, `lease_expiry`,
240 /// etc.).
241 /// * `worker_instance_id` — this worker's identity. Used to
242 /// resolve `worker_leases` index entries during renewal and
243 /// completion.
244 /// * `input_payload` / `execution_kind` / `tags` — pre-read
245 /// execution metadata, exposed via getters so the worker
246 /// doesn't round-trip to Valkey to inspect what it just
247 /// claimed.
248 ///
249 /// # Invariant: constructor is `pub(crate)` on purpose
250 ///
251 /// **External callers cannot construct a `ClaimedTask`** — only
252 /// the in-crate claim entry points (`claim_next`,
253 /// `claim_from_grant`, `claim_from_reclaim_grant`) may. This is
254 /// load-bearing: those entry points are the ONLY sites that
255 /// acquire a permit from the worker's concurrency semaphore
256 /// (via `FlowFabricWorker::concurrency_semaphore`) and attach
257 /// it through `ClaimedTask::set_concurrency_permit` before
258 /// returning the task. Promoting `new` to `pub` would let
259 /// consumers build tasks that bypass the concurrency contract
260 /// the worker's `max_concurrent_tasks` config advertises — the
261 /// returned task would run alongside other in-flight work
262 /// without debiting the permit bank, and the
263 /// complete/fail/cancel/drop path would have nothing to
264 /// release.
265 ///
266 /// If an external callsite genuinely needs to rehydrate a
267 /// task from saved FCALL results, the right answer is a new
268 /// `FlowFabricWorker` entry point that wraps `new` + acquires a
269 /// permit — not promoting this constructor.
270 #[allow(clippy::too_many_arguments)]
271 pub(crate) fn new(
272 client: Client,
273 backend: Arc<dyn EngineBackend>,
274 partition_config: PartitionConfig,
275 execution_id: ExecutionId,
276 attempt_index: AttemptIndex,
277 attempt_id: AttemptId,
278 lease_id: LeaseId,
279 lease_epoch: LeaseEpoch,
280 lease_ttl_ms: u64,
281 lane_id: LaneId,
282 worker_instance_id: WorkerInstanceId,
283 input_payload: Vec<u8>,
284 execution_kind: String,
285 tags: HashMap<String, String>,
286 ) -> Self {
287 let renewal_stop = Arc::new(Notify::new());
288 let renewal_failures = Arc::new(AtomicU32::new(0));
289
290 // Stage 1b: the renewal task forwards through `backend.renew(&handle)`.
291 // Build the handle once at construction time and hand both Arc clones
292 // into the spawned task.
293 let renewal_handle_cookie = ff_backend_valkey::ValkeyBackend::encode_handle(
294 execution_id.clone(),
295 attempt_index,
296 attempt_id.clone(),
297 lease_id.clone(),
298 lease_epoch,
299 lease_ttl_ms,
300 lane_id.clone(),
301 worker_instance_id.clone(),
302 HandleKind::Fresh,
303 );
304 let renewal_handle = spawn_renewal_task(
305 backend.clone(),
306 renewal_handle_cookie,
307 execution_id.clone(),
308 lease_ttl_ms,
309 renewal_stop.clone(),
310 renewal_failures.clone(),
311 );
312
313 Self {
314 client,
315 backend,
316 partition_config,
317 execution_id,
318 attempt_index,
319 attempt_id,
320 lease_id,
321 lease_epoch,
322 lease_ttl_ms,
323 lane_id,
324 worker_instance_id,
325 input_payload,
326 execution_kind,
327 tags,
328 renewal_handle,
329 renewal_stop,
330 renewal_failures,
331 terminal_op_called: AtomicBool::new(false),
332 _concurrency_permit: None,
333 }
334 }
335
336 /// Attach a concurrency permit from the worker's semaphore.
337 /// The permit is held for the task's lifetime and released on drop.
338 #[allow(dead_code)]
339 pub(crate) fn set_concurrency_permit(&mut self, permit: OwnedSemaphorePermit) {
340 self._concurrency_permit = Some(permit);
341 }
342
343 // ── Accessors ──
344
345 pub fn execution_id(&self) -> &ExecutionId {
346 &self.execution_id
347 }
348
349 pub fn attempt_index(&self) -> AttemptIndex {
350 self.attempt_index
351 }
352
353 pub fn attempt_id(&self) -> &AttemptId {
354 &self.attempt_id
355 }
356
357 pub fn lease_id(&self) -> &LeaseId {
358 &self.lease_id
359 }
360
361 pub fn lease_epoch(&self) -> LeaseEpoch {
362 self.lease_epoch
363 }
364
365 pub fn input_payload(&self) -> &[u8] {
366 &self.input_payload
367 }
368
369 pub fn execution_kind(&self) -> &str {
370 &self.execution_kind
371 }
372
373 pub fn tags(&self) -> &HashMap<String, String> {
374 &self.tags
375 }
376
377 pub fn lane_id(&self) -> &LaneId {
378 &self.lane_id
379 }
380
381 /// Read frames from this task's attempt stream.
382 ///
383 /// Forwards through the task's `Arc<dyn EngineBackend>` — consumers
384 /// no longer need the `ferriskey::Client` leak (#87). See
385 /// [`EngineBackend::read_stream`](ff_core::engine_backend::EngineBackend::read_stream)
386 /// for the backend-level contract.
387 ///
388 /// Validation (count_limit bounds) runs at this boundary — out-of-range
389 /// input surfaces as [`SdkError::Config`] before reaching the backend.
390 pub async fn read_stream(
391 &self,
392 from: StreamCursor,
393 to: StreamCursor,
394 count_limit: u64,
395 ) -> Result<StreamFrames, SdkError> {
396 validate_stream_read_count(count_limit)?;
397 Ok(self
398 .backend
399 .read_stream(&self.execution_id, self.attempt_index, from, to, count_limit)
400 .await?)
401 }
402
403 /// Tail this task's attempt stream for new frames.
404 ///
405 /// See [`EngineBackend::tail_stream`](ff_core::engine_backend::EngineBackend::tail_stream)
406 /// for the head-of-line warning — the task's backend is shared
407 /// with claim/complete/fail hot paths. Consumers that need
408 /// isolation should build a dedicated `EngineBackend` for tail
409 /// reads.
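    ///
    /// A minimal tail-loop sketch (doctest-ignored; `StreamCursor::start`
    /// and the `frames` / `next_cursor` field names are assumptions for
    /// illustration, not verified SDK API):
    ///
    /// ```ignore
    /// let mut cursor = StreamCursor::start();
    /// loop {
    ///     // Block up to 5s per round trip, at most 100 frames per batch.
    ///     let batch = task.tail_stream(cursor, 5_000, 100).await?;
    ///     for frame in &batch.frames {
    ///         handle_frame(frame)?;
    ///     }
    ///     cursor = batch.next_cursor;
    /// }
    /// ```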
    pub async fn tail_stream(
        &self,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
    ) -> Result<StreamFrames, SdkError> {
        self.tail_stream_with_visibility(
            after,
            block_ms,
            count_limit,
            ff_core::backend::TailVisibility::All,
        )
        .await
    }

    /// Tail with an explicit [`TailVisibility`](ff_core::backend::TailVisibility)
    /// filter (RFC-015 §6). Use
    /// [`TailVisibility::ExcludeBestEffort`](ff_core::backend::TailVisibility::ExcludeBestEffort)
    /// to drop
    /// [`StreamMode::BestEffortLive`](ff_core::backend::StreamMode::BestEffortLive)
    /// frames server-side.
    pub async fn tail_stream_with_visibility(
        &self,
        after: StreamCursor,
        block_ms: u64,
        count_limit: u64,
        visibility: ff_core::backend::TailVisibility,
    ) -> Result<StreamFrames, SdkError> {
        if block_ms > MAX_TAIL_BLOCK_MS {
            return Err(SdkError::Config {
                context: "tail_stream".into(),
                field: Some("block_ms".into()),
                message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
            });
        }
        validate_stream_read_count(count_limit)?;
        validate_tail_cursor(&after)?;
        Ok(self
            .backend
            .tail_stream(
                &self.execution_id,
                self.attempt_index,
                after,
                block_ms,
                count_limit,
                visibility,
            )
            .await?)
    }

    /// Synthesise a Valkey-backend `Handle` encoding this task's
    /// attempt-cookie fields. Private to the SDK — the trait
    /// forwarders call this per op to produce the `&Handle` argument
    /// the `EngineBackend` methods take.
    ///
    /// **RFC-012 Stage 1b option A.** Refactoring `ClaimedTask` to
    /// carry one cached `Handle` instead of synthesising per op is
    /// deferred to Stage 1d (issue #89 follow-up). The per-op cost is
    /// a single allocation of a small byte buffer — negligible
    /// compared to the FCALL round-trip that follows.
    ///
    /// Kind is always `HandleKind::Fresh` because today the 9
    /// Stage-1b ops all treat the backend tag + opaque bytes as
    /// authoritative; they do not dispatch on kind. Stage 1d routes
    /// resumed-claim callers through a `Resumed` synth.
    fn synth_handle(&self) -> Handle {
        ff_backend_valkey::ValkeyBackend::encode_handle(
            self.execution_id.clone(),
            self.attempt_index,
            self.attempt_id.clone(),
            self.lease_id.clone(),
            self.lease_epoch,
            self.lease_ttl_ms,
            self.lane_id.clone(),
            self.worker_instance_id.clone(),
            HandleKind::Fresh,
        )
    }

    /// Check if the lease is likely still valid based on renewal success.
    ///
    /// Returns `false` if 3 or more consecutive renewal attempts have failed.
    /// Workers should check this before committing expensive or irreversible
    /// side effects. A `false` return means Valkey may have already expired
    /// the lease and another worker could be processing this execution.
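    ///
    /// A guard sketch (doctest-ignored; `commit_side_effect` is a
    /// hypothetical stand-in for the caller's own irreversible work):
    ///
    /// ```ignore
    /// if !task.is_lease_healthy() {
    ///     // 3+ consecutive renewals failed — another worker may already
    ///     // hold this execution. Bail out before the irreversible step.
    ///     let _ = task.fail("lease renewal unhealthy", "lease_stale").await?;
    ///     return Ok(());
    /// }
    /// commit_side_effect().await?;
    /// ```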
    pub fn is_lease_healthy(&self) -> bool {
        self.renewal_failures.load(Ordering::Relaxed) < 3
    }

    /// Number of consecutive lease renewal failures since the last success.
    ///
    /// Returns 0 when renewals are working normally. Useful for observability
    /// and custom health policies beyond the default threshold of 3.
    pub fn consecutive_renewal_failures(&self) -> u32 {
        self.renewal_failures.load(Ordering::Relaxed)
    }

    // ── Terminal operations (consume self) ──

    /// Delay the execution until `delay_until`.
    ///
    /// Releases the lease. The execution moves to the `delayed` state.
    /// Consumes self — the task cannot be used after delay.
    pub async fn delay_execution(self, delay_until: TimestampMs) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.delay`. Matches
        // the pre-migration `stop_renewal` contract: called only when
        // the FCALL round-tripped (Ok or a typed Lua/engine error);
        // raw transport errors bubble up without stopping renewal so
        // the `Drop` warning still fires if the caller later drops
        // `self` without retrying.
        let handle = self.synth_handle();
        let out = self.backend.delay(&handle, delay_until).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Move the execution to the `waiting_children` state.
    ///
    /// Releases the lease. The execution waits for child dependencies to complete.
    /// Consumes self.
    pub async fn move_to_waiting_children(self) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.wait_children`.
        // See `delay_execution` for the stop_renewal contract.
        let handle = self.synth_handle();
        let out = self.backend.wait_children(&handle).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Complete the execution successfully.
    ///
    /// Calls `ff_complete_execution` via FCALL, then stops lease renewal.
    /// Renewal continues during the FCALL to prevent lease expiry under
    /// network latency — the Lua fences on lease_id+epoch atomically.
    /// Consumes self — the task cannot be used after completion.
    ///
    /// # Connection errors and replay
    ///
    /// If the Valkey connection drops after the Lua commit but before the
    /// client reads the response, retrying `complete()` under the same
    /// identity (same `lease_epoch` + `attempt_id`) is safe: the SDK
    /// reconciles the "already terminal with matching outcome" response
    /// into `Ok(())`. A retry after a different terminal op has raced in
    /// (e.g. operator cancel) surfaces as `ExecutionNotActive` with the
    /// populated `terminal_outcome` so the caller can see what actually
    /// happened.
    pub async fn complete(self, result_payload: Option<Vec<u8>>) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.complete`.
        // The FCALL body + the `ExecutionNotActive` replay reconciliation
        // (match same epoch + attempt_id + outcome=="success" → Ok)
        // moved to `ff_backend_valkey::complete_impl`.
        let handle = self.synth_handle();
        let out = self.backend.complete(&handle, result_payload).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Fail the execution with a reason and error category.
    ///
    /// If the execution policy allows retries, the engine schedules a retry
    /// (delayed backoff). Otherwise, the execution becomes terminal failed.
    /// Returns [`FailOutcome`] so the caller knows what happened.
    ///
    /// # Connection errors and replay
    ///
    /// `fail()` is replay-safe under the same conditions as `complete()`:
    /// a retry by the same caller matching `lease_epoch` + `attempt_id` is
    /// reconciled into the outcome the server actually committed
    /// (`TerminalFailed` if no retries are left; `RetryScheduled` with
    /// `delay_until = 0` if a retry was scheduled — the exact delay is
    /// not recovered on the replay path).
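    ///
    /// A sketch of acting on the outcome (doctest-ignored; field
    /// contents are elided):
    ///
    /// ```ignore
    /// match task.fail("upstream returned 503", "transient").await? {
    ///     FailOutcome::RetryScheduled { .. } => {
    ///         // A later claim will re-run this execution.
    ///     }
    ///     FailOutcome::TerminalFailed { .. } => {
    ///         // No retries left; the execution is terminal.
    ///     }
    /// }
    /// ```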
    pub async fn fail(
        self,
        reason: &str,
        error_category: &str,
    ) -> Result<FailOutcome, SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.fail`.
        // The FCALL body + retry-policy read + the two-shape replay
        // reconciliation (terminal/failed → TerminalFailed, runnable
        // → RetryScheduled{0}) moved to
        // `ff_backend_valkey::fail_impl`.
        //
        // **Preserving the caller's raw category string.** The
        // pre-Stage-1b code passed `error_category` straight to the
        // Lua `failure_category` ARGV. Real callers use arbitrary
        // lower_snake_case strings (`"lease_stale"`, `"bad_signal"`,
        // `"inference_error"`, …) that do not correspond to the 5
        // named `FailureClass` variants. A lossy enum conversion
        // would rewrite every non-canonical category to
        // `Transient` — a real behavior change for
        // ff-readiness-tests + the examples crates.
        //
        // Instead: pack the raw category bytes into
        // `FailureReason.detail`; `fail_impl` reads them back and
        // threads them to Lua verbatim. The `classification` enum
        // is derived from the string for the few consumers who
        // match on the trait return today (none in-workspace).
        // Stage 1d (or issue #117) widens `FailureClass` with a
        // `Custom(String)` arm; this stash carrier retires then.
        let handle = self.synth_handle();
        let failure_reason = ff_core::backend::FailureReason::with_detail(
            reason.to_owned(),
            error_category.as_bytes().to_vec(),
        );
        let classification = error_category_to_class(error_category);
        let out = self.backend.fail(&handle, failure_reason, classification).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    /// Cancel the execution.
    ///
    /// Calls `ff_cancel_execution` via FCALL, then stops lease renewal
    /// (the same ordering as `complete()` — renewal keeps the lease
    /// alive while the FCALL is in flight). Consumes self.
    ///
    /// # Connection errors and replay
    ///
    /// `cancel()` is replay-safe under the same conditions as `complete()`:
    /// a retry by the same caller matching `lease_epoch` + `attempt_id`
    /// returns `Ok(())` if the server's stored `terminal_outcome` is
    /// `cancelled`. A retry that finds a different outcome (because a
    /// concurrent `complete()` or `fail()` won the race) surfaces as
    /// `ExecutionNotActive` with the populated `terminal_outcome` so the
    /// caller can see that the cancel intent was NOT honored.
    pub async fn cancel(self, reason: &str) -> Result<(), SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.cancel`.
        // The 21-KEYS FCALL body, the `current_waitpoint_id` pre-read,
        // and the `ExecutionNotActive` → outcome=="cancelled" replay
        // reconciliation all moved to `ff_backend_valkey::cancel_impl`.
        let handle = self.synth_handle();
        let out = self.backend.cancel(&handle, reason).await;
        if fcall_landed(&out) {
            self.stop_renewal();
        }
        out.map_err(SdkError::from)
    }

    // ── Non-terminal operations ──

    /// Manually renew the lease.
    ///
    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
    /// trait. The background renewal task calls
    /// `backend.renew(&handle)` directly (see `spawn_renewal_task`);
    /// this method is the public entry point for workers that want
    /// to force a renew out-of-band.
    pub async fn renew_lease(&self) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        self.backend
            .renew(&handle)
            .await
            .map(|_renewal| ())
            .map_err(SdkError::from)
    }

    /// Update progress (pct 0-100 and optional message).
    ///
    /// **RFC-012 Stage 1b.** Forwards through the `EngineBackend`
    /// trait (`backend.progress(&handle, Some(pct), Some(msg))`).
    ///
    /// # Not for stream frames
    ///
    /// `update_progress` writes the `progress_percent` / `progress_message`
    /// fields on `exec_core` (the execution's state hash). It is a
    /// scalar heartbeat — each call overwrites the previous value and
    /// nothing is appended to the output stream. Producers emitting
    /// stream frames (arbitrary `frame_type` + payload, consumed via
    /// `ClaimedTask::read_stream` / `tail_stream` or the HTTP
    /// stream-tail routes) MUST use [`Self::append_frame`] instead;
    /// `update_progress` is invisible to stream consumers.
    pub async fn update_progress(&self, pct: u8, message: &str) -> Result<(), SdkError> {
        let handle = self.synth_handle();
        self.backend
            .progress(&handle, Some(pct), Some(message.to_owned()))
            .await
            .map_err(SdkError::from)
    }

    /// Report usage against a budget and check limits.
    ///
    /// Non-consuming — the worker can report usage multiple times.
    /// `dimensions` is a slice of `(dimension_name, delta)` pairs.
    /// `dedup_key` prevents double-counting on retries (auto-prefixed with
    /// the budget hash tag).
    ///
    /// **RFC-012 §R7.2.3:** forwards through
    /// [`EngineBackend::report_usage`](ff_core::engine_backend::EngineBackend::report_usage).
    /// The trait's `UsageDimensions.dedup_key` carries the raw key;
    /// the backend impl applies the `usage_dedup_key(hash_tag, k)`
    /// wrap so the dedup state co-locates with the budget partition
    /// (PR #108).
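    ///
    /// A per-call spend sketch (doctest-ignored; the dimension names,
    /// dedup key, and `budget_id` binding are illustrative):
    ///
    /// ```ignore
    /// let result = task
    ///     .report_usage(&budget_id, &[("tokens_in", 812), ("tokens_out", 204)], Some("call-42"))
    ///     .await?;
    /// if let ReportUsageResult::HardBreach { dimension, .. } = result {
    ///     // Hard limit exhausted — stop spending and fail the attempt.
    ///     let _ = task
    ///         .fail(&format!("budget exhausted: {dimension}"), "budget_hard_breach")
    ///         .await?;
    /// }
    /// ```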
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        dimensions: &[(&str, u64)],
        dedup_key: Option<&str>,
    ) -> Result<ReportUsageResult, SdkError> {
        let handle = self.synth_handle();
        let mut dims = ff_core::backend::UsageDimensions::default();
        for (name, delta) in dimensions {
            dims.custom.insert((*name).to_owned(), *delta);
        }
        dims.dedup_key = dedup_key
            .filter(|k| !k.is_empty())
            .map(|k| k.to_owned());
        self.backend
            .report_usage(&handle, budget_id, dims)
            .await
            .map_err(SdkError::from)
    }

    /// Create a pending waitpoint for future signal delivery.
    ///
    /// Non-consuming — the worker keeps the lease. Signals delivered to the
    /// waitpoint are buffered. When the worker later calls `suspend()` with
    /// `use_pending_waitpoint`, buffered signals may immediately satisfy the
    /// resume condition.
    ///
    /// Returns both the waitpoint_id AND the HMAC token required by external
    /// callers to buffer signals against this pending waitpoint
    /// (RFC-004 §Waitpoint Security).
    ///
    /// **RFC-012 §R7.2.2:** forwards through
    /// [`EngineBackend::create_waitpoint`](ff_core::engine_backend::EngineBackend::create_waitpoint).
    /// The trait returns
    /// [`PendingWaitpoint`](ff_core::backend::PendingWaitpoint) whose
    /// `hmac_token` is the same wire HMAC this method has always
    /// produced; the SDK unwraps it back to the historical
    /// `(WaitpointId, WaitpointToken)` tuple for caller-shape parity.
    pub async fn create_pending_waitpoint(
        &self,
        waitpoint_key: &str,
        expires_in_ms: u64,
    ) -> Result<(WaitpointId, WaitpointToken), SdkError> {
        let handle = self.synth_handle();
        let expires_in = Duration::from_millis(expires_in_ms);
        let pending = self
            .backend
            .create_waitpoint(&handle, waitpoint_key, expires_in)
            .await
            .map_err(SdkError::from)?;
        // `WaitpointHmac` wraps the canonical `WaitpointToken`; the
        // `.token()` accessor borrows the underlying
        // `WaitpointToken`, which is the caller's historical return
        // shape.
        Ok((pending.waitpoint_id, pending.hmac_token.token().clone()))
    }

    // ── Phase 4: Streaming ──

    /// Append a frame to the current attempt's output stream.
    ///
    /// Non-consuming — the worker can append many frames during execution.
    /// The stream is created lazily on the first append.
    ///
    /// **RFC-012 §R7.2.1 / PR #146:** forwards through the
    /// `EngineBackend` trait. The free-form `frame_type` tag and
    /// optional `metadata` (wire `correlation_id`) travel on the
    /// extended [`ff_core::backend::Frame`] shape (`frame_type:
    /// String`, `correlation_id: Option<String>`), giving byte-for-byte
    /// wire parity with the pre-migration direct-FCALL path.
    ///
    /// # `append_frame` vs [`Self::update_progress`]
    ///
    /// Stream-frame producers (arbitrary `frame_type` + payload
    /// consumed via `ClaimedTask::read_stream` / `tail_stream` or the
    /// HTTP stream-tail routes) MUST use `append_frame`.
    /// `update_progress` writes scalar `progress_percent` /
    /// `progress_message` fields to `exec_core` and is invisible to
    /// stream consumers.
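    ///
    /// A streaming sketch (doctest-ignored; the `"llm_token"` frame type
    /// and correlation id are illustrative values, not a fixed vocabulary):
    ///
    /// ```ignore
    /// for chunk in output_chunks {
    ///     // Each frame lands on the attempt stream for tailing consumers.
    ///     task.append_frame("llm_token", chunk.as_bytes(), Some("req-7")).await?;
    /// }
    /// // Scalar heartbeat on exec_core — invisible to stream tailers.
    /// task.update_progress(80, "streaming response").await?;
    /// ```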
    pub async fn append_frame(
        &self,
        frame_type: &str,
        payload: &[u8],
        metadata: Option<&str>,
    ) -> Result<AppendFrameOutcome, SdkError> {
        self.append_frame_with_mode(
            frame_type,
            payload,
            metadata,
            ff_core::backend::StreamMode::Durable,
        )
        .await
    }

    /// Append a frame under an explicit RFC-015
    /// [`StreamMode`](ff_core::backend::StreamMode).
    /// The default via [`Self::append_frame`] preserves pre-RFC-015
    /// behaviour ([`StreamMode::Durable`](ff_core::backend::StreamMode::Durable)).
    pub async fn append_frame_with_mode(
        &self,
        frame_type: &str,
        payload: &[u8],
        metadata: Option<&str>,
        mode: ff_core::backend::StreamMode,
    ) -> Result<AppendFrameOutcome, SdkError> {
        let handle = self.synth_handle();
        let mut frame = ff_core::backend::Frame::new(
            payload.to_vec(),
            ff_core::backend::FrameKind::Event,
        )
        .with_frame_type(frame_type)
        .with_mode(mode);
        if let Some(cid) = metadata {
            frame = frame.with_correlation_id(cid);
        }
        self.backend
            .append_frame(&handle, frame)
            .await
            .map_err(SdkError::from)
    }

    // ── RFC-013 Stage 1d: Suspend ──

    /// **Strict suspend.** Consumes `self` and yields a
    /// [`SuspendedHandle`] or errors. On the early-satisfied path
    /// (buffered signals already matched the condition), returns
    /// `EngineError::State(AlreadySatisfied)` — use `try_suspend` when
    /// that branch is part of the flow's happy path.
    ///
    /// RFC-013 §5.1 — this is the classic Rust `foo` / `try_foo`
    /// split; `suspend` is the unconditional form. Defaults to
    /// `WaitpointBinding::fresh()` for the waitpoint. For pending
    /// waitpoints previously issued via `create_pending_waitpoint`,
    /// use `try_suspend_on_pending`.
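    ///
    /// A strict-suspend sketch (doctest-ignored; the condition
    /// constructor, reason-code binding, and deadline are illustrative
    /// — see `ff_core::contracts` for the real shapes — and
    /// `ResumePolicy::default()` assumes a `Default` impl):
    ///
    /// ```ignore
    /// let suspended = task
    ///     .suspend(
    ///         reason_code,                  // e.g. a "waiting on signal" variant
    ///         single_condition("approval"), // hypothetical ResumeCondition::Single builder
    ///         Some((deadline_ms, TimeoutBehavior::Fail)),
    ///         ResumePolicy::default(),
    ///     )
    ///     .await?;
    /// // `suspended.handle` is now the only live cookie — stash it for
    /// // observe_signals and the eventual claim_resumed_execution.
    /// ```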
    pub async fn suspend(
        self,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<SuspendedHandle, SdkError> {
        let outcome = self
            .try_suspend_inner(
                WaitpointBinding::fresh(),
                reason_code,
                resume_condition,
                timeout,
                resume_policy,
            )
            .await?;
        match outcome {
            TrySuspendOutcome::Suspended(h) => Ok(h),
            TrySuspendOutcome::AlreadySatisfied { .. } => Err(SdkError::from(
                crate::EngineError::State(StateKind::AlreadySatisfied),
            )),
        }
    }

    /// **Fallible suspend.** On the early-satisfied path, the
    /// `ClaimedTask` is handed back unchanged so the worker can
    /// continue running against the retained lease.
    ///
    /// RFC-013 §5.1 — use this when consuming a pending waitpoint whose
    /// buffered signals may already match the condition.
    pub async fn try_suspend(
        self,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<TrySuspendOutcome, SdkError> {
        self.try_suspend_inner(
            WaitpointBinding::fresh(),
            reason_code,
            resume_condition,
            timeout,
            resume_policy,
        )
        .await
    }

    /// Convenience: `try_suspend` against a pending waitpoint previously
    /// issued via `create_pending_waitpoint`.
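    ///
    /// A consumption sketch (doctest-ignored; `pending`, `reason`,
    /// `condition`, and `policy` are assumed already in scope, and
    /// `stash` / `continue_with` are hypothetical caller helpers):
    ///
    /// ```ignore
    /// match task.try_suspend_on_pending(&pending, reason, condition, None, policy).await? {
    ///     TrySuspendOutcome::Suspended(h) => {
    ///         // Stash the suspended-kind handle for the re-claim path.
    ///         stash(h.handle);
    ///     }
    ///     TrySuspendOutcome::AlreadySatisfied { task, details } => {
    ///         // Buffered signals already matched — the lease is retained,
    ///         // so keep working on the returned ClaimedTask.
    ///         continue_with(task, details);
    ///     }
    /// }
    /// ```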
    pub async fn try_suspend_on_pending(
        self,
        pending: &PendingWaitpoint,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<TrySuspendOutcome, SdkError> {
        self.try_suspend_inner(
            WaitpointBinding::use_pending(pending),
            reason_code,
            resume_condition,
            timeout,
            resume_policy,
        )
        .await
    }

    async fn try_suspend_inner(
        self,
        waitpoint: WaitpointBinding,
        reason_code: SuspensionReasonCode,
        resume_condition: ResumeCondition,
        timeout: Option<(TimestampMs, TimeoutBehavior)>,
        resume_policy: ResumePolicy,
    ) -> Result<TrySuspendOutcome, SdkError> {
        let handle = self.synth_handle();
        let (timeout_at, timeout_behavior) = match timeout {
            Some((at, b)) => (Some(at), b),
            None => (None, TimeoutBehavior::Fail),
        };
        // If the caller passed `ResumeCondition::Single { waitpoint_key }`
        // alongside the default `WaitpointBinding::fresh()` (which mints a
        // random internal key), rebind the Fresh binding to use the
        // condition's key so RFC-013 §2.4's "waitpoint_key cross-field
        // invariant" is honored. UsePending retains its own binding.
        // RFC-014: for `Composite`, rebind to the first waitpoint_key
        // observed in the composite tree (Single.waitpoint_key or
        // Count.waitpoints[0]); consumers using single-waitpoint
        // composites must keep all Single/Count waitpoint_keys equal
        // (see RFC-014 single-waitpoint scoping).
        let waitpoint = match (&waitpoint, &resume_condition) {
            (
                WaitpointBinding::Fresh { waitpoint_id, .. },
                ResumeCondition::Single { waitpoint_key, .. },
            ) => WaitpointBinding::Fresh {
                waitpoint_id: waitpoint_id.clone(),
                waitpoint_key: waitpoint_key.clone(),
            },
            (
                WaitpointBinding::Fresh { waitpoint_id, .. },
                ResumeCondition::Composite(body),
            ) => {
                if let Some(key) = composite_first_waitpoint_key(body) {
                    WaitpointBinding::Fresh {
                        waitpoint_id: waitpoint_id.clone(),
                        waitpoint_key: key,
                    }
                } else {
                    waitpoint
                }
            }
            _ => waitpoint,
        };
        let mut args = SuspendArgs::new(
            SuspensionId::new(),
            waitpoint,
            resume_condition,
            resume_policy,
            reason_code,
            TimestampMs::now(),
        )
        .with_requester(SuspensionRequester::Worker);
        if let Some(at) = timeout_at {
            args = args.with_timeout(at, timeout_behavior);
        }

        let outcome = self
            .backend
            .suspend(&handle, args)
            .await
            .map_err(SdkError::from)?;
        match outcome {
            SuspendOutcome::Suspended { details, handle: new_handle } => {
                // Renewal is stopped only AFTER a successful Suspended
                // outcome. The suspended-kind handle is what the caller
                // will later hand to `observe_signals` / `claim_from_reclaim`.
                self.stop_renewal();
                Ok(TrySuspendOutcome::Suspended(SuspendedHandle {
                    handle: new_handle,
                    details,
                }))
            }
            SuspendOutcome::AlreadySatisfied { details } => {
                // Lease is retained Lua-side; keep the ClaimedTask alive
                // so the worker continues against the existing lease.
                Ok(TrySuspendOutcome::AlreadySatisfied {
                    task: self,
                    details,
                })
            }
            _ => Err(SdkError::from(ScriptError::Parse {
                fcall: "try_suspend_inner".into(),
                execution_id: None,
                message: "unexpected SuspendOutcome variant".into(),
            })),
        }
    }

    /// Read the signals that satisfied the waitpoint and triggered this
    /// resume.
    ///
    /// Non-consuming. Intended to be called immediately after re-claim via
    /// [`crate::FlowFabricWorker::claim_from_reclaim_grant`], before any
    /// subsequent `suspend()` (which replaces `suspension:current`).
    ///
    /// Returns `Ok(vec![])` when this claim is NOT a signal-resume:
    ///
    /// - No prior suspension on this execution.
    /// - The prior suspension belonged to an earlier attempt (e.g. the
    ///   attempt was cancelled/failed and a retry is now claiming).
    /// - The prior suspension was closed by timeout / cancel / operator
    ///   override rather than by a matched signal.
    ///
    /// Reads `suspension:current` once, filters by `attempt_index` to
    /// guard against stale prior-attempt records, then fetches the matched
    /// `signal_id` set from `waitpoint_condition`'s `matcher:N:signal_id`
    /// fields and reads each signal's metadata + payload directly.
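    ///
    /// A post-re-claim sketch (doctest-ignored; `handle_signal` is a
    /// hypothetical consumer of [`ResumeSignal`]):
    ///
    /// ```ignore
    /// let signals = task.resume_signals().await?;
    /// if signals.is_empty() {
    ///     // Fresh claim, retry, or a timeout/cancel close — not a signal-resume.
    ///     return Ok(());
    /// }
    /// for sig in &signals {
    ///     handle_signal(sig)?;
    /// }
    /// ```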
    pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError> {
        // RFC-012 Stage 1b: forwards through `backend.observe_signals`.
        // Pre-migration body (HGETALL suspension_current + HMGET
        // matchers + pipelined HGETALL signal_hash / GET
        // signal_payload) lives in
        // `ff_backend_valkey::observe_signals_impl`.
        let handle = self.synth_handle();
        self.backend
            .observe_signals(&handle)
            .await
            .map_err(SdkError::from)
    }

    /// Signal the renewal task to stop. Called by every terminal op
    /// (`complete`/`fail`/`cancel`/`suspend`/`delay_execution`/
    /// `move_to_waiting_children`) after the FCALL returns. Also marks
    /// `terminal_op_called` so the `Drop` impl can distinguish happy-path
    /// consumption from a genuine drop-without-terminal-op.
    fn stop_renewal(&self) {
        self.terminal_op_called.store(true, Ordering::Release);
        self.renewal_stop.notify_one();
    }
}

/// True iff the backend's FCALL result represents a round-trip that
/// reached the Lua side. `Ok(_)` and typed engine errors (validation,
/// contention, conflict, state, bug) all count as "landed" — the
/// server either committed or rejected with a typed response. Only
/// raw `Transport` errors (connection drops, request timeouts, parse
/// failures) count as "did not land", which matches the pre-Stage-1b
/// SDK's `fcall(...).await.map_err(SdkError::from)?` short-circuit
/// — those errors returned before `stop_renewal()` ran, preserving
/// the `Drop` warning for genuine "lease will leak" cases.
///
/// Stage 1b terminal-op forwarders use this predicate to decide
/// whether to call `stop_renewal()`: yes for landed responses, no
/// for transport errors so the caller's retry path still sees a
/// running renewal task.
fn fcall_landed<T>(r: &Result<T, crate::EngineError>) -> bool {
    match r {
        Ok(_) => true,
        Err(crate::EngineError::Transport { .. }) => false,
        Err(_) => true,
    }
}

/// Map the SDK's free-form `error_category: &str` to the typed
/// `FailureClass` the trait's `fail` method takes. Unknown categories
/// fall through to `Transient` — the Lua side already tolerated
/// arbitrary strings, so the worst a category drift does under the
/// Stage 1b forwarder is reclassify a novel category as transient.
/// Stage 1d (or issue #117) widens `FailureClass` with a
/// `Custom(String)` arm for exact round-trip.
fn error_category_to_class(s: &str) -> ff_core::backend::FailureClass {
    use ff_core::backend::FailureClass;
    match s {
        "transient" => FailureClass::Transient,
        "permanent" => FailureClass::Permanent,
        "infra_crash" => FailureClass::InfraCrash,
        "timeout" => FailureClass::Timeout,
        "cancelled" => FailureClass::Cancelled,
        _ => FailureClass::Transient,
    }
}

impl Drop for ClaimedTask {
    fn drop(&mut self) {
        // Abort the background renewal task on drop.
        // This is a safety net — complete/fail/cancel already stop renewal
        // via notify before consuming self. But if the task is dropped
        // without being consumed (e.g., panic), abort prevents leaked renewals.
        //
        // Why check `terminal_op_called` instead of `renewal_handle.is_finished()`:
        // on the happy path, `stop_renewal()` fires `notify_one` synchronously
        // and then self is consumed into Drop immediately. The renewal task
        // has not yet been polled by the runtime, so `is_finished()` is still
        // `false` here — which previously fired the warning on every
        // complete/fail/cancel/suspend call. `terminal_op_called` is the
        // authoritative signal that a terminal-op path ran to the point of
        // stopping renewal; it does not by itself certify the Lua side
        // succeeded (see the field doc). The caller surfaces any error via
        // the op's return value, so a `Drop` warning is unneeded there.
        if !self.terminal_op_called.load(Ordering::Acquire) {
            tracing::warn!(
                execution_id = %self.execution_id,
                "ClaimedTask dropped without terminal operation — lease will expire"
            );
        }
        self.renewal_handle.abort();
    }
}

// ── Lease renewal ──

/// Per-tick renewal: a single `backend.renew(&handle)` call wrapped in
/// the `renew_lease` tracing span so bench harnesses' on_enter / on_exit
/// hooks still see one span per renewal. This addresses the PR #119
/// Cursor Bugbot finding: instrumenting `spawn_renewal_task` at the top
/// level fires once at construction time, not once per tick.
///
/// See `benches/harness/src/bin/long_running.rs` for the bench
/// consumer that depends on this span naming.
#[tracing::instrument(
    name = "renew_lease",
    skip_all,
    fields(execution_id = %execution_id)
)]
async fn renew_once(
    backend: &dyn EngineBackend,
    handle: &Handle,
    execution_id: &ExecutionId,
) -> Result<(), crate::EngineError> {
    backend.renew(handle).await.map(|_| ())
}

/// Spawn a background tokio task that renews the lease at `ttl / 3`
/// intervals.
///
/// **RFC-012 Stage 1b.** The renewal loop now forwards through the
/// `EngineBackend` trait (`backend.renew(&handle)`) instead of calling
/// `ff_renew_lease` via a direct FCALL. The Stage-1a `renew_lease_inner`
/// free function was deleted; this task holds an `Arc<dyn EngineBackend>`
/// + the encoded `Handle` instead. Per-tick tracing lives on
/// [`renew_once`]; this function itself is sync + one-shot.
///
/// Stops when:
/// - `stop_signal` is notified (complete/fail/cancel called)
/// - Renewal fails with a terminal error (stale_lease, lease_expired, etc.)
/// - The task handle is aborted (ClaimedTask dropped)
fn spawn_renewal_task(
    backend: Arc<dyn EngineBackend>,
    handle: Handle,
    execution_id: ExecutionId,
    lease_ttl_ms: u64,
    stop_signal: Arc<Notify>,
    failure_counter: Arc<AtomicU32>,
) -> JoinHandle<()> {
    // Clamp to ≥1ms so `tokio::time::interval(Duration::ZERO)` never
    // panics if a caller (or a misconfigured test) passes a
    // lease_ttl_ms < 3. The SDK config validator already enforces
    // `lease_ttl_ms >= 1_000` for healthy deployments, but the clamp
    // is a cheap belt-and-suspenders measure (Copilot review finding
    // on PR #119).
    let interval = Duration::from_millis((lease_ttl_ms / 3).max(1));

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(interval);
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Skip the first immediate tick — the lease was just acquired.
        tick.tick().await;

        loop {
            tokio::select! {
                _ = stop_signal.notified() => {
                    tracing::debug!(
                        execution_id = %execution_id,
                        "lease renewal stopped by signal"
                    );
                    return;
                }
                _ = tick.tick() => {
                    match renew_once(backend.as_ref(), &handle, &execution_id).await {
                        Ok(_renewal) => {
                            failure_counter.store(0, Ordering::Relaxed);
                            tracing::trace!(
                                execution_id = %execution_id,
                                "lease renewed"
                            );
                        }
                        Err(e) if is_terminal_renewal_error(&e) => {
                            failure_counter.fetch_add(1, Ordering::Relaxed);
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                "lease renewal failed with terminal error, stopping renewal"
                            );
                            return;
                        }
                        Err(e) => {
                            let count = failure_counter.fetch_add(1, Ordering::Relaxed) + 1;
                            tracing::warn!(
                                execution_id = %execution_id,
                                error = %e,
                                consecutive_failures = count,
                                "lease renewal failed (will retry next interval)"
                            );
                        }
                    }
                }
            }
        }
    })
}

/// Check if an engine error means renewal should stop permanently.
fn is_terminal_renewal_error(err: &crate::EngineError) -> bool {
    use crate::{ContentionKind, EngineError, StateKind};
    matches!(
        err,
        EngineError::State(
            StateKind::StaleLease | StateKind::LeaseExpired | StateKind::LeaseRevoked
        ) | EngineError::Contention(ContentionKind::ExecutionNotActive { .. })
            | EngineError::NotFound { entity: "execution" }
    )
}

// ── FCALL result parsing ──

/// Parse the wire-format result of the `ff_report_usage_and_check` Lua
/// function into a typed [`ReportUsageResult`].
///
/// Standard format: `{1, "OK"}`, `{1, "SOFT_BREACH", dim, current, limit}`,
/// `{1, "HARD_BREACH", dim, current, limit}`, `{1, "ALREADY_APPLIED"}`.
/// Status code `!= 1` is parsed as a [`ScriptError`] via
/// [`ScriptError::from_code_with_detail`].
///
/// Exposed as `pub` so downstream SDKs that speak the same wire format
/// — notably cairn-fabric's `budget_service::parse_spend_result` — can
/// call this directly instead of re-implementing the parse. Keeping one
/// parser paired with the producer (the Lua function registered at
/// `lua/budget.lua:99`, `ff_report_usage_and_check`) is the defence
/// against silent format drift between producer and consumer.
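///
/// A consumer-side sketch (doctest-ignored; `raw` is the untyped FCALL
/// return and `budget_exhausted` is a hypothetical error constructor):
///
/// ```ignore
/// match parse_report_usage_result(&raw)? {
///     ReportUsageResult::Ok | ReportUsageResult::AlreadyApplied => {}
///     ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
///         tracing::warn!(%dimension, current_usage, soft_limit, "soft budget breach");
///     }
///     ReportUsageResult::HardBreach { dimension, .. } => {
///         return Err(budget_exhausted(dimension));
///     }
/// }
/// ```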
1230pub fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, SdkError> {
1231 let arr = match raw {
1232 Value::Array(arr) => arr,
1233 _ => {
1234 return Err(SdkError::from(ScriptError::Parse {
1235 fcall: "parse_report_usage_result".into(),
1236 execution_id: None,
1237 message: "ff_report_usage_and_check: expected Array".into(),
1238 }));
1239 }
1240 };
1241 let status_code = match arr.first() {
1242 Some(Ok(Value::Int(n))) => *n,
1243 _ => {
1244 return Err(SdkError::from(ScriptError::Parse {
1245 fcall: "parse_report_usage_result".into(),
1246 execution_id: None,
1247 message: "ff_report_usage_and_check: expected Int status code".into(),
1248 }));
1249 }
1250 };
1251 if status_code != 1 {
1252 let error_code = usage_field_str(arr, 1);
1253 let detail = usage_field_str(arr, 2);
1254 return Err(SdkError::from(
1255 ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
1256 ScriptError::Parse {
1257 fcall: "parse_report_usage_result".into(),
1258 execution_id: None,
1259 message: format!("ff_report_usage_and_check: {error_code}"),
1260 }
1261 }),
1262 ));
1263 }
1264 let sub_status = usage_field_str(arr, 1);
1265 match sub_status.as_str() {
1266 "OK" => Ok(ReportUsageResult::Ok),
1267 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
1268 "SOFT_BREACH" => {
1269 let dim = usage_field_str(arr, 2);
1270 let current = parse_usage_u64(arr, 3, "SOFT_BREACH", "current_usage")?;
1271 let limit = parse_usage_u64(arr, 4, "SOFT_BREACH", "soft_limit")?;
1272 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
1273 }
1274 "HARD_BREACH" => {
1275 let dim = usage_field_str(arr, 2);
1276 let current = parse_usage_u64(arr, 3, "HARD_BREACH", "current_usage")?;
1277 let limit = parse_usage_u64(arr, 4, "HARD_BREACH", "hard_limit")?;
1278 Ok(ReportUsageResult::HardBreach {
1279 dimension: dim,
1280 current_usage: current,
1281 hard_limit: limit,
1282 })
1283 }
1284 _ => Err(SdkError::from(ScriptError::Parse {
1285 fcall: "parse_report_usage_result".into(),
1286 execution_id: None,
1287 message: format!(
1288 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
1289 ),
1290 })),
1291 }
1292}
1293
1294fn usage_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
1295 match arr.get(index) {
1296 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
1297 Some(Ok(Value::SimpleString(s))) => s.clone(),
1298 Some(Ok(Value::Int(n))) => n.to_string(),
1299 _ => String::new(),
1300 }
1301}
1302
1303/// Parse a required numeric usage field (u64) from the wire array at
1304/// `index`. Returns `Err(ScriptError::Parse)` if the slot is missing,
1305/// holds a non-string/non-int value, or contains a string that does
1306/// not parse as u64.
1307///
1308/// Rationale: the Lua producer (`lua/budget.lua:99`,
1309/// `ff_report_usage_and_check`) always emits
1310/// `tostring(current_usage)` / `tostring(soft_or_hard_limit)` for
1311/// SOFT_BREACH/HARD_BREACH, never an empty slot. A missing or
1312/// non-numeric value here means the Lua and Rust sides drifted;
1313/// silently coercing to `0` would surface drift as "zero-usage breach"
1314/// — arithmetically correct but semantically nonsense. Fail loudly
1315/// instead so drift shows up as a parse error at the first call site.
1316fn parse_usage_u64(
1317 arr: &[Result<Value, ferriskey::Error>],
1318 index: usize,
1319 sub_status: &str,
1320 field_name: &str,
1321) -> Result<u64, SdkError> {
1322 match arr.get(index) {
1323 Some(Ok(Value::Int(n))) => {
1324 u64::try_from(*n).map_err(|_| {
1325 SdkError::from(ScriptError::Parse {
1326 fcall: "parse_usage_u64".into(),
1327 execution_id: None,
1328 message: format!(
1329 "ff_report_usage_and_check {sub_status}: {field_name} \
1330 (index {index}) negative int {n} cannot be u64"
1331 ),
1332 })
1333 })
1334 }
1335 Some(Ok(Value::BulkString(b))) => {
1336 let s = String::from_utf8_lossy(b);
1337 s.parse::<u64>().map_err(|_| {
1338 SdkError::from(ScriptError::Parse {
1339 fcall: "parse_usage_u64".into(),
1340 execution_id: None,
1341 message: format!(
1342 "ff_report_usage_and_check {sub_status}: {field_name} \
1343 (index {index}) not a u64 string: {s:?}"
1344 ),
1345 })
1346 })
1347 }
1348 Some(Ok(Value::SimpleString(s))) => s.parse::<u64>().map_err(|_| {
1349 SdkError::from(ScriptError::Parse {
1350 fcall: "parse_usage_u64".into(),
1351 execution_id: None,
1352 message: format!(
1353 "ff_report_usage_and_check {sub_status}: {field_name} \
1354 (index {index}) not a u64 string: {s:?}"
1355 ),
1356 })
1357 }),
1358 Some(_) => Err(SdkError::from(ScriptError::Parse {
1359 fcall: "parse_usage_u64".into(),
1360 execution_id: None,
1361 message: format!(
1362 "ff_report_usage_and_check {sub_status}: {field_name} \
1363 (index {index}) wrong wire type (expected Int or String)"
1364 ),
1365 })),
1366 None => Err(SdkError::from(ScriptError::Parse {
1367 fcall: "parse_usage_u64".into(),
1368 execution_id: None,
1369 message: format!(
1370 "ff_report_usage_and_check {sub_status}: {field_name} \
1371 (index {index}) missing from response"
1372 ),
1373 })),
1374 }
1375}
1376
1377/// Pure helper: decide whether `suspension:current` represents a
1378/// signal-driven resume for the currently-claimed attempt, and extract
1379/// the waitpoint_id if so. Returns `Ok(None)` for every non-match case
1380/// (no record, stale prior-attempt, non-resumed close). Returns an error
1381/// only for a present-but-malformed waitpoint_id, which indicates a Lua
1382/// bug rather than a missing-data case.
1383///
1384/// RFC-012 Stage 1b: `ClaimedTask::resume_signals` now forwards through
1385/// `EngineBackend::observe_signals`, which re-implements this invariant
1386/// inside `ff_backend_valkey`. The SDK helper is retained with its unit
1387/// tests so the parsing contract stays exercised at the SDK layer —
1388/// Stage 1d will consolidate (either promote the helper into ff-core
1389/// or drop these tests once the backend-side tests cover equivalent
1390/// ground).
1391#[allow(dead_code)]
1392fn resume_waitpoint_id_from_suspension(
1393 susp: &HashMap<String, String>,
1394 claimed_attempt: AttemptIndex,
1395) -> Result<Option<WaitpointId>, SdkError> {
1396 if susp.is_empty() {
1397 return Ok(None);
1398 }
1399 let susp_att: u32 = susp
1400 .get("attempt_index")
1401 .and_then(|s| s.parse().ok())
1402 .unwrap_or(u32::MAX);
1403 if susp_att != claimed_attempt.0 {
1404 return Ok(None);
1405 }
1406 let close_reason = susp.get("close_reason").map(String::as_str).unwrap_or("");
1407 if close_reason != "resumed" {
1408 return Ok(None);
1409 }
1410 let wp_id_str = susp
1411 .get("waitpoint_id")
1412 .map(String::as_str)
1413 .unwrap_or_default();
1414 if wp_id_str.is_empty() {
1415 return Ok(None);
1416 }
1417 let waitpoint_id = WaitpointId::parse(wp_id_str).map_err(|e| {
1418 SdkError::from(ScriptError::Parse {
1419 fcall: "resume_waitpoint_id_from_suspension".into(),
1420 execution_id: None,
1421 message: format!(
1422 "resume_signals: suspension_current.waitpoint_id is not a valid UUID: {e}"
1423 ),
1424 })
1425 })?;
1426 Ok(Some(waitpoint_id))
1427}
1428
pub(crate) fn parse_success_result(raw: &Value, function_name: &str) -> Result<(), SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_success_result".into(),
                execution_id: None,
                message: format!("{function_name}: expected Array, got non-array"),
            }));
        }
    };

    if arr.is_empty() {
        return Err(SdkError::from(ScriptError::Parse {
            fcall: "parse_success_result".into(),
            execution_id: None,
            message: format!("{function_name}: empty result array"),
        }));
    }

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_success_result".into(),
                execution_id: None,
                message: format!("{function_name}: expected Int at index 0"),
            }));
        }
    };

    if status_code == 1 {
        Ok(())
    } else {
        // Extract error code from index 1 and optional detail from index 2
        // (e.g. `capability_mismatch` ships missing tokens there). Variants
        // that carry a String payload pick the detail up via
        // `from_code_with_detail`; other variants ignore it.
        let field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        // Collect all detail slots (idx >= 2). Most variants only read
        // slot 0; ExecutionNotActive consumes slots 0..=3 (terminal_outcome,
        // lease_epoch, lifecycle_phase, attempt_id) for terminal-op replay
        // reconciliation after a network drop.
        let details: Vec<String> = (2..arr.len()).map(field_str).collect();
        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();

        let script_err = ScriptError::from_code_with_details(&error_code, &detail_refs)
            .unwrap_or_else(|| ScriptError::Parse {
                fcall: "parse_success_result".into(),
                execution_id: None,
                message: format!("{function_name}: unknown error: {error_code}"),
            });

        Err(SdkError::from(script_err))
    }
}

// RFC-013 Stage 1d — `parse_suspend_result` removed. The backend impl
// in `ff_backend_valkey` now parses the Lua return and produces a typed
// `SuspendOutcome`; the SDK forwarder consumes that directly.

/// Parse ff_deliver_signal result:
///   ok(signal_id, effect)
///   ok_duplicate(existing_signal_id)
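///
/// # Example (sketch)
///
/// The success wire shapes as this parser consumes them. `signal_uuid`
/// is an illustrative stand-in for a real UUID string:
///
/// ```ignore
/// // {1, "OK", signal_id, effect}  → Accepted / TriggeredResume
/// // {1, "DUPLICATE", existing_id} → Duplicate
/// let raw = Value::Array(vec![
///     Ok(Value::Int(1)),
///     Ok(Value::SimpleString("OK".into())),
///     Ok(Value::SimpleString(signal_uuid.to_string())),
///     Ok(Value::SimpleString("resume_condition_satisfied".into())),
/// ]);
/// assert!(matches!(
///     parse_signal_result(&raw)?,
///     SignalOutcome::TriggeredResume { .. }
/// ));
/// // Any other effect string parses as SignalOutcome::Accepted.
/// ```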
pub(crate) fn parse_signal_result(raw: &Value) -> Result<SignalOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_signal_result".into(),
                execution_id: None,
                message: "ff_deliver_signal: expected Array".into(),
            }));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_signal_result".into(),
                execution_id: None,
                message: "ff_deliver_signal: bad status code".into(),
            }));
        }
    };

    // Shared string extractor for the remaining slots — BulkString and
    // SimpleString are handled uniformly; anything else reads as "".
    let field_str = |idx: usize| -> String {
        arr.get(idx)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_default()
    };

    if status_code != 1 {
        let error_code = {
            let s = field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let detail = field_str(2);
        return Err(SdkError::from(
            ScriptError::from_code_with_detail(&error_code, &detail).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_signal_result".into(),
                    execution_id: None,
                    message: format!("ff_deliver_signal: {error_code}"),
                }
            }),
        ));
    }

    let sub_status = field_str(1);

    if sub_status == "DUPLICATE" {
        return Ok(SignalOutcome::Duplicate {
            existing_signal_id: field_str(2),
        });
    }

    // Parse: {1, "OK", signal_id, effect}
    let signal_id_str = field_str(2);
    let effect = field_str(3);

    let signal_id = SignalId::parse(&signal_id_str).map_err(|e| {
        SdkError::from(ScriptError::Parse {
            fcall: "parse_signal_result".into(),
            execution_id: None,
            message: format!("ff_deliver_signal: invalid signal_id from Lua: {e}"),
        })
    })?;

    if effect == "resume_condition_satisfied" {
        Ok(SignalOutcome::TriggeredResume { signal_id })
    } else {
        Ok(SignalOutcome::Accepted { signal_id, effect })
    }
}

/// Parse ff_fail_execution result:
///   ok("retry_scheduled", delay_until)
///   ok("terminal_failed")
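///
/// # Example (sketch)
///
/// The retry wire shape as this parser consumes it — the timestamp
/// value is illustrative:
///
/// ```ignore
/// // {1, "OK", "retry_scheduled", delay_until} → RetryScheduled
/// // {1, "OK", "terminal_failed"}              → TerminalFailed
/// let raw = Value::Array(vec![
///     Ok(Value::Int(1)),
///     Ok(Value::SimpleString("OK".into())),
///     Ok(Value::SimpleString("retry_scheduled".into())),
///     Ok(Value::SimpleString("1700000000000".into())),
/// ]);
/// assert!(matches!(
///     parse_fail_result(&raw)?,
///     FailOutcome::RetryScheduled { .. }
/// ));
/// ```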
#[allow(dead_code)]
fn parse_fail_result(raw: &Value) -> Result<FailOutcome, SdkError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_fail_result".into(),
                execution_id: None,
                message: "ff_fail_execution: expected Array".into(),
            }));
        }
    };

    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(SdkError::from(ScriptError::Parse {
                fcall: "parse_fail_result".into(),
                execution_id: None,
                message: "ff_fail_execution: bad status code".into(),
            }));
        }
    };

    if status_code != 1 {
        let err_field_str = |idx: usize| -> String {
            arr.get(idx)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::SimpleString(s)) => Some(s.clone()),
                    _ => None,
                })
                .unwrap_or_default()
        };
        let error_code = {
            let s = err_field_str(1);
            if s.is_empty() { "unknown".to_owned() } else { s }
        };
        let details: Vec<String> = (2..arr.len()).map(err_field_str).collect();
        let detail_refs: Vec<&str> = details.iter().map(|s| s.as_str()).collect();
        return Err(SdkError::from(
            ScriptError::from_code_with_details(&error_code, &detail_refs).unwrap_or_else(|| {
                ScriptError::Parse {
                    fcall: "parse_fail_result".into(),
                    execution_id: None,
                    message: format!("ff_fail_execution: {error_code}"),
                }
            }),
        ));
    }

    // Parse sub-status from arr[2] — the first field after status + "OK".
    let sub_status = arr
        .get(2)
        .and_then(|v| match v {
            Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
            Ok(Value::SimpleString(s)) => Some(s.clone()),
            _ => None,
        })
        .unwrap_or_default();

    match sub_status.as_str() {
        "retry_scheduled" => {
            // Lua returns: ok("retry_scheduled", tostring(delay_until))
            // arr[3] = delay_until
            let delay_str = arr
                .get(3)
                .and_then(|v| match v {
                    Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                    Ok(Value::Int(n)) => Some(n.to_string()),
                    _ => None,
                })
                .unwrap_or_default();
            // Fail loudly on a missing or non-numeric delay rather than
            // silently coercing to 0 — the same defect class the
            // report-usage parser was fixed for (see the SOFT_BREACH
            // coercion tests below).
            let delay_until = delay_str.parse::<i64>().map_err(|e| {
                SdkError::from(ScriptError::Parse {
                    fcall: "parse_fail_result".into(),
                    execution_id: None,
                    message: format!(
                        "ff_fail_execution retry_scheduled: delay_until is not an i64 \
                         (got {delay_str:?}): {e}"
                    ),
                })
            })?;

            Ok(FailOutcome::RetryScheduled {
                delay_until: TimestampMs::from_millis(delay_until),
            })
        }
        "terminal_failed" => Ok(FailOutcome::TerminalFailed),
        _ => Err(SdkError::from(ScriptError::Parse {
            fcall: "parse_fail_result".into(),
            execution_id: None,
            message: format!("ff_fail_execution: unexpected sub-status: {sub_status}"),
        })),
    }
}

// ── Stream read / tail (consumer API, RFC-006 #2) ──

/// Maximum tail block duration accepted by [`tail_stream`]. Mirrors the REST
/// endpoint ceiling so SDK callers can't wedge a connection longer than the
/// server would accept.
pub const MAX_TAIL_BLOCK_MS: u64 = 30_000;

/// Maximum frames per read/tail call. Mirrors
/// `ff_core::contracts::STREAM_READ_HARD_CAP` — re-exported here so SDK
/// callers don't need to import ff-core just to read the bound.
pub use ff_core::contracts::STREAM_READ_HARD_CAP;

/// Result of [`read_stream`] / [`tail_stream`] — frames plus the terminal
/// signal so polling consumers can exit cleanly.
///
/// Re-export of `ff_core::contracts::StreamFrames` for SDK ergonomics.
pub use ff_core::contracts::StreamFrames;

/// Opaque cursor for [`read_stream`] / [`tail_stream`] — re-export of
/// `ff_core::contracts::StreamCursor`. Wire tokens: `"start"`, `"end"`,
/// `"<ms>"`, `"<ms>-<seq>"`. Bare `-` / `+` are rejected — use
/// `StreamCursor::Start` / `StreamCursor::End` instead.
pub use ff_core::contracts::StreamCursor;

/// Reject `Start` / `End` cursors at the XREAD (`tail_stream`) boundary
/// — XREAD does not accept the open markers. Pulled out as a bare
/// function so unit tests can exercise the guard without constructing a
/// live `ferriskey::Client`.
fn validate_tail_cursor(after: &StreamCursor) -> Result<(), SdkError> {
    if !after.is_concrete() {
        return Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("after".into()),
            message: "XREAD cursor must be a concrete entry id; pass \
                      StreamCursor::from_beginning() to start from the \
                      beginning"
                .into(),
        });
    }
    Ok(())
}

fn validate_stream_read_count(count_limit: u64) -> Result<(), SdkError> {
    if count_limit == 0 {
        return Err(SdkError::Config {
            context: "read_stream_frames".into(),
            field: Some("count_limit".into()),
            message: "count_limit must be >= 1".into(),
        });
    }
    if count_limit > STREAM_READ_HARD_CAP {
        return Err(SdkError::Config {
            context: "read_stream_frames".into(),
            field: Some("count_limit".into()),
            message: format!(
                "count_limit exceeds STREAM_READ_HARD_CAP ({STREAM_READ_HARD_CAP})"
            ),
        });
    }
    Ok(())
}

/// Read frames from a completed or in-flight attempt's stream.
///
/// `from` / `to` are [`StreamCursor`] values — `StreamCursor::Start` /
/// `StreamCursor::End` are equivalent to XRANGE `-` / `+`, and
/// `StreamCursor::At("<id>")` reads from a concrete entry id.
/// `count_limit` MUST be in `1..=STREAM_READ_HARD_CAP` —
/// `0` returns [`SdkError::Config`].
///
/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` so
/// consumers know when the producer has finalized the stream. A
/// never-written attempt and an in-progress stream are indistinguishable
/// here — both present as `frames=[]`, `closed_at=None`.
///
/// Intended for consumers (audit, checkpoint replay) that hold a ferriskey
/// client but are not the lease-holding worker — no lease check is
/// performed.
///
/// # Head-of-line note
///
/// A max-limit XRANGE reply (10_000 frames × ~64 KB each) is a
/// multi-MB reply serialized on one TCP socket. Like [`tail_stream`],
/// calling this on a `client` that is also serving FCALLs stalls those
/// FCALLs behind the reply. The REST server isolates reads on its
/// `tail_client`; direct SDK callers should either use a dedicated
/// client OR paginate through smaller `count_limit` slices.
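///
/// # Example (sketch) — bounded pagination
///
/// Paging through an attempt's frames in small slices instead of one
/// max-limit call. `backend`, `eid`, and the page size are illustrative;
/// `last_id()` is a hypothetical accessor for the final frame's entry id
/// — adapt it to however [`StreamFrames`] actually exposes that:
///
/// ```ignore
/// let mut from = StreamCursor::Start;
/// loop {
///     let page =
///         read_stream(backend, &eid, AttemptIndex::new(0), from, StreamCursor::End, 512).await?;
///     for frame in &page.frames { /* consume */ }
///     if page.frames.len() < 512 {
///         break; // short page → nothing left in range
///     }
///     // XRANGE start cursors are inclusive — either skip the first frame
///     // of each subsequent page, or use an exclusive-start cursor if
///     // StreamCursor offers one.
///     from = StreamCursor::At(page.last_id()); // hypothetical accessor
/// }
/// ```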
pub async fn read_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from: StreamCursor,
    to: StreamCursor,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    validate_stream_read_count(count_limit)?;
    Ok(backend
        .read_stream(execution_id, attempt_index, from, to, count_limit)
        .await?)
}

/// Tail a live attempt's stream.
///
/// `after` is an exclusive [`StreamCursor`] — XREAD returns entries
/// with id strictly greater than `after`. Pass
/// `StreamCursor::from_beginning()` (i.e. `At("0-0")`) to start from
/// the beginning. `StreamCursor::Start` / `StreamCursor::End` are
/// REJECTED at this boundary because XREAD does not accept `-` / `+`
/// as cursors — an invalid `after` surfaces as [`SdkError::Config`].
///
/// `block_ms == 0` → non-blocking peek. `block_ms > 0` → blocks up to that
/// many ms. Rejects `block_ms > MAX_TAIL_BLOCK_MS` and `count_limit`
/// outside `1..=STREAM_READ_HARD_CAP` with [`SdkError::Config`] to keep
/// SDK and REST ceilings aligned.
///
/// Returns a [`StreamFrames`] including `closed_at`/`closed_reason` —
/// polling consumers should loop until `result.is_closed()` is true, then
/// drain and exit. Timeout with no new frames presents as
/// `frames=[], closed_at=None`.
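///
/// # Example (sketch) — polling until close
///
/// A minimal consumer loop, assuming `backend` and `eid` are in scope.
/// `cursor_after()` is a hypothetical accessor that yields the position
/// just past everything the batch delivered (and re-derives the same
/// position on an empty timeout batch) — adapt it to however
/// [`StreamFrames`] actually exposes the last-seen entry id:
///
/// ```ignore
/// let mut after = StreamCursor::from_beginning();
/// loop {
///     let batch = tail_stream(backend, &eid, AttemptIndex::new(0), after, 5_000, 256).await?;
///     for frame in &batch.frames { /* consume */ }
///     if batch.is_closed() {
///         break; // producer finalized the stream — drain is complete
///     }
///     // XREAD is exclusive on `after`, so resuming at the last seen id
///     // does not re-deliver it.
///     after = batch.cursor_after(); // hypothetical accessor, see above
/// }
/// ```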
///
/// # Head-of-line warning — use a dedicated client
///
/// `ferriskey::Client` is a pipelined multiplexed connection; Valkey
/// processes commands FIFO on it. `XREAD BLOCK block_ms` does not yield
/// the read side until a frame arrives or the block elapses. If the
/// `client` you pass here is ALSO used for claims, completes, fails,
/// appends, or any other FCALL, a 30-second tail will stall all those
/// calls for up to 30 seconds.
///
/// **Strongly recommended**: build a separate `ferriskey::Client` for
/// tail callers — mirrors the `Server::tail_client` split that the REST
/// server uses internally (see `crates/ff-server/src/server.rs` and
/// RFC-006 Impl Notes §"Dedicated stream-op connection").
///
/// # Tail parallelism caveat (same mux)
///
/// Even a dedicated tail client is still one multiplexed TCP connection.
/// Valkey processes `XREAD BLOCK` calls FIFO on that one socket, and
/// ferriskey's per-call `request_timeout` starts at future-poll — so
/// two concurrent tails against the same client can time out spuriously:
/// the second call's BLOCK budget elapses while it waits for the first
/// BLOCK to return. The REST server handles this internally with a
/// `tokio::sync::Mutex` that serializes `xread_block` calls, giving
/// each call its full `block_ms` budget at the server.
///
/// **Direct SDK callers that need concurrent tails**: either
/// (1) build ONE `ferriskey::Client` per concurrent tail call (a small
///     pool of clients, rotated by the caller), OR
/// (2) wrap `tail_stream` calls in your own `tokio::sync::Mutex` so
///     only one BLOCK is in flight per client at a time (sketched
///     below).
/// If you need the REST-side backpressure (429 on contention) and the
/// built-in serializer, go through the
/// `/v1/executions/{eid}/attempts/{idx}/stream/tail` endpoint rather
/// than calling this directly.
///
/// This SDK does not enforce either pattern — the mutex belongs at the
/// application layer, and the connection pool belongs at the SDK
/// caller's DI layer; neither has a structured place inside this
/// helper.
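///
/// # Example (sketch) — option (2), one BLOCK per client
///
/// Serializing tails behind an application-level async mutex, assuming
/// every tail caller shares the same backend/client (names here are
/// illustrative):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let tail_gate = Arc::new(tokio::sync::Mutex::new(()));
/// // ... inside each tail caller:
/// let _slot = tail_gate.lock().await; // one XREAD BLOCK in flight at a time
/// let batch = tail_stream(backend, &eid, AttemptIndex::new(0), after, 5_000, 256).await?;
/// drop(_slot); // release before processing so the next tail can start
/// ```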
///
/// # Timeout handling
///
/// Blocking calls do not hit ferriskey's default `request_timeout` (5s on
/// the server default). For `XREAD`/`XREADGROUP` with a `BLOCK` argument,
/// ferriskey's `get_request_timeout` returns `BlockingCommand(block_ms +
/// 500ms)`, overriding the client's default per-call. A tail with
/// `block_ms = 30_000` gets a 30_500ms effective transport timeout even if
/// the client was built with a shorter `request_timeout`. No custom client
/// configuration is required for timeout reasons — only for the
/// head-of-line isolation described above.
pub async fn tail_stream(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, SdkError> {
    tail_stream_with_visibility(
        backend,
        execution_id,
        attempt_index,
        after,
        block_ms,
        count_limit,
        ff_core::backend::TailVisibility::All,
    )
    .await
}

/// Tail helper with an explicit RFC-015
/// [`TailVisibility`](ff_core::backend::TailVisibility) filter.
pub async fn tail_stream_with_visibility(
    backend: &dyn EngineBackend,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    after: StreamCursor,
    block_ms: u64,
    count_limit: u64,
    visibility: ff_core::backend::TailVisibility,
) -> Result<StreamFrames, SdkError> {
    if block_ms > MAX_TAIL_BLOCK_MS {
        return Err(SdkError::Config {
            context: "tail_stream".into(),
            field: Some("block_ms".into()),
            message: format!("exceeds {MAX_TAIL_BLOCK_MS}ms ceiling"),
        });
    }
    validate_stream_read_count(count_limit)?;
    // XREAD does not accept `-` / `+` markers as cursors — reject at
    // the SDK boundary with `SdkError::Config` rather than forwarding
    // an invalid `-`/`+` into the backend (which would surface as an
    // opaque `EngineError::Transport`).
    validate_tail_cursor(&after)?;

    Ok(backend
        .tail_stream(
            execution_id,
            attempt_index,
            after,
            block_ms,
            count_limit,
            visibility,
        )
        .await?)
}

#[cfg(test)]
mod tail_stream_boundary_tests {
    use super::*;

    // `validate_tail_cursor` rejects `StreamCursor::Start` and
    // `StreamCursor::End` before `tail_stream` touches the client —
    // same shape as the `count_limit` guard above. The matching
    // full-path rejection on the REST layer is covered by
    // `ff-server::api`.

    #[test]
    fn rejects_start_cursor() {
        let err = validate_tail_cursor(&StreamCursor::Start)
            .expect_err("Start must be rejected");
        match err {
            SdkError::Config { field, context, .. } => {
                assert_eq!(field.as_deref(), Some("after"));
                assert_eq!(context, "tail_stream");
            }
            other => panic!("expected SdkError::Config, got {other:?}"),
        }
    }

    #[test]
    fn rejects_end_cursor() {
        let err = validate_tail_cursor(&StreamCursor::End)
            .expect_err("End must be rejected");
        assert!(matches!(err, SdkError::Config { .. }));
    }

    #[test]
    fn accepts_at_cursor() {
        validate_tail_cursor(&StreamCursor::At("0-0".into()))
            .expect("At cursor must be accepted");
        validate_tail_cursor(&StreamCursor::from_beginning())
            .expect("from_beginning() must be accepted");
        validate_tail_cursor(&StreamCursor::At("123-0".into()))
            .expect("concrete id must be accepted");
    }
}

#[cfg(test)]
mod parse_report_usage_result_tests {
    use super::*;

    /// `Value::SimpleString` from a `&str`. `usage_field_str` handles
    /// BulkString and SimpleString uniformly (`Value::BulkString(b)` →
    /// `String::from_utf8_lossy`, `Value::SimpleString(s)` → clone), so
    /// SimpleString is interchangeable here and avoids a dev-dependency
    /// on `bytes` just for test construction.
    fn s(v: &str) -> Result<Value, ferriskey::Error> {
        Ok(Value::SimpleString(v.to_owned()))
    }

    fn int(n: i64) -> Result<Value, ferriskey::Error> {
        Ok(Value::Int(n))
    }

    fn arr(items: Vec<Result<Value, ferriskey::Error>>) -> Value {
        Value::Array(items)
    }

    #[test]
    fn ok_status() {
        let raw = arr(vec![int(1), s("OK")]);
        assert_eq!(parse_report_usage_result(&raw).unwrap(), ReportUsageResult::Ok);
    }

    #[test]
    fn already_applied_status() {
        let raw = arr(vec![int(1), s("ALREADY_APPLIED")]);
        assert_eq!(
            parse_report_usage_result(&raw).unwrap(),
            ReportUsageResult::AlreadyApplied
        );
    }

    #[test]
    fn soft_breach_status() {
        let raw = arr(vec![int(1), s("SOFT_BREACH"), s("tokens"), s("150"), s("100")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::SoftBreach { dimension, current_usage, soft_limit } => {
                assert_eq!(dimension, "tokens");
                assert_eq!(current_usage, 150);
                assert_eq!(soft_limit, 100);
            }
            other => panic!("expected SoftBreach, got {other:?}"),
        }
    }

    #[test]
    fn hard_breach_status() {
        let raw = arr(vec![int(1), s("HARD_BREACH"), s("requests"), s("10001"), s("10000")]);
        match parse_report_usage_result(&raw).unwrap() {
            ReportUsageResult::HardBreach { dimension, current_usage, hard_limit } => {
                assert_eq!(dimension, "requests");
                assert_eq!(current_usage, 10001);
                assert_eq!(hard_limit, 10000);
            }
            other => panic!("expected HardBreach, got {other:?}"),
        }
    }

    /// Negative case: non-Array input. Guards against a future Lua refactor
    /// that accidentally returns a bare string/int — the parser must fail
    /// loudly rather than silently succeed or panic.
    #[test]
    fn non_array_input_is_parse_error() {
        let raw = Value::SimpleString("OK".to_owned());
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("expected array"),
            "error should mention expected shape, got: {msg}"
        );
    }

    /// Negative case: Array whose first element isn't an Int status code.
    /// The Lua function's first return slot is always `status_code` (1 on
    /// success, an error code otherwise); a non-Int there is a wire-format
    /// break that must surface as a parse error.
    #[test]
    fn first_element_non_int_is_parse_error() {
        let raw = arr(vec![s("not_an_int"), s("OK")]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.to_lowercase().contains("int"),
            "error should mention Int status code, got: {msg}"
        );
    }

    /// Negative case: SOFT_BREACH with a non-numeric `current_usage`
    /// field. Guards against the silent-coercion defect cross-review
    /// caught: the old parser used `.unwrap_or(0)` on numeric fields,
    /// which would have surfaced Lua-side wire-format drift as a
    /// `SoftBreach { current_usage: 0, ... }` — arithmetically valid
    /// but semantically wrong (a "breach with zero usage" is nonsense
    /// and masks the real error).
    #[test]
    fn soft_breach_non_numeric_current_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("SOFT_BREACH"),
            s("tokens"),
            s("not_a_number"), // current_usage — must fail, not coerce to 0
            s("100"),
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("SOFT_BREACH") && msg.contains("current_usage"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("u64"),
            "error should mention the expected type (u64), got: {msg}"
        );
    }

    /// Negative case: HARD_BREACH with the limit slot missing
    /// entirely. Same defence as the non-numeric test above: a
    /// truncated response must fail loudly rather than coerce to 0.
    #[test]
    fn hard_breach_missing_limit_is_parse_error() {
        let raw = arr(vec![
            int(1),
            s("HARD_BREACH"),
            s("requests"),
            s("10001"),
            // no index 4 — hard_limit missing
        ]);
        let err = parse_report_usage_result(&raw).unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("HARD_BREACH") && msg.contains("hard_limit"),
            "error should identify sub-status + field, got: {msg}"
        );
        assert!(
            msg.to_lowercase().contains("missing"),
            "error should say 'missing', got: {msg}"
        );
    }
}

/// RFC-014 helper: pick the first waitpoint_key out of a composite
/// tree so `try_suspend_inner` can rebind a Fresh waitpoint to the
/// user-supplied key. Single-waitpoint scoping: all
/// Single.waitpoint_key / Count.waitpoints[i] in the tree must be
/// equal; this function returns the first one it encounters.
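///
/// # Example (sketch)
///
/// The scoping contract in miniature. The `Count` construction below
/// spells out only `waitpoints` and elides every other field — see
/// `ff_core::contracts` for the full shape:
///
/// ```ignore
/// let body = CompositeBody::Count {
///     waitpoints: vec!["wp-approval".to_owned(), "wp-approval".to_owned()],
///     // ...remaining Count fields elided...
/// };
/// assert_eq!(
///     composite_first_waitpoint_key(&body).as_deref(),
///     Some("wp-approval")
/// );
/// ```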
fn composite_first_waitpoint_key(body: &CompositeBody) -> Option<String> {
    match body {
        CompositeBody::AllOf { members } => members.iter().find_map(|m| match m {
            ResumeCondition::Single { waitpoint_key, .. } => Some(waitpoint_key.clone()),
            ResumeCondition::Composite(inner) => composite_first_waitpoint_key(inner),
            _ => None,
        }),
        CompositeBody::Count { waitpoints, .. } => waitpoints.first().cloned(),
        _ => None,
    }
}

#[cfg(test)]
mod resume_signals_tests {
    use super::*;

    fn m(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs.iter().map(|(k, v)| ((*k).to_owned(), (*v).to_owned())).collect()
    }

    #[test]
    fn empty_suspension_returns_none() {
        let susp = m(&[]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none(), "no suspension record → None");
    }

    #[test]
    fn stale_prior_attempt_returns_none() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        // Claimed attempt is 1; suspension belongs to 0 → stale.
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(1)).unwrap();
        assert!(out.is_none(), "attempt_index mismatch → None");
    }

    #[test]
    fn non_resumed_close_returns_none() {
        let wp = WaitpointId::new();
        for reason in ["timeout", "cancelled", "", "expired"] {
            let susp = m(&[
                ("attempt_index", "0"),
                ("close_reason", reason),
                ("waitpoint_id", &wp.to_string()),
            ]);
            let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
            assert!(out.is_none(), "close_reason={reason:?} must not return signals");
        }
    }

    #[test]
    fn resumed_same_attempt_returns_waitpoint() {
        let wp = WaitpointId::new();
        let susp = m(&[
            ("attempt_index", "2"),
            ("close_reason", "resumed"),
            ("waitpoint_id", &wp.to_string()),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(2)).unwrap();
        assert_eq!(out, Some(wp));
    }

    #[test]
    fn malformed_waitpoint_id_is_error() {
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", "not-a-uuid"),
        ]);
        let err = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap_err();
        assert!(
            format!("{err}").contains("not a valid UUID"),
            "error should mention invalid UUID, got: {err}"
        );
    }

    #[test]
    fn empty_waitpoint_id_returns_none() {
        // Defensive: an empty waitpoint_id field (shouldn't happen on
        // resumed records, but guard against partial writes) is None, not an error.
        let susp = m(&[
            ("attempt_index", "0"),
            ("close_reason", "resumed"),
            ("waitpoint_id", ""),
        ]);
        let out = resume_waitpoint_id_from_suspension(&susp, AttemptIndex::new(0)).unwrap();
        assert!(out.is_none());
    }

    // The previous `matched_signal_ids_from_condition` helper was
    // removed when the production path switched from unbounded
    // HGETALL to a bounded HGET + per-matcher HMGET loop (review
    // feedback on unbounded condition-hash reply size). That loop
    // is exercised by the integration tests in `ff-test`.
}

#[cfg(test)]
mod terminal_replay_parsing_tests {
    //! Unit tests for the SDK's parse path of the enriched
    //! `execution_not_active` error returned on a terminal-op replay.
    //! The integration test in ff-test/tests/e2e_lifecycle.rs proves
    //! the Lua side emits the 4-slot detail; these tests prove the
    //! Rust parser threads all 4 slots into the `ExecutionNotActive`
    //! variant so the reconciler in complete()/fail()/cancel() can
    //! match on them.

    use super::*;
    use ferriskey::Value;

    // Use SimpleString rather than BulkString to avoid depending on bytes::Bytes
    // in the test harness. parse_success_result + parse_fail_result handle both.
    fn bulk(s: &str) -> Value {
        Value::SimpleString(s.to_owned())
    }

    /// parse_success_result must fold idx 2..=5 into ExecutionNotActive.
    #[test]
    fn parse_success_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("success")),
            Ok(bulk("42")),
            Ok(bulk("terminal")),
            Ok(bulk("11111111-1111-1111-1111-111111111111")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected SdkError::Engine, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "success");
                assert_eq!(lease_epoch, "42");
                assert_eq!(lifecycle_phase, "terminal");
                assert_eq!(attempt_id, "11111111-1111-1111-1111-111111111111");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// parse_fail_result must extract all detail slots too so the
    /// reconciler in fail() can match lifecycle_phase = "runnable" for
    /// retry-scheduled replays.
    #[test]
    fn parse_fail_result_extracts_all_four_detail_slots() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
            Ok(bulk("none")),
            Ok(bulk("7")),
            Ok(bulk("runnable")),
            Ok(bulk("22222222-2222-2222-2222-222222222222")),
        ]);
        let err = parse_fail_result(&raw).unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected SdkError::Engine, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "none");
                assert_eq!(lease_epoch, "7");
                assert_eq!(lifecycle_phase, "runnable");
                assert_eq!(attempt_id, "22222222-2222-2222-2222-222222222222");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }

    /// Empty detail slots must default to "" (not panic) so older-Lua
    /// producers or malformed replies degrade to an unreconcilable
    /// variant rather than a Parse error.
    #[test]
    fn parse_success_result_missing_slots_defaults_to_empty() {
        let raw = Value::Array(vec![
            Ok(Value::Int(0)),
            Ok(bulk("execution_not_active")),
        ]);
        let err = parse_success_result(&raw, "test").unwrap_err();
        let unboxed = match err {
            SdkError::Engine(b) => *b,
            other => panic!("expected SdkError::Engine, got {other:?}"),
        };
        match unboxed {
            crate::EngineError::Contention(
                crate::ContentionKind::ExecutionNotActive {
                    terminal_outcome,
                    lease_epoch,
                    lifecycle_phase,
                    attempt_id,
                },
            ) => {
                assert_eq!(terminal_outcome, "");
                assert_eq!(lease_epoch, "");
                assert_eq!(lifecycle_phase, "");
                assert_eq!(attempt_id, "");
            }
            other => panic!("expected ExecutionNotActive struct variant, got {other:?}"),
        }
    }
}