Struct ClaimedTask 

pub struct ClaimedTask { /* private fields */ }

A claimed execution with an active lease. The worker processes this task and must call one of complete(), fail(), or cancel() when done.

The lease is automatically renewed in the background at lease_ttl / 3 intervals. Renewal stops when the task is consumed or dropped.
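As a worked example of the renewal cadence, the sketch below derives the interval from a lease TTL. The helper name `renewal_interval` and the 30-second TTL are illustrative assumptions; the SDK computes the interval internally and does not expose this function.

```rust
use std::time::Duration;

/// Interval between background renewals: one third of the lease TTL.
/// Hypothetical helper for illustration only, not part of the SDK.
fn renewal_interval(lease_ttl: Duration) -> Duration {
    lease_ttl / 3
}
```

Under these assumptions, a 30-second TTL gives one renewal attempt every 10 seconds.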

complete, fail, and cancel consume self, which prevents double-complete bugs at the type level: the compiler rejects any further use of the task after one of them has been called.
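The type-level guarantee can be illustrated with a minimal stand-in. `SketchTask` is a hypothetical type, not the real ClaimedTask, and its synchronous `complete` only mirrors the by-value `self` receiver.

```rust
/// Hypothetical stand-in for a claimed task; not the real ClaimedTask.
struct SketchTask {
    id: u32,
}

impl SketchTask {
    /// Takes `self` by value, so the task is moved into the call.
    fn complete(self) -> Result<u32, String> {
        Ok(self.id)
    }
}

/// Completes a task exactly once and returns the outcome.
fn finish_once() -> Result<u32, String> {
    let task = SketchTask { id: 7 };
    let outcome = task.complete();
    // A second `task.complete()` here would fail to compile with
    // error[E0382]: use of moved value: `task`.
    outcome
}
```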

Implementations§

impl ClaimedTask

pub fn execution_id(&self) -> &ExecutionId

pub fn attempt_index(&self) -> AttemptIndex

pub fn attempt_id(&self) -> &AttemptId

pub fn lease_id(&self) -> &LeaseId

pub fn lease_epoch(&self) -> LeaseEpoch

pub fn input_payload(&self) -> &[u8]

pub fn execution_kind(&self) -> &str

pub fn tags(&self) -> &HashMap<String, String>

pub fn lane_id(&self) -> &LaneId

pub async fn read_stream( &self, from: StreamCursor, to: StreamCursor, count_limit: u64, ) -> Result<StreamFrames, SdkError>

Read frames from this task’s attempt stream.

Forwards through the task’s Arc<dyn EngineBackend>, so consumers no longer depend on the ferriskey::Client abstraction leak (#87). See EngineBackend::read_stream for the backend-level contract.

Validation (count_limit bounds) runs at this boundary — out-of-range input surfaces as SdkError::Config before reaching the backend.
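A minimal sketch of boundary validation of this kind follows. The bounds (`1..=MAX_COUNT_LIMIT`), the constant's value, and the `SketchError` type are assumptions made for illustration; this page does not state the real limits.

```rust
/// Hypothetical upper bound; the real limit is not documented on this page.
const MAX_COUNT_LIMIT: u64 = 10_000;

/// Stand-in for the SDK error type, reduced to the one variant used here.
#[derive(Debug, PartialEq)]
enum SketchError {
    /// Mirrors the role of SdkError::Config.
    Config(String),
}

/// Rejects out-of-range input before any backend call would be made.
fn validate_count_limit(count_limit: u64) -> Result<u64, SketchError> {
    if count_limit == 0 || count_limit > MAX_COUNT_LIMIT {
        return Err(SketchError::Config(format!(
            "count_limit must be in 1..={}, got {}",
            MAX_COUNT_LIMIT, count_limit
        )));
    }
    Ok(count_limit)
}
```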

pub async fn tail_stream( &self, after: StreamCursor, block_ms: u64, count_limit: u64, ) -> Result<StreamFrames, SdkError>

Tail this task’s attempt stream for new frames.

See EngineBackend::tail_stream for the head-of-line warning — the task’s backend is shared with claim/complete/fail hot paths. Consumers that need isolation should build a dedicated EngineBackend for tail reads.

pub async fn tail_stream_with_visibility( &self, after: StreamCursor, block_ms: u64, count_limit: u64, visibility: TailVisibility, ) -> Result<StreamFrames, SdkError>

Tail with an explicit TailVisibility filter (RFC-015 §6). Use TailVisibility::ExcludeBestEffort (ff_core::backend::TailVisibility::ExcludeBestEffort) to drop StreamMode::BestEffortLive frames server-side.
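The effect of the filter can be modelled locally as below. This is only a sketch: the real filtering happens server-side, the `Frame` struct is simplified, and the `All` variant is a hypothetical name for "no filtering" (only ExcludeBestEffort is named on this page).

```rust
/// Stand-in for the RFC-015 stream modes named above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StreamMode {
    Durable,
    BestEffortLive,
}

/// Stand-in for TailVisibility. `All` is a hypothetical variant name.
#[derive(Debug, Clone, Copy)]
enum TailVisibility {
    All,
    ExcludeBestEffort,
}

/// Simplified frame; the real StreamFrames type carries more than this.
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    mode: StreamMode,
    payload: &'static str,
}

/// Returns the frames a tail with the given visibility would observe.
fn visible_frames(frames: &[Frame], visibility: TailVisibility) -> Vec<Frame> {
    frames
        .iter()
        .filter(|f| match visibility {
            TailVisibility::All => true,
            TailVisibility::ExcludeBestEffort => f.mode != StreamMode::BestEffortLive,
        })
        .cloned()
        .collect()
}
```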

pub fn is_lease_healthy(&self) -> bool

Check if the lease is likely still valid based on renewal success.

Returns false if 3 or more consecutive renewal attempts have failed. Workers should check this before committing expensive or irreversible side effects. A false return means Valkey may have already expired the lease and another worker could be processing this execution.

pub fn consecutive_renewal_failures(&self) -> u32

Number of consecutive lease renewal failures since the last success.

Returns 0 when renewals are working normally. Useful for observability and custom health policies beyond the default threshold of 3.
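The relationship between the two methods, and a stricter custom policy built on the counter, might look like the sketch below. `RenewalState` and `is_healthy_below` are hypothetical; only the threshold of 3 comes from the text above.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Default threshold stated in the documentation above.
const DEFAULT_FAILURE_THRESHOLD: u32 = 3;

/// Hypothetical stand-in for the renewal bookkeeping of a claimed task.
struct RenewalState {
    consecutive_failures: AtomicU32,
}

impl RenewalState {
    fn new() -> Self {
        RenewalState { consecutive_failures: AtomicU32::new(0) }
    }

    /// A successful renewal resets the streak.
    fn record_success(&self) {
        self.consecutive_failures.store(0, Ordering::Relaxed);
    }

    /// A failed renewal extends the streak by one.
    fn record_failure(&self) {
        self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
    }

    fn consecutive_renewal_failures(&self) -> u32 {
        self.consecutive_failures.load(Ordering::Relaxed)
    }

    /// Default policy: unhealthy at 3 or more consecutive failures.
    fn is_lease_healthy(&self) -> bool {
        self.consecutive_renewal_failures() < DEFAULT_FAILURE_THRESHOLD
    }

    /// Custom policy: unhealthy once the streak reaches `threshold`.
    fn is_healthy_below(&self, threshold: u32) -> bool {
        self.consecutive_renewal_failures() < threshold
    }
}
```

A worker about to perform an irreversible side effect could, for instance, require `is_healthy_below(1)` so that a single failed renewal is enough to hold back.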

pub async fn delay_execution( self, delay_until: TimestampMs, ) -> Result<(), SdkError>

Delay the execution until delay_until.

Releases the lease. The execution moves to delayed state. Consumes self — the task cannot be used after delay.

pub async fn move_to_waiting_children(self) -> Result<(), SdkError>

Move execution to waiting_children state.

Releases the lease. The execution waits for child dependencies to complete. Consumes self.

pub async fn complete( self, result_payload: Option<Vec<u8>>, ) -> Result<(), SdkError>

Complete the execution successfully.

Calls ff_complete_execution via FCALL, then stops lease renewal. Renewal continues during the FCALL to prevent lease expiry under network latency; the Lua function fences on lease_id + epoch atomically. Consumes self, so the task cannot be used after completion.

§Connection errors and replay

If the Valkey connection drops after the Lua commit but before the client reads the response, retrying complete() with the same ClaimedTask (same lease_epoch + attempt_id) is safe: the SDK reconciles the “already terminal with matching outcome” response into Ok(()). A retry after a different terminal op has raced in (e.g. operator cancel) surfaces as ExecutionNotActive with the populated terminal_outcome so the caller can see what actually happened.
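The reconciliation rule described above can be sketched as a pure function. All types here (`TerminalOutcome`, `ServerReply`, `SketchError`) are hypothetical stand-ins; the actual wire format and SDK types are not shown on this page.

```rust
/// Hypothetical terminal outcomes stored by the server.
#[derive(Debug, Clone, Copy, PartialEq)]
enum TerminalOutcome {
    Success,
    Failed,
    Cancelled,
}

/// Stand-in for the SDK error, reduced to the variant discussed above.
#[derive(Debug, PartialEq)]
enum SketchError {
    ExecutionNotActive { terminal_outcome: TerminalOutcome },
}

/// Hypothetical shape of what a retried call learns from the server.
#[derive(Debug, Clone, Copy)]
enum ServerReply {
    /// The operation committed on this call.
    Committed,
    /// The execution was already terminal when the call arrived.
    AlreadyTerminal { outcome: TerminalOutcome, same_epoch_and_attempt: bool },
}

/// Maps a reply onto the caller-visible result for the intended outcome.
fn reconcile(intended: TerminalOutcome, reply: ServerReply) -> Result<(), SketchError> {
    match reply {
        ServerReply::Committed => Ok(()),
        // Replay of our own committed operation: treat as success.
        ServerReply::AlreadyTerminal { outcome, same_epoch_and_attempt }
            if same_epoch_and_attempt && outcome == intended => Ok(()),
        // A different terminal operation won the race: report what happened.
        ServerReply::AlreadyTerminal { outcome, .. } => {
            Err(SketchError::ExecutionNotActive { terminal_outcome: outcome })
        }
    }
}
```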

pub async fn fail( self, reason: &str, error_category: &str, ) -> Result<FailOutcome, SdkError>

Fail the execution with a reason and error category.

If the execution policy allows retries, the engine schedules a retry (delayed backoff). Otherwise, the execution becomes terminal failed. Returns FailOutcome so the caller knows what happened.

§Connection errors and replay

fail() is replay-safe under the same conditions as complete(): a retry by the same caller matching lease_epoch + attempt_id is reconciled into the outcome the server actually committed (TerminalFailed if no retries left; RetryScheduled with delay_until = 0 if a retry was scheduled — the exact delay is not recovered on the replay path).
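A caller may want to treat a zero delay_until as "delay unknown" rather than "retry immediately". The sketch below assumes a FailOutcome shaped as a two-variant enum with a numeric `delay_until`; the real type's exact fields are not shown on this page.

```rust
/// Assumed shape of FailOutcome, for illustration only.
#[derive(Debug, PartialEq)]
enum FailOutcome {
    RetryScheduled { delay_until: u64 },
    TerminalFailed,
}

/// Produces a log-friendly description of what the engine decided.
fn describe(outcome: &FailOutcome) -> String {
    match outcome {
        FailOutcome::TerminalFailed => "terminal failure, no retries left".to_string(),
        // On the replay path the exact delay is not recovered and is reported as 0.
        FailOutcome::RetryScheduled { delay_until: 0 } => {
            "retry scheduled, delay unknown (replayed call)".to_string()
        }
        FailOutcome::RetryScheduled { delay_until } => {
            format!("retry scheduled for timestamp {} ms", delay_until)
        }
    }
}
```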

pub async fn cancel(self, reason: &str) -> Result<(), SdkError>

Cancel the execution.

Stops lease renewal, then calls ff_cancel_execution via FCALL. Consumes self.

§Connection errors and replay

cancel() is replay-safe under the same conditions as complete(): a retry by the same caller matching lease_epoch + attempt_id returns Ok(()) if the server’s stored terminal_outcome is cancelled. A retry that finds a different outcome (because a concurrent complete() or fail() won the race) surfaces as ExecutionNotActive with the populated terminal_outcome so the caller can see that the cancel intent was NOT honored.

pub async fn renew_lease(&self) -> Result<(), SdkError>

Manually renew the lease.

RFC-012 Stage 1b. Forwards through the EngineBackend trait. The background renewal task calls backend.renew(&handle) directly (see spawn_renewal_task); this method is the public entry point for workers that want to force a renew out-of-band.

pub async fn update_progress( &self, pct: u8, message: &str, ) -> Result<(), SdkError>

Update progress: pct (0-100) and a message.

RFC-012 Stage 1b. Forwards through the EngineBackend trait (backend.progress(&handle, Some(pct), Some(msg))).

§Not for stream frames

update_progress writes the progress_percent / progress_message fields on exec_core (the execution’s state hash). It is a scalar heartbeat: each call overwrites the previous value and nothing is appended to the output stream. Producers emitting stream frames (arbitrary frame_type + payload, consumed via ClaimedTask::read_stream / tail_stream or the HTTP stream-tail routes) MUST use append_frame instead; update_progress is invisible to stream consumers.
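The difference between the scalar heartbeat and the append-only stream can be modelled in a few lines. `ExecutionSketch` is a hypothetical in-memory stand-in; the real state lives in the backend.

```rust
/// Hypothetical in-memory model of one execution's state and output stream.
#[derive(Debug, Default)]
struct ExecutionSketch {
    progress_percent: u8,
    progress_message: String,
    stream: Vec<(String, Vec<u8>)>,
}

impl ExecutionSketch {
    /// Scalar heartbeat: overwrites the previous values, touches no stream.
    fn update_progress(&mut self, pct: u8, message: &str) {
        self.progress_percent = pct;
        self.progress_message = message.to_string();
    }

    /// Stream write: appends a new frame and leaves progress fields alone.
    fn append_frame(&mut self, frame_type: &str, payload: &[u8]) {
        self.stream.push((frame_type.to_string(), payload.to_vec()));
    }
}
```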

pub async fn report_usage( &self, budget_id: &BudgetId, dimensions: &[(&str, u64)], dedup_key: Option<&str>, ) -> Result<ReportUsageResult, SdkError>

Report usage against a budget and check limits.

Non-consuming — the worker can report usage multiple times. dimensions is a slice of (dimension_name, delta) pairs. dedup_key prevents double-counting on retries (auto-prefixed with budget hash tag).

RFC-012 §R7.2.3: forwards through EngineBackend::report_usage. The trait’s UsageDimensions.dedup_key carries the raw key; the backend impl applies the usage_dedup_key(hash_tag, k) wrap so the dedup state co-locates with the budget partition (PR #108).
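How a dedup key prevents double-counting can be sketched with an in-memory ledger. `BudgetSketch` and its boolean return value are assumptions for illustration; the real method returns a ReportUsageResult and keeps its dedup state in the backend.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory budget ledger.
#[derive(Debug, Default)]
struct BudgetSketch {
    totals: HashMap<String, u64>,
    seen_keys: HashSet<String>,
}

impl BudgetSketch {
    /// Applies the deltas unless `dedup_key` was already seen.
    /// Returns true when usage was applied, false for a duplicate report.
    fn report_usage(&mut self, dimensions: &[(&str, u64)], dedup_key: Option<&str>) -> bool {
        if let Some(key) = dedup_key {
            // `insert` returns false when the key was already present.
            if !self.seen_keys.insert(key.to_string()) {
                return false;
            }
        }
        for &(name, delta) in dimensions {
            *self.totals.entry(name.to_string()).or_insert(0) += delta;
        }
        true
    }
}
```

Reports without a dedup key are always applied in this sketch, so a retried call without a key would count twice.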

pub async fn create_pending_waitpoint( &self, waitpoint_key: &str, expires_in_ms: u64, ) -> Result<(WaitpointId, WaitpointToken), SdkError>

Create a pending waitpoint for future signal delivery.

Non-consuming: the worker keeps the lease. Signals delivered to the waitpoint are buffered. When the worker later suspends against this pending waitpoint (see try_suspend_on_pending), buffered signals may immediately satisfy the resume condition.

Returns both the waitpoint_id AND the HMAC token required by external callers to buffer signals against this pending waitpoint (RFC-004 §Waitpoint Security).

RFC-012 §R7.2.2: forwards through EngineBackend::create_waitpoint. The trait returns PendingWaitpoint whose hmac_token is the same wire HMAC this method has always produced; the SDK unwraps it back to the historical (WaitpointId, WaitpointToken) tuple for caller-shape parity.

pub async fn append_frame( &self, frame_type: &str, payload: &[u8], metadata: Option<&str>, ) -> Result<AppendFrameOutcome, SdkError>

Append a frame to the current attempt’s output stream.

Non-consuming — the worker can append many frames during execution. The stream is created lazily on the first append.

RFC-012 §R7.2.1 / PR #146: forwards through the EngineBackend trait. The free-form frame_type tag and optional metadata (wire correlation_id) travel on the extended ff_core::backend::Frame shape (frame_type: String, correlation_id: Option<String>), giving byte-for-byte wire parity with the pre-migration direct-FCALL path.

§append_frame vs update_progress

Stream-frame producers (arbitrary frame_type + payload consumed via ClaimedTask::read_stream / tail_stream or the HTTP stream-tail routes) MUST use append_frame. update_progress writes scalar progress_percent / progress_message fields to exec_core and is invisible to stream consumers.

pub async fn append_frame_with_mode( &self, frame_type: &str, payload: &[u8], metadata: Option<&str>, mode: StreamMode, ) -> Result<AppendFrameOutcome, SdkError>

Append a frame under an explicit RFC-015 StreamMode. append_frame uses the default mode, StreamMode::Durable, which preserves pre-RFC-015 behaviour.
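The relationship between the two append methods can be sketched as simple delegation. `StreamSketch` is a hypothetical synchronous stand-in with a reduced signature; only the two StreamMode names come from this page.

```rust
/// Stand-in for the RFC-015 stream modes named on this page.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StreamMode {
    Durable,
    BestEffortLive,
}

/// Hypothetical in-memory stream that records each frame's mode.
#[derive(Debug, Default)]
struct StreamSketch {
    frames: Vec<(String, StreamMode)>,
}

impl StreamSketch {
    /// Appends a frame under the mode chosen by the caller.
    fn append_frame_with_mode(&mut self, frame_type: &str, mode: StreamMode) {
        self.frames.push((frame_type.to_string(), mode));
    }

    /// Default entry point: delegates with StreamMode::Durable.
    fn append_frame(&mut self, frame_type: &str) {
        self.append_frame_with_mode(frame_type, StreamMode::Durable);
    }
}
```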

pub async fn suspend( self, reason_code: SuspensionReasonCode, resume_condition: ResumeCondition, timeout: Option<(TimestampMs, TimeoutBehavior)>, resume_policy: ResumePolicy, ) -> Result<SuspendedHandle, SdkError>

Strict suspend. Consumes self and yields a SuspendedHandle or errors. On the early-satisfied path (buffered signals already matched the condition), returns EngineError::State(AlreadySatisfied) — use try_suspend when that branch is part of the flow’s happy path.

RFC-013 §5.1: this is the classic Rust foo / try_foo split; suspend is the unconditional form. Defaults to WaitpointBinding::fresh() for the waitpoint. For pending waitpoints previously issued via create_pending_waitpoint, use try_suspend_on_pending.

pub async fn try_suspend( self, reason_code: SuspensionReasonCode, resume_condition: ResumeCondition, timeout: Option<(TimestampMs, TimeoutBehavior)>, resume_policy: ResumePolicy, ) -> Result<TrySuspendOutcome, SdkError>

Fallible suspend. On the early-satisfied path, the ClaimedTask is handed back unchanged so the worker can continue running against the retained lease.

RFC-013 §5.1 — use this when consuming a pending waitpoint whose buffered signals may already match the condition.
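Handling both branches of a fallible suspend might look like the sketch below. The variant names `Suspended` and `AlreadySatisfied`, the `SketchTask` type, and the boolean parameter are assumptions; the real TrySuspendOutcome is not detailed on this page.

```rust
/// Hypothetical stand-in for a claimed task.
struct SketchTask {
    id: u32,
}

/// Hypothetical stand-in for the handle returned after a real suspension.
struct SuspendedHandle {
    task_id: u32,
}

/// Assumed shape of the outcome; the variant names are illustrative.
enum TrySuspendOutcome {
    Suspended(SuspendedHandle),
    /// Early-satisfied path: the task comes back with its lease retained.
    AlreadySatisfied(SketchTask),
}

/// Consumes the task and either suspends it or hands it back.
fn try_suspend(task: SketchTask, buffered_signals_match: bool) -> TrySuspendOutcome {
    if buffered_signals_match {
        TrySuspendOutcome::AlreadySatisfied(task)
    } else {
        TrySuspendOutcome::Suspended(SuspendedHandle { task_id: task.id })
    }
}

/// Shows how a worker would branch on the outcome.
fn run(task: SketchTask, buffered_signals_match: bool) -> String {
    match try_suspend(task, buffered_signals_match) {
        TrySuspendOutcome::Suspended(handle) => format!("suspended {}", handle.task_id),
        TrySuspendOutcome::AlreadySatisfied(task) => format!("continuing {}", task.id),
    }
}
```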

pub async fn try_suspend_on_pending( self, pending: &PendingWaitpoint, reason_code: SuspensionReasonCode, resume_condition: ResumeCondition, timeout: Option<(TimestampMs, TimeoutBehavior)>, resume_policy: ResumePolicy, ) -> Result<TrySuspendOutcome, SdkError>

Convenience: try_suspend against a pending waitpoint previously issued via create_pending_waitpoint.

pub async fn resume_signals(&self) -> Result<Vec<ResumeSignal>, SdkError>

Read the signals that satisfied the waitpoint and triggered this resume.

Non-consuming. Intended to be called immediately after re-claim via crate::FlowFabricWorker::claim_from_reclaim_grant, before any subsequent suspend() (which replaces suspension:current).

Returns Ok(vec![]) when this claim is NOT a signal-resume:

  • No prior suspension on this execution.
  • The prior suspension belonged to an earlier attempt (e.g. the attempt was cancelled/failed and a retry is now claiming).
  • The prior suspension was closed by timeout / cancel / operator override rather than by a matched signal.

Reads suspension:current once, filters by attempt_index to guard against stale prior-attempt records, then fetches the matched signal_id set from waitpoint_condition’s matcher:N:signal_id fields and reads each signal’s metadata + payload directly.
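The filtering rules above can be summarised in a small sketch. `SuspensionRecord` and its fields are hypothetical simplifications of the stored suspension:current record; only the three empty-result cases are taken from the text.

```rust
/// Hypothetical, simplified view of the stored suspension record.
struct SuspensionRecord {
    attempt_index: u32,
    closed_by_signal: bool,
    matched_signal_ids: Vec<String>,
}

/// Returns the matched signal ids, or an empty list when this claim
/// is not a signal-resume for the claiming attempt.
fn resume_signal_ids(current: Option<&SuspensionRecord>, claiming_attempt: u32) -> Vec<String> {
    match current {
        // Same attempt and closed by a matched signal: report the signals.
        Some(record) if record.attempt_index == claiming_attempt && record.closed_by_signal => {
            record.matched_signal_ids.clone()
        }
        // No prior suspension, stale prior-attempt record, or closed by
        // timeout / cancel / operator override.
        _ => Vec::new(),
    }
}
```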

Trait Implementations§

impl Drop for ClaimedTask

fn drop(&mut self)

Executes the destructor for this type.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.