Skip to main content

EngineBackend

Trait EngineBackend 

Source
pub trait EngineBackend:
    Send
    + Sync
    + 'static {
Show 20 methods // Required methods fn claim<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, lane: &'life1 LaneId, capabilities: &'life2 CapabilitySet, policy: ClaimPolicy, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn renew<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn progress<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, percent: Option<u8>, message: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn append_frame<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, frame: Frame, ) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn complete<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, payload: Option<Vec<u8>>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn fail<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: FailureReason, classification: FailureClass, ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn cancel<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn 
suspend<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, waitpoints: Vec<WaitpointSpec>, timeout: Option<Duration>, ) -> Pin<Box<dyn Future<Output = Result<Handle, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, waitpoint_key: &'life2 str, expires_in: Duration, ) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn observe_signals<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn claim_from_reclaim<'life0, 'async_trait>( &'life0 self, token: ReclaimToken, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn delay<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, delay_until: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn wait_children<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn describe_execution<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn describe_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, ) -> Pin<Box<dyn Future<Output = 
Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn list_edges<'life0, 'life1, 'async_trait>( &'life0 self, flow_id: &'life1 FlowId, direction: EdgeDirection, ) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn cancel_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, policy: CancelFlowPolicy, wait: CancelFlowWait, ) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn report_usage<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, budget: &'life2 BudgetId, dimensions: UsageDimensions, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn read_stream<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, from: StreamCursor, to: StreamCursor, count_limit: u64, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn tail_stream<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, after: StreamCursor, block_ms: u64, count_limit: u64, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait;
}
Expand description

The engine write surface — a single trait a backend implementation honours to serve a FlowFabricWorker.

See RFC-012 §3.1 for the inventory rationale and §3.3 for the type-level shape. 20 methods (Round-7 added create_waitpoint; append_frame return widened; report_usage return replaced — RFC-012 §R7).

§Note on complete payload shape

The RFC §3.3 sketch uses Option<Bytes>; the Stage 1a trait uses Option<Vec<u8>> to match the existing ff_sdk::ClaimedTask::complete signature and avoid adding a bytes public-type dep for zero consumer benefit. Round-4 §7.17 resolved the payload container debate to Box<[u8]> in the public type (see HandleOpaque); Option<Vec<u8>> is the zero-churn choice consistent with today’s code. Consumers that need &[u8] can borrow via .as_deref() on the Option.

Required Methods§

Source

fn claim<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, lane: &'life1 LaneId, capabilities: &'life2 CapabilitySet, policy: ClaimPolicy, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Fresh-work claim. Returns Ok(None) when no work is currently available; Err only on transport or input-validation faults.

Source

fn renew<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Renew a held lease. Returns the updated expiry + epoch on success; fails with a typed State::StaleLease / State::LeaseExpired error when the lease has been stolen or has timed out.

Source

fn progress<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, percent: Option<u8>, message: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Numeric-progress heartbeat.

Source

fn append_frame<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, frame: Frame, ) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Append one stream frame. Distinct from progress per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry id and post-append frame count (RFC-012 §R7.2.1).

Source

fn complete<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, payload: Option<Vec<u8>>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Terminal success. Borrows handle (round-4 M-D2) so callers can retry under EngineError::Transport without losing the cookie. Payload is Option<Vec<u8>> per the note above.

Source

fn fail<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: FailureReason, classification: FailureClass, ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Terminal failure with classification. Returns FailOutcome so the caller learns whether a retry was scheduled.

Source

fn cancel<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Cooperative cancel by the worker holding the lease.

Source

fn suspend<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, waitpoints: Vec<WaitpointSpec>, timeout: Option<Duration>, ) -> Pin<Box<dyn Future<Output = Result<Handle, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Suspend the execution awaiting one or more waitpoints. Returns a fresh Handle of kind HandleKind::Suspended, which supersedes the caller’s pre-suspend handle.

Source

fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, waitpoint_key: &'life2 str, expires_in: Duration, ) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Issue a pending waitpoint for future signal delivery.

