pub struct ClaimedTask { /* private fields */ }Expand description
A claimed execution with an active lease. The worker processes this task
and must call one of complete(), fail(), or cancel() when done.
The lease is automatically renewed in the background at lease_ttl / 3
intervals. Renewal stops when the task is consumed or dropped.
complete, fail, and cancel consume self — this prevents
double-complete bugs at the type level.
Implementations§
Source§impl ClaimedTask
impl ClaimedTask
pub fn execution_id(&self) -> &ExecutionId
pub fn attempt_index(&self) -> AttemptIndex
pub fn attempt_id(&self) -> &AttemptId
pub fn lease_id(&self) -> &LeaseId
pub fn lease_epoch(&self) -> LeaseEpoch
pub fn input_payload(&self) -> &[u8] ⓘ
pub fn execution_kind(&self) -> &str
pub fn lane_id(&self) -> &LaneId
Sourcepub async fn read_stream(
&self,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Result<StreamFrames, SdkError>
pub async fn read_stream( &self, from: StreamCursor, to: StreamCursor, count_limit: u64, ) -> Result<StreamFrames, SdkError>
Read frames from this task’s attempt stream.
Forwards through the task’s Arc<dyn EngineBackend> — consumers
no longer need the ferriskey::Client leak (#87). See
EngineBackend::read_stream
for the backend-level contract.
Validation (count_limit bounds) runs at this boundary — out-of-range
input surfaces as SdkError::Config before reaching the backend.
Sourcepub async fn tail_stream(
&self,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
) -> Result<StreamFrames, SdkError>
pub async fn tail_stream( &self, after: StreamCursor, block_ms: u64, count_limit: u64, ) -> Result<StreamFrames, SdkError>
Tail this task’s attempt stream for new frames.
See EngineBackend::tail_stream
for the head-of-line warning — the task’s backend is shared
with claim/complete/fail hot paths. Consumers that need
isolation should build a dedicated EngineBackend for tail
reads.
Sourcepub async fn tail_stream_with_visibility(
&self,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: TailVisibility,
) -> Result<StreamFrames, SdkError>
pub async fn tail_stream_with_visibility( &self, after: StreamCursor, block_ms: u64, count_limit: u64, visibility: TailVisibility, ) -> Result<StreamFrames, SdkError>
Tail with an explicit TailVisibility
filter (RFC-015 §6). Use [TailVisibility::ExcludeBestEffort]
(ff_core::backend::TailVisibility::ExcludeBestEffort) to drop
StreamMode::BestEffortLive
frames server-side.
Sourcepub fn is_lease_healthy(&self) -> bool
pub fn is_lease_healthy(&self) -> bool
Check if the lease is likely still valid based on renewal success.
Returns false if 3 or more consecutive renewal attempts have failed.
Workers should check this before committing expensive or irreversible
side effects. A false return means Valkey may have already expired
the lease and another worker could be processing this execution.
Sourcepub fn consecutive_renewal_failures(&self) -> u32
pub fn consecutive_renewal_failures(&self) -> u32
Number of consecutive lease renewal failures since the last success.
Returns 0 when renewals are working normally. Useful for observability and custom health policies beyond the default threshold of 3.
Sourcepub async fn delay_execution(
self,
delay_until: TimestampMs,
) -> Result<(), SdkError>
pub async fn delay_execution( self, delay_until: TimestampMs, ) -> Result<(), SdkError>
Delay the execution until delay_until.
Releases the lease. The execution moves to delayed state.
Consumes self — the task cannot be used after delay.
Sourcepub async fn move_to_waiting_children(self) -> Result<(), SdkError>
pub async fn move_to_waiting_children(self) -> Result<(), SdkError>
Move execution to waiting_children state.
Releases the lease. The execution waits for child dependencies to complete. Consumes self.
Sourcepub async fn complete(
self,
result_payload: Option<Vec<u8>>,
) -> Result<(), SdkError>
pub async fn complete( self, result_payload: Option<Vec<u8>>, ) -> Result<(), SdkError>
Complete the execution successfully.
Calls ff_complete_execution via FCALL, then stops lease renewal.
Renewal continues during the FCALL to prevent lease expiry under
network latency — the Lua fences on lease_id+epoch atomically.
Consumes self — the task cannot be used after completion.
§Connection errors and replay
If the Valkey connection drops after the Lua commit but before the
client reads the response, retrying complete() with the same
ClaimedTask (same lease_epoch + attempt_id) is safe: the SDK
reconciles the “already terminal with matching outcome” response
into Ok(()). A retry after a different terminal op has raced in
(e.g. operator cancel) surfaces as ExecutionNotActive with the
populated terminal_outcome so the caller can see what actually
happened.
Sourcepub async fn fail(
self,
reason: &str,
error_category: &str,
) -> Result<FailOutcome, SdkError>
pub async fn fail( self, reason: &str, error_category: &str, ) -> Result<FailOutcome, SdkError>
Fail the execution with a reason and error category.
If the execution policy allows retries, the engine schedules a retry
(delayed backoff). Otherwise, the execution becomes terminal failed.
Returns FailOutcome so the caller knows what happened.
§Connection errors and replay
fail() is replay-safe under the same conditions as complete():
a retry by the same caller matching lease_epoch + attempt_id is
reconciled into the outcome the server actually committed
(TerminalFailed if no retries left; RetryScheduled with
delay_until = 0 if a retry was scheduled — the exact delay is
not recovered on the replay path).
Sourcepub async fn cancel(self, reason: &str) -> Result<(), SdkError>
pub async fn cancel(self, reason: &str) -> Result<(), SdkError>
Cancel the execution.
Stops lease renewal, then calls ff_cancel_execution via FCALL.
Consumes self.
§Connection errors and replay
cancel() is replay-safe under the same conditions as complete():
a retry by the same caller matching lease_epoch + attempt_id
returns Ok(()) if the server’s stored terminal_outcome is
cancelled. A retry that finds a different outcome (because a
concurrent complete() or fail() won the race) surfaces as
ExecutionNotActive with the populated terminal_outcome so the
caller can see that the cancel intent was NOT honored.
Sourcepub async fn renew_lease(&self) -> Result<(), SdkError>
pub async fn renew_lease(&self) -> Result<(), SdkError>
Manually renew the lease.
RFC-012 Stage 1b. Forwards through the EngineBackend
trait. The background renewal task calls
backend.renew(&handle) directly (see spawn_renewal_task);
this method is the public entry point for workers that want
to force a renew out-of-band.
Sourcepub async fn update_progress(
&self,
pct: u8,
message: &str,
) -> Result<(), SdkError>
pub async fn update_progress( &self, pct: u8, message: &str, ) -> Result<(), SdkError>
Update progress (pct 0-100 and optional message).
RFC-012 Stage 1b. Forwards through the EngineBackend
trait (backend.progress(&handle, Some(pct), Some(msg))).
§Not for stream frames
update_progress writes the progress_percent / progress_message
fields on exec_core (the execution’s state hash). It is a
scalar heartbeat — each call overwrites the previous value and
nothing is appended to the output stream. Producers emitting
stream frames (arbitrary frame_type + payload, consumed via
ClaimedTask::read_stream / tail_stream or the HTTP
stream-tail routes) MUST use Self::append_frame instead;
update_progress is invisible to stream consumers.
Sourcepub async fn report_usage(
&self,
budget_id: &BudgetId,
dimensions: &[(&str, u64)],
dedup_key: Option<&str>,
) -> Result<ReportUsageResult, SdkError>
pub async fn report_usage( &self, budget_id: &BudgetId, dimensions: &[(&str, u64)], dedup_key: Option<&str>, ) -> Result<ReportUsageResult, SdkError>
Report usage against a budget and check limits.
Non-consuming — the worker can report usage multiple times.
dimensions is a slice of (dimension_name, delta) pairs.
dedup_key prevents double-counting on retries (auto-prefixed with budget hash tag).
RFC-012 §R7.2.3: forwards through
EngineBackend::report_usage.
The trait’s UsageDimensions.dedup_key carries the raw key;
the backend impl applies the usage_dedup_key(hash_tag, k)
wrap so the dedup state co-locates with the budget partition
(PR #108).
Sourcepub async fn create_pending_waitpoint(
&self,
waitpoint_key: &str,
expires_in_ms: u64,
) -> Result<(WaitpointId, WaitpointToken), SdkError>
pub async fn create_pending_waitpoint( &self, waitpoint_key: &str, expires_in_ms: u64, ) -> Result<(WaitpointId, WaitpointToken), SdkError>
Create a pending waitpoint for future signal delivery.
Non-consuming — the worker keeps the lease. Signals delivered to the
waitpoint are buffered. When the worker later calls suspend() with
use_pending_waitpoint, buffered signals may immediately satisfy the
resume condition.
Returns both the waitpoint_id AND the HMAC token required by external callers to buffer signals against this pending waitpoint (RFC-004 §Waitpoint Security).
RFC-012 §R7.2.2: forwards through
EngineBackend::create_waitpoint.
The trait returns
PendingWaitpoint whose
hmac_token is the same wire HMAC this method has always
produced; the SDK unwraps it back to the historical
(WaitpointId, WaitpointToken) tuple for caller-shape parity.
Sourcepub async fn append_frame(
&self,
frame_type: &str,
payload: &[u8],
metadata: Option<&str>,
) -> Result<AppendFrameOutcome, SdkError>
pub async fn append_frame( &self, frame_type: &str, payload: &[u8], metadata: Option<&str>, ) -> Result<AppendFrameOutcome, SdkError>
Append a frame to the current attempt’s output stream.
Non-consuming — the worker can append many frames during execution. The stream is created lazily on the first append.
RFC-012 §R7.2.1 / PR #146: forwards through the
EngineBackend trait. The free-form frame_type tag and
optional metadata (wire correlation_id) travel on the
extended ff_core::backend::Frame shape (frame_type: String, correlation_id: Option<String>), giving byte-for-byte
wire parity with the pre-migration direct-FCALL path.
§append_frame vs Self::update_progress
Stream-frame producers (arbitrary frame_type + payload
consumed via ClaimedTask::read_stream / tail_stream or the
HTTP stream-tail routes) MUST use append_frame.
update_progress writes scalar progress_percent /
progress_message fields to exec_core and is invisible to
stream consumers.
Sourcepub async fn append_frame_with_mode(
&self,
frame_type: &str,
payload: &[u8],
metadata: Option<&str>,
mode: StreamMode,
) -> Result<AppendFrameOutcome, SdkError>
pub async fn append_frame_with_mode( &self, frame_type: &str, payload: &[u8], metadata: Option<&str>, mode: StreamMode, ) -> Result<AppendFrameOutcome, SdkError>
Append a frame under an explicit RFC-015
StreamMode.
Defaults via Self::append_frame preserve pre-015 behaviour
(StreamMode::Durable).
Sourcepub async fn suspend(
self,
reason_code: SuspensionReasonCode,
resume_condition: ResumeCondition,
timeout: Option<(TimestampMs, TimeoutBehavior)>,
resume_policy: ResumePolicy,
) -> Result<SuspendedHandle, SdkError>
pub async fn suspend( self, reason_code: SuspensionReasonCode, resume_condition: ResumeCondition, timeout: Option<(TimestampMs, TimeoutBehavior)>, resume_policy: ResumePolicy, ) -> Result<SuspendedHandle, SdkError>
Strict suspend. Consumes self and yields a
SuspendedHandle or errors. On the early-satisfied path
(buffered signals already matched the condition), returns
EngineError::State(AlreadySatisfied) — use try_suspend when
that branch is part of the flow’s happy path.
RFC-013 §5.1 — this is the classic Rust foo / try_foo
split; suspend is the unconditional form. Defaults
WaitpointBinding::fresh() for the waitpoint. For pending
waitpoints previously issued via create_pending_waitpoint,
use try_suspend_on_pending.
Sourcepub async fn try_suspend(
self,
reason_code: SuspensionReasonCode,
resume_condition: ResumeCondition,
timeout: Option<(TimestampMs, TimeoutBehavior)>,
resume_policy: ResumePolicy,
) -> Result<TrySuspendOutcome, SdkError>
pub async fn try_suspend( self, reason_code: SuspensionReasonCode, resume_condition: ResumeCondition, timeout: Option<(TimestampMs, TimeoutBehavior)>, resume_policy: ResumePolicy, ) -> Result<TrySuspendOutcome, SdkError>
Fallible suspend. On the early-satisfied path, the
ClaimedTask is handed back unchanged so the worker can
continue running against the retained lease.
RFC-013 §5.1 — use this when consuming a pending waitpoint whose buffered signals may already match the condition.
Sourcepub async fn try_suspend_on_pending(
self,
pending: &PendingWaitpoint,
reason_code: SuspensionReasonCode,
resume_condition: ResumeCondition,
timeout: Option<(TimestampMs, TimeoutBehavior)>,
resume_policy: ResumePolicy,
) -> Result<TrySuspendOutcome, SdkError>
pub async fn try_suspend_on_pending( self, pending: &PendingWaitpoint, reason_code: SuspensionReasonCode, resume_condition: ResumeCondition, timeout: Option<(TimestampMs, TimeoutBehavior)>, resume_policy: ResumePolicy, ) -> Result<TrySuspendOutcome, SdkError>
Convenience: try_suspend against a pending waitpoint previously
issued via create_pending_waitpoint.
Sourcepub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError>
pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError>
Read the signals that satisfied the waitpoint and triggered this resume.
Non-consuming. Intended to be called immediately after re-claim via
crate::FlowFabricWorker::claim_from_reclaim_grant, before any
subsequent suspend() (which replaces suspension:current).
Returns Ok(vec![]) when this claim is NOT a signal-resume:
- No prior suspension on this execution.
- The prior suspension belonged to an earlier attempt (e.g. the attempt was cancelled/failed and a retry is now claiming).
- The prior suspension was closed by timeout / cancel / operator override rather than by a matched signal.
Reads suspension:current once, filters by attempt_index to
guard against stale prior-attempt records, then fetches the matched
signal_id set from waitpoint_condition’s matcher:N:signal_id
fields and reads each signal’s metadata + payload directly.
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for ClaimedTask
impl !RefUnwindSafe for ClaimedTask
impl Send for ClaimedTask
impl Sync for ClaimedTask
impl Unpin for ClaimedTask
impl UnsafeUnpin for ClaimedTask
impl !UnwindSafe for ClaimedTask
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more