Skip to main content

EngineBackend

Trait EngineBackend 

Source
pub trait EngineBackend:
    Send
    + Sync
    + 'static {
Show 30 methods // Required methods fn claim<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, lane: &'life1 LaneId, capabilities: &'life2 CapabilitySet, policy: ClaimPolicy, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn renew<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn progress<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, percent: Option<u8>, message: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn append_frame<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, frame: Frame, ) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn complete<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, payload: Option<Vec<u8>>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn fail<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: FailureReason, classification: FailureClass, ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn cancel<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn 
suspend<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, args: SuspendArgs, ) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, waitpoint_key: &'life2 str, expires_in: Duration, ) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn observe_signals<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn claim_from_reclaim<'life0, 'async_trait>( &'life0 self, token: ReclaimToken, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn delay<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, delay_until: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn wait_children<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn describe_execution<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn describe_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 
'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn list_edges<'life0, 'life1, 'async_trait>( &'life0 self, flow_id: &'life1 FlowId, direction: EdgeDirection, ) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn describe_edge<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, flow_id: &'life1 FlowId, edge_id: &'life2 EdgeId, ) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>( &'life0 self, eid: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn list_flows<'life0, 'async_trait>( &'life0 self, partition: PartitionKey, cursor: Option<FlowId>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn list_lanes<'life0, 'async_trait>( &'life0 self, cursor: Option<LaneId>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn list_suspended<'life0, 'async_trait>( &'life0 self, partition: PartitionKey, cursor: Option<ExecutionId>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn list_executions<'life0, 'async_trait>( &'life0 self, partition: PartitionKey, cursor: Option<ExecutionId>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn deliver_signal<'life0, 
'async_trait>( &'life0 self, args: DeliverSignalArgs, ) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn claim_resumed_execution<'life0, 'async_trait>( &'life0 self, args: ClaimResumedExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn cancel_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, policy: CancelFlowPolicy, wait: CancelFlowWait, ) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, flow_id: &'life1 FlowId, downstream_execution_id: &'life2 ExecutionId, policy: EdgeDependencyPolicy, ) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn report_usage<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, budget: &'life2 BudgetId, dimensions: UsageDimensions, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn read_stream<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, from: StreamCursor, to: StreamCursor, count_limit: u64, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn tail_stream<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, after: StreamCursor, block_ms: u64, count_limit: u64, visibility: 
TailVisibility, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn read_summary<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, ) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait;
}
Expand description

The engine write surface — a single trait a backend implementation honours to serve a FlowFabricWorker.

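See RFC-012 §3.1 for the inventory rationale and §3.3 for the type-level shape. The RFC-012 inventory stood at 16 methods as of Round-7 (Round-7 added create_waitpoint; append_frame return widened; report_usage return replaced — RFC-012 §R7). Issue #150 added the two trigger-surface methods (deliver_signal / claim_resumed_execution). Counting the snapshot, listing, flow-control and streaming methods documented below, the trait now has 30 required methods.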

§Note on complete payload shape

The RFC §3.3 sketch uses Option<Bytes>; the Stage 1a trait uses Option<Vec<u8>> to match the existing ff_sdk::ClaimedTask::complete signature and avoid adding a bytes public-type dep for zero consumer benefit. Round-4 §7.17 resolved the payload container debate to Box<[u8]> in the public type (see HandleOpaque); Option<Vec<u8>> is the zero-churn choice consistent with today’s code. Consumers that need &[u8] can borrow via .as_deref() on the Option.

Required Methods§

Source

fn claim<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, lane: &'life1 LaneId, capabilities: &'life2 CapabilitySet, policy: ClaimPolicy, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Fresh-work claim. Returns Ok(None) when no work is currently available; Err only on transport or input-validation faults.

Source

fn renew<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Renew a held lease. Returns the updated expiry + epoch on success; typed State::StaleLease / State::LeaseExpired when the lease has been stolen or timed out.

Source

fn progress<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, percent: Option<u8>, message: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Numeric-progress heartbeat.

Writes scalar progress_percent / progress_message fields on exec_core; each call overwrites the previous value. This does NOT append to the output stream — stream-frame producers must use append_frame instead.

Source

fn append_frame<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, frame: Frame, ) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Append one stream frame. Distinct from progress per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry id and post-append frame count (RFC-012 §R7.2.1).

Stream-frame producers (arbitrary frame_type + payload, consumed via the read/tail surfaces) MUST use this method rather than progress; the latter updates scalar fields on exec_core and is invisible to stream consumers.

Source

fn complete<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, payload: Option<Vec<u8>>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Terminal success. Borrows handle (round-4 M-D2) so callers can retry under EngineError::Transport without losing the cookie. Payload is Option<Vec<u8>> per the note above.

Source

fn fail<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: FailureReason, classification: FailureClass, ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Terminal failure with classification. Returns FailOutcome so the caller learns whether a retry was scheduled.

Source

fn cancel<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Cooperative cancel by the worker holding the lease.

Source

fn suspend<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, args: SuspendArgs, ) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Suspend the execution awaiting a typed resume condition (RFC-013 Stage 1d).

Borrows handle (round-4 M-D2). Terminal-looking behaviour is expressed through SuspendOutcome:

  • SuspendOutcome::Suspended — the pre-suspend handle is logically invalidated; the fresh HandleKind::Suspended handle inside the variant supersedes it. Runtime enforcement via the fence triple: subsequent ops against the stale handle surface as Contention(LeaseConflict).
  • SuspendOutcome::AlreadySatisfied — buffered signals on a pending waitpoint already matched the resume condition at suspension time. The lease is NOT released; the caller’s pre-suspend handle remains valid.

See RFC-013 §2 for the type shapes, §3 for the replay / idempotency contract, §4 for the error taxonomy.

Source

fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, waitpoint_key: &'life2 str, expires_in: Duration, ) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Issue a pending waitpoint for future signal delivery.

Waitpoints have two states in the Valkey wire contract: pending (token issued, not yet backing a suspension) and active (bound to a suspension). This method creates a waitpoint in the pending state. A later suspend call transitions a pending waitpoint to active (see Lua use_pending_waitpoint ARGV flag at flowfabric.lua:3603,3641,3690) — or, if buffered signals already satisfy its condition, the suspend call returns SuspendOutcome::AlreadySatisfied and the waitpoint activates without ever releasing the lease.

Pending-waitpoint expiry is a first-class terminal error on the wire (PendingWaitpointExpired at ff-script/src/error.rs:170,403-408). The attempt retains its lease while the waitpoint is pending; signals delivered to this waitpoint are buffered server-side (RFC-012 §R7.2.2).

Source

fn observe_signals<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Non-mutating observation of signals that satisfied the handle’s resume condition.

Source

fn claim_from_reclaim<'life0, 'async_trait>( &'life0 self, token: ReclaimToken, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Consume a reclaim grant to mint a resumed-kind handle. Returns Ok(None) when the grant’s target execution is no longer resumable (already reclaimed, terminal, etc.).

Source

fn delay<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, delay_until: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Park the execution until delay_until, releasing the lease.

Source

fn wait_children<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Mark the execution as waiting for its child flow to complete, releasing the lease.

Source

fn describe_execution<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Snapshot an execution by id. Ok(None) ⇒ no such execution.

Source

fn describe_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Snapshot a flow by id. Ok(None) ⇒ no such flow.

Source

fn list_edges<'life0, 'life1, 'async_trait>( &'life0 self, flow_id: &'life1 FlowId, direction: EdgeDirection, ) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

List dependency edges adjacent to an execution. Read-only; the backend resolves the subject execution’s flow, reads the direction-specific adjacency SET, and decodes each member’s flow-scoped edge:<edge_id> hash.

Returns an empty Vec when the subject has no edges on the requested side — including standalone executions (no owning flow). Ordering is unspecified: the underlying adjacency SET is an unordered SMEMBERS read. Callers that need deterministic order should sort by EdgeSnapshot::edge_id / EdgeSnapshot::created_at themselves.

Parse failures on the edge hash surface as EngineError::Validation { kind: ValidationKind::Corruption, .. } — unknown fields, missing required fields, endpoint mismatches against the adjacency SET all fail loud rather than silently returning partial results.

Gated on the core feature — edge reads are part of the minimal engine surface a Postgres-style backend must honour.

Source

fn describe_edge<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, flow_id: &'life1 FlowId, edge_id: &'life2 EdgeId, ) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Snapshot a single dependency edge by its owning flow + edge id.

Ok(None) when the edge hash is absent (never staged, or staged under a different flow than flow_id). Parse failures on a present edge hash surface as EngineError::Validation { kind: ValidationKind::Corruption, .. } — the stored flow_id field is cross-checked against the caller’s expected flow_id so a wrong-key read fails loud rather than returning an unrelated edge.

Gated on the core feature — single-edge reads are part of the minimal snapshot surface an alternate backend must honour alongside Self::describe_execution / Self::describe_flow / Self::list_edges.

Source

fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>( &'life0 self, eid: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Resolve an execution’s owning flow id, if any.

Ok(None) when the execution’s core record is absent or has no associated flow (standalone execution). A present-but-malformed flow_id field surfaces as EngineError::Validation { kind: ValidationKind::Corruption, .. }.

Gated on the core feature. Used by ff-sdk’s list_outgoing_edges / list_incoming_edges to pivot from a consumer-supplied ExecutionId to the FlowId required by Self::list_edges. A Valkey backend serves this with a single HGET exec_core flow_id; a Postgres backend serves it with the equivalent single-column row lookup.

Source

fn list_flows<'life0, 'async_trait>( &'life0 self, partition: PartitionKey, cursor: Option<FlowId>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List flows on a partition with cursor-based pagination (issue #185).

Returns a ListFlowsPage of FlowSummary rows ordered by flow_id (UUID byte-lexicographic). cursor is None for the first page; callers forward the returned next_cursor verbatim to continue iteration, and the listing is exhausted when next_cursor is None. limit is the maximum number of rows to return on this page — implementations MAY return fewer (end of partition) but MUST NOT exceed it.

Ordering rationale: flow ids are UUIDs, and both Valkey (sort after-the-fact) and Postgres (ORDER BY flow_id) can agree on byte-lexicographic order — the same order FlowId::to_string() produces for canonical hyphenated UUIDs. Mapping continuation to flow_id > cursor keeps the contract backend-independent.

§Postgres implementation pattern

A Postgres-backed implementation serves this directly with

SELECT flow_id, created_at_ms, public_flow_state
  FROM ff_flow
 WHERE partition_key = $1
   AND ($2::uuid IS NULL OR flow_id > $2)
 ORDER BY flow_id
 LIMIT $3 + 1;

— reading one extra row to decide whether next_cursor should be set to the last row’s flow_id. The Valkey implementation maintains the ff:idx:{fp:N}:flow_index SET and performs the sort + slice client-side (SMEMBERS then sort-by-UUID-bytes), pipelining HGETALL flow_core for each row on the page.

Gated on the core feature — flow listing is part of the minimal engine surface a Postgres-style backend must honour.

Source

fn list_lanes<'life0, 'async_trait>( &'life0 self, cursor: Option<LaneId>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enumerate registered lanes with cursor-based pagination.

Lanes are global (not partition-scoped) — the backend serves this from its lane registry and does NOT accept a crate::partition::Partition argument. Results are sorted by LaneId name so the ordering is stable across calls and cursors address a deterministic position in the sort.

  • cursor — exclusive lower bound. None starts from the first lane. To continue a walk, pass the previous page’s ListLanesPage::next_cursor.
  • limit — hard cap on the number of lanes returned in the page. Backends MAY return fewer lanes when the registry holds fewer than limit; they MUST NOT return more than limit.

ListLanesPage::next_cursor is Some(last_lane_in_page) iff at least one more lane exists after the returned page, and None on the final page. Callers loop until next_cursor is None to read the full registry.

Gated on the core feature — lane enumeration is part of the minimal snapshot surface an alternate backend must honour alongside Self::describe_flow / Self::list_edges.

Source

fn list_suspended<'life0, 'async_trait>( &'life0 self, partition: PartitionKey, cursor: Option<ExecutionId>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List suspended executions in one partition, cursor-paginated, with each entry’s suspension reason_code populated (issue #183).

Consumer-facing “what’s blocked on what?” panels (ff-board’s suspended-executions view, operator CLIs) need the reason in the list response so the UI does not round-trip per row to describe_execution for a field it knows it needs. reason on SuspendedExecutionEntry carries the free-form suspension:current.reason_code field — see the type rustdoc for the String-not-enum rationale.

cursor is opaque to callers; pass None to start a fresh scan and feed the returned ListSuspendedPage::next_cursor back in on subsequent pages until it comes back None. limit bounds the entries count; backends MAY return fewer when the partition is exhausted.

Ordering is by ascending suspended_at_ms (the per-lane suspended ZSET score == timeout_at or the no-timeout sentinel) with execution id as a lex tiebreak, so cursor continuation is deterministic across calls.

Gated on the core feature — suspended-list enumeration is part of the minimal engine surface a Postgres-style backend must honour.

Source

fn list_executions<'life0, 'async_trait>( &'life0 self, partition: PartitionKey, cursor: Option<ExecutionId>, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Forward-only paginated listing of the executions indexed under one partition.

Reads the partition-wide ff:idx:{p:N}:all_executions set, sorts lexicographically on ExecutionId, and returns the page of ids strictly greater than cursor (or starting from the smallest id when cursor = None). The returned ListExecutionsPage::next_cursor is the last id on the page iff at least one more id exists past it; None signals end-of-stream.

limit is the maximum number of ids returned on this page. A limit of 0 returns an empty page with next_cursor = None. Backends MAY cap limit internally (Valkey: 1000) and return fewer ids than requested; callers continue paginating until next_cursor == None.

Ordering is stable under concurrent inserts for already-emitted ids (an id less-than-or-equal-to the caller’s cursor is never re-emitted in later pages) but new inserts past the cursor WILL appear in subsequent pages — consistent with forward-only cursor semantics.

Gated on the core feature — partition-scoped listing is part of the minimal engine surface every backend must honour.

Source

fn deliver_signal<'life0, 'async_trait>( &'life0 self, args: DeliverSignalArgs, ) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Deliver an external signal to a suspended execution’s waitpoint.

The backend atomically records the signal, evaluates the resume condition, and — when satisfied — transitions the execution from suspended to runnable (or buffers the signal when the waitpoint is still pending). Duplicate delivery — same idempotency_key + waitpoint — surfaces as DeliverSignalResult::Duplicate with the pre-existing signal_id rather than mutating state twice.

Input validation (HMAC token presence, payload size limits, signal-name shape) is the backend’s responsibility; callers pass a fully populated DeliverSignalArgs and receive typed outcomes or typed errors (ScriptError::invalid_token, ScriptError::token_expired, ScriptError::ExecutionNotFound surfaced via EngineError::Transport on the Valkey backend).

Gated on the core feature — signal delivery is part of the minimal trigger surface every backend must honour so ff-server / REST handlers can dispatch against Arc<dyn EngineBackend> without knowing which backend is running underneath.

Source

fn claim_resumed_execution<'life0, 'async_trait>( &'life0 self, args: ClaimResumedExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Claim a resumed execution — a previously-suspended attempt that has cleared its resume condition (e.g. via Self::deliver_signal) and now needs a worker to pick up the same attempt index.

Distinct from Self::claim (fresh work) and Self::claim_from_reclaim (grant-based ownership transfer after a crash): the resumed-claim path re-binds an existing attempt rather than minting a new one. The backend issues a fresh lease_id + bumps the lease_epoch, preserving attempt_id / attempt_index so stream frames and progress updates continue on the same attempt.

Typed failures surface via ScriptError → EngineError: NotAResumedExecution when the attempt state is not attempt_interrupted, ExecutionNotLeaseable when the lifecycle phase is not runnable, and InvalidClaimGrant when the grant key is missing or was already consumed.

Gated on the core feature — resumed-claim is part of the minimal trigger surface every backend must honour.

Source

fn cancel_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, policy: CancelFlowPolicy, wait: CancelFlowWait, ) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Operator-initiated cancellation of a flow and (optionally) its member executions. See RFC-012 §3.1.1 for the policy / wait matrix.

Source

fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, flow_id: &'life1 FlowId, downstream_execution_id: &'life2 ExecutionId, policy: EdgeDependencyPolicy, ) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

RFC-016 Stage A: set the inbound-edge-group policy for a downstream execution. Must be called before the first add_dependency(... -> downstream_execution_id) — the backend rejects with EngineError::Conflict if edges have already been staged for this group.

Stage A honours only EdgeDependencyPolicy::AllOf; the AnyOf / Quorum variants return EngineError::Validation with detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B" until Stage B’s resolver lands.

Source

fn report_usage<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, budget: &'life2 BudgetId, dimensions: UsageDimensions, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Report usage against a budget and check limits. Returns the typed ReportUsageResult variant; backends enforce idempotency via the caller-supplied UsageDimensions::dedup_key (RFC-012 §R7.2.3 — replaces the pre-Round-7 AdmissionDecision return).

Source

fn read_stream<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, from: StreamCursor, to: StreamCursor, count_limit: u64, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read frames from a completed or in-flight attempt’s stream.

from / to are StreamCursor values — StreamCursor::Start / StreamCursor::End are equivalent to XRANGE - / +, and StreamCursor::At("<id>") reads from a concrete entry id.

Input validation (count_limit bounds, cursor shape) is the caller’s responsibility — SDK-side wrappers in ff-sdk enforce bounds before forwarding. Backends MAY additionally reject out-of-range input via EngineError::Validation.

Gated on the streaming feature — stream reads are part of the stream-subset surface a backend without XREAD-like primitives may omit.

Source

fn tail_stream<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, after: StreamCursor, block_ms: u64, count_limit: u64, visibility: TailVisibility, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Tail a live attempt’s stream.

after is an exclusive StreamCursor — entries with id strictly greater than after are returned. StreamCursor::Start / StreamCursor::End are NOT accepted here; callers MUST pass a concrete id (or StreamCursor::from_beginning()). The SDK wrapper rejects the open markers before reaching the backend.

block_ms == 0 → non-blocking peek. block_ms > 0 → blocks up to that many ms for a new entry.

visibility (RFC-015 §6.1) filters the returned entries by their stored StreamMode mode field. Default TailVisibility::All preserves v1 behaviour.

Gated on the streaming feature — see read_stream.

Source

fn read_summary<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, attempt_index: AttemptIndex, ) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read the rolling summary document for an attempt (RFC-015 §6.3).

Returns Ok(None) when no StreamMode::DurableSummary frame has ever been appended for the attempt. Non-blocking Hash read; safe to call from any consumer without holding the lease.

Gated on the streaming feature — summary reads are part of the stream-subset surface.

Implementors§