Waitpoints have two states in the Valkey wire contract: pending (token issued, not yet backing a suspension) and active (bound to a suspension). This method creates a waitpoint in the pending state. A later suspend call transitions a pending waitpoint to active (see Lua use_pending_waitpoint ARGV flag at flowfabric.lua:3603,3641,3690) — or, if buffered signals already satisfy its condition, the suspend call returns SuspendOutcome::AlreadySatisfied and the waitpoint activates without ever releasing the lease.

Pending-waitpoint expiry is a first-class terminal error on the wire (PendingWaitpointExpired at ff-script/src/error.rs:170,403-408). The attempt retains its lease while the waitpoint is pending; signals delivered to this waitpoint are buffered server-side (RFC-012 §R7.2.2).

Source

fn observe_signals<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Non-mutating observation of signals that satisfied the handle’s resume condition.

Source

fn claim_from_reclaim<'life0, 'async_trait>( &'life0 self, token: ReclaimToken, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Consume a reclaim grant to mint a resumed-kind handle. Returns Ok(None) when the grant’s target execution is no longer resumable (already reclaimed, terminal, etc.).

Source

fn delay<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, delay_until: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Park the execution until delay_until, releasing the lease.

Source

fn wait_children<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Mark the execution as waiting for its child flow to complete, releasing the lease.

Source

fn describe_execution<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Snapshot an execution by id. Ok(None) ⇒ no such execution.

Source

fn describe_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Snapshot a flow by id. Ok(None) ⇒ no such flow.

Source

fn list_edges<'life0, 'life1, 'async_trait>( &'life0 self, flow_id: &'life1 FlowId, direction: EdgeDirection, ) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

List dependency edges adjacent to an execution. Read-only; the backend resolves the subject execution’s flow, reads the direction-specific adjacency SET, and decodes each member’s flow-scoped edge:<edge_id> hash.

Returns an empty Vec when the subject has no edges on the requested side — including standalone executions (no owning flow). Ordering is unspecified: the underlying adjacency SET is an unordered SMEMBERS read. Callers that need deterministic order should sort by EdgeSnapshot::edge_id / EdgeSnapshot::created_at themselves.

Parse failures on the edge hash surface as EngineError::Validation { kind: ValidationKind::Corruption, .. } — unknown fields, missing required fields, endpoint mismatches against the adjacency SET all fail loud rather than silently returning partial results.

Gated on the core feature — edge reads are part of the minimal engine surface a Postgres-style backend must honour.

Source

fn cancel_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, policy: CancelFlowPolicy, wait: CancelFlowWait, ) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Operator-initiated cancellation of a flow and (optionally) its member executions. See RFC-012 §3.1.1 for the policy / wait matrix.

Source

fn report_usage<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, budget: &'life2 BudgetId, dimensions: UsageDimensions, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Report usage against a budget and check limits. Returns the typed ReportUsageResult variant; backends enforce idempotency via the caller-supplied UsageDimensions::dedup_key (RFC-012 §R7.2.3 — replaces the pre-Round-7 AdmissionDecision return).

Source

fn read_stream<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, from: StreamCursor, to: StreamCursor, count_limit: u64, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read frames from a completed or in-flight attempt’s stream.

from / to are StreamCursor values — StreamCursor::Start / StreamCursor::End are equivalent to XRANGE - / +, and StreamCursor::At("<id>") reads from a concrete entry id.

Input validation (count_limit bounds, cursor shape) is the caller’s responsibility — SDK-side wrappers in ff-sdk enforce bounds before forwarding. Backends MAY additionally reject out-of-range input via EngineError::Validation.

Gated on the streaming feature — stream reads are part of the stream-subset surface a backend without XREAD-like primitives may omit.

Source

fn tail_stream<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, after: StreamCursor, block_ms: u64, count_limit: u64, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Tail a live attempt’s stream.

after is an exclusive StreamCursor — entries with id strictly greater than after are returned. StreamCursor::Start / StreamCursor::End are NOT accepted here; callers MUST pass a concrete id (or StreamCursor::from_beginning()). The SDK wrapper rejects the open markers before reaching the backend.

block_ms == 0 → non-blocking peek. block_ms > 0 → blocks up to that many ms for a new entry.

Gated on the streaming feature — see read_stream.

Implementors§