pub trait EngineBackend:
Send
+ Sync
+ 'static {
Show 63 methods
// Required methods
fn claim<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
lane: &'life1 LaneId,
capabilities: &'life2 CapabilitySet,
policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn renew<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn progress<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
percent: Option<u8>,
message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn append_frame<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn claim_from_reclaim<'life0, 'async_trait>(
&'life0 self,
token: ReclaimToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn list_flows<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<FlowId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn list_lanes<'life0, 'async_trait>(
&'life0 self,
cursor: Option<LaneId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn list_suspended<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn list_executions<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn deliver_signal<'life0, 'async_trait>(
&'life0 self,
args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
downstream_execution_id: &'life2 ExecutionId,
policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
// Provided methods
fn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn backend_label(&self) -> &'static str { ... }
fn capabilities(&self) -> Capabilities { ... }
fn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_instance_tags<'life0, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
) -> Pin<Box<dyn Future<Output = Result<InstanceTagSubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
}Expand description
The engine write surface — a single trait a backend implementation
honours to serve a FlowFabricWorker.
See RFC-012 §3.1 for the inventory rationale and §3.3 for the
type-level shape. 16 methods (Round-7 added create_waitpoint;
append_frame return widened; report_usage return replaced —
RFC-012 §R7). Issue #150 added the two trigger-surface methods
(deliver_signal / claim_resumed_execution).
§Note on complete payload shape
The RFC §3.3 sketch uses Option<Bytes>; the Stage 1a trait uses
Option<Vec<u8>> to match the existing
ff_sdk::ClaimedTask::complete signature and avoid adding a
bytes public-type dep for zero consumer benefit. Round-4 §7.17
resolved the payload container debate to Box<[u8]> in the
public type (see HandleOpaque); Option<Vec<u8>> is the
zero-churn choice consistent with today’s code. Consumers that
need &[u8] can borrow via .as_deref() on the Option.
Required Methods§
Sourcefn claim<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
lane: &'life1 LaneId,
capabilities: &'life2 CapabilitySet,
policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn claim<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
lane: &'life1 LaneId,
capabilities: &'life2 CapabilitySet,
policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Fresh-work claim. Returns Ok(None) when no work is currently
available; Err only on transport or input-validation faults.
Sourcefn renew<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn renew<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Renew a held lease. Returns the updated expiry + epoch on
success; typed State::StaleLease / State::LeaseExpired
when the lease has been stolen or timed out.
Sourcefn progress<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
percent: Option<u8>,
message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn progress<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
percent: Option<u8>,
message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Numeric-progress heartbeat.
Writes scalar progress_percent / progress_message fields on
exec_core; each call overwrites the previous value. This does
NOT append to the output stream — stream-frame producers must use
append_frame instead.
Sourcefn append_frame<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn append_frame<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Append one stream frame. Distinct from progress
per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
id and post-append frame count (RFC-012 §R7.2.1).
Stream-frame producers (arbitrary frame_type + payload, consumed
via the read/tail surfaces) MUST use this method rather than
progress; the latter updates scalar fields on
exec_core and is invisible to stream consumers.
Sourcefn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Terminal success. Borrows handle (round-4 M-D2) so callers
can retry under EngineError::Transport without losing the
cookie. Payload is Option<Vec<u8>> per the note above.
Sourcefn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Terminal failure with classification. Returns FailOutcome
so the caller learns whether a retry was scheduled.
Sourcefn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Cooperative cancel by the worker holding the lease.
Sourcefn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Suspend the execution awaiting a typed resume condition (RFC-013 Stage 1d).
Borrows handle (round-4 M-D2). Terminal-looking behaviour is
expressed through SuspendOutcome:
SuspendOutcome::Suspended— the pre-suspend handle is logically invalidated; the freshHandleKind::Suspendedhandle inside the variant supersedes it. Runtime enforcement via the fence triple: subsequent ops against the stale handle surface asContention(LeaseConflict).SuspendOutcome::AlreadySatisfied— buffered signals on a pending waitpoint already matched the resume condition at suspension time. The lease is NOT released; the caller’s pre-suspend handle remains valid.
See RFC-013 §2 for the type shapes, §3 for the replay / idempotency contract, §4 for the error taxonomy.
Sourcefn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Issue a pending waitpoint for future signal delivery.
Waitpoints have two states in the Valkey wire contract:
pending (token issued, not yet backing a suspension) and
active (bound to a suspension). This method creates a
waitpoint in the pending state. A later suspend call
transitions a pending waitpoint to active (see Lua
use_pending_waitpoint ARGV flag at
flowfabric.lua:3603,3641,3690) — or, if buffered signals
already satisfy its condition, the suspend call returns
SuspendOutcome::AlreadySatisfied and the waitpoint activates
without ever releasing the lease.
Pending-waitpoint expiry is a first-class terminal error on
the wire (PendingWaitpointExpired at
ff-script/src/error.rs:170,403-408). The attempt retains its
lease while the waitpoint is pending; signals delivered to
this waitpoint are buffered server-side (RFC-012 §R7.2.2).
Sourcefn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Non-mutating observation of signals that satisfied the handle’s resume condition.
Sourcefn claim_from_reclaim<'life0, 'async_trait>(
&'life0 self,
token: ReclaimToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_from_reclaim<'life0, 'async_trait>(
&'life0 self,
token: ReclaimToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Consume a reclaim grant to mint a resumed-kind handle. Returns
Ok(None) when the grant’s target execution is no longer
resumable (already reclaimed, terminal, etc.).
Sourcefn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Park the execution until delay_until, releasing the lease.
Sourcefn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Mark the execution as waiting for its child flow to complete, releasing the lease.
Sourcefn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Snapshot an execution by id. Ok(None) ⇒ no such execution.
Sourcefn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Snapshot a flow by id. Ok(None) ⇒ no such flow.
Sourcefn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
List dependency edges adjacent to an execution. Read-only; the
backend resolves the subject execution’s flow, reads the
direction-specific adjacency SET, and decodes each member’s
flow-scoped edge:<edge_id> hash.
Returns an empty Vec when the subject has no edges on the
requested side — including standalone executions (no owning
flow). Ordering is unspecified: the underlying adjacency SET
is an unordered SMEMBERS read. Callers that need deterministic
order should sort by EdgeSnapshot::edge_id /
EdgeSnapshot::created_at themselves.
Parse failures on the edge hash surface as
EngineError::Validation { kind: ValidationKind::Corruption, .. }
— unknown fields, missing required fields, endpoint mismatches
against the adjacency SET all fail loud rather than silently
returning partial results.
Gated on the core feature — edge reads are part of the
minimal engine surface a Postgres-style backend must honour.
Sourcefn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Snapshot a single dependency edge by its owning flow + edge id.
Ok(None) when the edge hash is absent (never staged, or
staged under a different flow than flow_id). Parse failures
on a present edge hash surface as
EngineError::Validation { kind: ValidationKind::Corruption, .. }
— the stored flow_id field is cross-checked against the
caller’s expected flow_id so a wrong-key read fails loud
rather than returning an unrelated edge.
Gated on the core feature — single-edge reads are part of
the minimal snapshot surface an alternate backend must honour
alongside Self::describe_execution / Self::describe_flow
/ Self::list_edges.
Sourcefn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Resolve an execution’s owning flow id, if any.
Ok(None) when the execution’s core record is absent or has
no associated flow (standalone execution). A present-but-
malformed flow_id field surfaces as
EngineError::Validation { kind: ValidationKind::Corruption, .. }.
Gated on the core feature. Used by ff-sdk’s
list_outgoing_edges / list_incoming_edges to pivot from a
consumer-supplied ExecutionId to the FlowId required by
Self::list_edges. A Valkey backend serves this with a
single HGET exec_core flow_id; a Postgres backend serves it
with the equivalent single-column row lookup.
Sourcefn list_flows<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<FlowId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_flows<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<FlowId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List flows on a partition with cursor-based pagination (issue #185).
Returns a ListFlowsPage of FlowSummary
rows ordered by flow_id (UUID byte-lexicographic). cursor
is None for the first page; callers forward the returned
next_cursor verbatim to continue iteration, and the listing
is exhausted when next_cursor is None. limit is the
maximum number of rows to return on this page — implementations
MAY return fewer (end of partition) but MUST NOT exceed it.
Ordering rationale: flow ids are UUIDs, and both Valkey
(sort after-the-fact) and Postgres (ORDER BY flow_id) can
agree on byte-lexicographic order — the same order
FlowId::to_string() produces for canonical hyphenated UUIDs.
Mapping to cursor > flow_id keeps the contract backend-
independent.
§Postgres implementation pattern
A Postgres-backed implementation serves this directly with
SELECT flow_id, created_at_ms, public_flow_state
FROM ff_flow
WHERE partition_key = $1
AND ($2::uuid IS NULL OR flow_id > $2)
ORDER BY flow_id
LIMIT $3 + 1;— reading one extra row to decide whether next_cursor should
be set to the last row’s flow_id. The Valkey implementation
maintains the ff:idx:{fp:N}:flow_index SET and performs the
sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
pipelining HGETALL flow_core for each row on the page.
Gated on the core feature — flow listing is part of the
minimal engine surface a Postgres-style backend must honour.
Sourcefn list_lanes<'life0, 'async_trait>(
&'life0 self,
cursor: Option<LaneId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_lanes<'life0, 'async_trait>(
&'life0 self,
cursor: Option<LaneId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Enumerate registered lanes with cursor-based pagination.
Lanes are global (not partition-scoped) — the backend serves
this from its lane registry and does NOT accept a
crate::partition::Partition argument. Results are sorted
by LaneId name so the ordering is stable across calls and
cursors address a deterministic position in the sort.
cursor— exclusive lower bound.Nonestarts from the first lane. To continue a walk, pass the previous page’sListLanesPage::next_cursor.limit— hard cap on the number of lanes returned in the page. Backends MAY round this down when the registry size is smaller; they MUST NOT return more thanlimit.
ListLanesPage::next_cursor is Some(last_lane_in_page)
iff at least one more lane exists after the returned page,
and None on the final page. Callers loop until next_cursor
is None to read the full registry.
Gated on the core feature — lane enumeration is part of the
minimal snapshot surface an alternate backend must honour
alongside Self::describe_flow / Self::list_edges.
Sourcefn list_suspended<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_suspended<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List suspended executions in one partition, cursor-paginated,
with each entry’s suspension reason_code populated (issue
#183).
Consumer-facing “what’s blocked on what?” panels (ff-board’s
suspended-executions view, operator CLIs) need the reason in
the list response so the UI does not round-trip per row to
describe_execution for a field it knows it needs. reason
on [SuspendedExecutionEntry] carries the free-form
suspension:current.reason_code field — see the type rustdoc
for the String-not-enum rationale.
cursor is opaque to callers; pass None to start a fresh
scan and feed the returned ListSuspendedPage::next_cursor
back in on subsequent pages until it comes back None.
limit bounds the entries count; backends MAY return fewer
when the partition is exhausted.
Ordering is by ascending suspended_at_ms (the per-lane
suspended ZSET score == timeout_at or the no-timeout
sentinel) with execution id as a lex tiebreak, so cursor
continuation is deterministic across calls.
Gated on the core feature — suspended-list enumeration is
part of the minimal engine surface a Postgres-style backend
must honour.
Sourcefn list_executions<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_executions<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Forward-only paginated listing of the executions indexed under one partition.
Reads the partition-wide ff:idx:{p:N}:all_executions set,
sorts lexicographically on ExecutionId, and returns the page
of ids strictly greater than cursor (or starting from the
smallest id when cursor = None). The returned
ListExecutionsPage::next_cursor is the last id on the page
iff at least one more id exists past it; None signals
end-of-stream.
limit is the maximum number of ids returned on this page. A
limit of 0 returns an empty page with next_cursor = None.
Backends MAY cap limit internally (Valkey: 1000) and return
fewer ids than requested; callers continue paginating until
next_cursor == None.
Ordering is stable under concurrent inserts for already-emitted ids (an id less-than-or-equal-to the caller’s cursor is never re-emitted in later pages) but new inserts past the cursor WILL appear in subsequent pages — consistent with forward-only cursor semantics.
Gated on the core feature — partition-scoped listing is part
of the minimal engine surface every backend must honour.
Sourcefn deliver_signal<'life0, 'async_trait>(
&'life0 self,
args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn deliver_signal<'life0, 'async_trait>(
&'life0 self,
args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Deliver an external signal to a suspended execution’s waitpoint.
The backend atomically records the signal, evaluates the resume
condition, and — when satisfied — transitions the execution
from suspended to runnable (or buffers the signal when the
waitpoint is still pending). Duplicate delivery — same
idempotency_key + waitpoint — surfaces as
DeliverSignalResult::Duplicate with the pre-existing
signal_id rather than mutating state twice.
Input validation (HMAC token presence, payload size limits,
signal-name shape) is the backend’s responsibility; callers
pass a fully populated DeliverSignalArgs and receive typed
outcomes or typed errors (ScriptError::invalid_token,
ScriptError::token_expired, ScriptError::ExecutionNotFound
surfaced via EngineError::Transport on the Valkey backend).
Gated on the core feature — signal delivery is part of the
minimal trigger surface every backend must honour so ff-server
/ REST handlers can dispatch against Arc<dyn EngineBackend>
without knowing which backend is running underneath.
Sourcefn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Claim a resumed execution — a previously-suspended attempt that
has cleared its resume condition (e.g. via
Self::deliver_signal) and now needs a worker to pick up the
same attempt index.
Distinct from Self::claim (fresh work) and
Self::claim_from_reclaim (grant-based ownership transfer
after a crash): the resumed-claim path re-binds an existing
attempt rather than minting a new one. The backend issues a
fresh lease_id + bumps the lease_epoch, preserving
attempt_id / attempt_index so stream frames and progress
updates continue on the same attempt.
Typed failures surface via ScriptError → EngineError:
NotAResumedExecution when the attempt state is not
attempt_interrupted, ExecutionNotLeaseable when the
lifecycle phase is not runnable, and InvalidClaimGrant
when the grant key is missing or was already consumed.
Gated on the core feature — resumed-claim is part of the
minimal trigger surface every backend must honour.
Sourcefn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Operator-initiated cancellation of a flow and (optionally) its member executions. See RFC-012 §3.1.1 for the policy /wait matrix.
Sourcefn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
downstream_execution_id: &'life2 ExecutionId,
policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
downstream_execution_id: &'life2 ExecutionId,
policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
RFC-016 Stage A: set the inbound-edge-group policy for a
downstream execution. Must be called before the first
add_dependency(... -> downstream_execution_id) — the backend
rejects with EngineError::Conflict if edges have already
been staged for this group.
Stage A honours only
EdgeDependencyPolicy::AllOf;
the AnyOf / Quorum variants return
EngineError::Validation with
detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"
until Stage B’s resolver lands.
Sourcefn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Report usage against a budget and check limits. Returns the
typed ReportUsageResult variant; backends enforce
idempotency via the caller-supplied
[UsageDimensions::dedup_key] (RFC-012 §R7.2.3 — replaces
the pre-Round-7 AdmissionDecision return).
Sourcefn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read frames from a completed or in-flight attempt’s stream.
from / to are StreamCursor values — StreamCursor::Start
/ StreamCursor::End are equivalent to XRANGE - / +, and
StreamCursor::At("<id>") reads from a concrete entry id.
Input validation (count_limit bounds, cursor shape) is the
caller’s responsibility — SDK-side wrappers in
ff-sdk enforce bounds before
forwarding. Backends MAY additionally reject out-of-range
input via EngineError::Validation.
Gated on the streaming feature — stream reads are part of
the stream-subset surface a backend without XREAD-like
primitives may omit.
Sourcefn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Tail a live attempt’s stream.
after is an exclusive StreamCursor — entries with id
strictly greater than after are returned. StreamCursor::Start
/ StreamCursor::End are NOT accepted here; callers MUST pass
a concrete id (or StreamCursor::from_beginning()). The SDK
wrapper rejects the open markers before reaching the backend.
block_ms == 0 → non-blocking peek. block_ms > 0 → blocks up
to that many ms for a new entry.
visibility (RFC-015 §6.1) filters the returned entries by
their stored StreamMode
mode field. Default
TailVisibility::All
preserves v1 behaviour.
Gated on the streaming feature — see read_stream.
Sourcefn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read the rolling summary document for an attempt (RFC-015 §6.3).
Returns Ok(None) when no StreamMode::DurableSummary
frame has ever been appended for the attempt. Non-blocking Hash
read; safe to call from any consumer without holding the lease.
Gated on the streaming feature — summary reads are part of
the stream-subset surface.
Provided Methods§
Sourcefn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Suspend by execution id + lease fence triple, for service-layer
callers that hold a run record / lease-claim descriptor but no
worker Handle (cairn issue #322).
Semantics mirror Self::suspend exactly — the same
SuspendArgs validation, the same SuspendOutcome
lifecycle, the same RFC-013 §3 dedup / replay contract. The
only difference is the fencing source: instead of the
(lease_id, lease_epoch, attempt_id) fields embedded in a
Handle, the backend fences against the triple passed directly.
Attempt-index, lane, and worker-instance metadata that
Self::suspend reads from the handle payload are recovered
from the backend’s authoritative execution record (Valkey:
exec_core HGETs; Postgres: ff_attempt row lookup).
The default impl returns EngineError::Unavailable so
existing backend impls remain non-breaking. Production backends
(Valkey, Postgres) override.
Sourcefn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Rotate the waitpoint HMAC signing kid cluster-wide.
v0.7 migration-master Q4 (adjudicated 2026-04-24). Additive trait surface so Valkey and Postgres backends can both expose the “rotate everywhere” semantic under one name.
- Valkey impl fans out an
ff_rotate_waitpoint_hmac_secretFCALL per execution partition.entries.len() == num_flow_partitionsand per-partition failures are surfaced as innerErrentries — the call as a whole does not fail when one partition’s FCALL fails, matching [ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions]’s partial-success contract. - Postgres impl (Wave 4) writes one row to
ff_waitpoint_hmac(kid, secret, rotated_at)and returns a single-entry vec withpartition = 0.
The default impl returns
EngineError::Unavailable with
op = "rotate_waitpoint_hmac_secret_all" so backends that
haven’t implemented the method surface the miss loudly rather
than silently no-op’ing. Both concrete backends override.
Sourcefn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Seed the initial waitpoint HMAC secret for a fresh deployment (issue #280).
Idempotent. If a current_kid (Valkey per-partition) or
an active kid row (Postgres) already exists with the given
kid, the method returns
SeedOutcome::AlreadySeeded without overwriting, reporting
whether the stored secret matches the caller-supplied one via
same_secret. Callers (cairn boot, operator tooling) invoke
this on every boot and let the backend decide whether to
install — removing the client-side “check then HSET” race that
cairn’s raw-HSET boot path silently tolerated.
For rotation of an already-seeded secret, use
Self::rotate_waitpoint_hmac_secret_all instead; seed is
install-only.
The default impl returns EngineError::Unavailable with
op = "seed_waitpoint_hmac_secret" so backends that haven’t
implemented the method surface the miss loudly.
Sourcefn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create an execution. Ingress row 6 (RFC-017 §4). Wraps
ff_create_execution on Valkey; INSERT INTO ff_execution ...
on Postgres. The idempotency_key + backend-side default
dedup_ttl_ms = 86400000 make duplicate submissions idempotent.
Sourcefn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a flow header. Ingress row 5.
Sourcefn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Atomically add an execution to a flow (single-FCALL co-located commit on Valkey; single-transaction UPSERT on Postgres).
Sourcefn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Stage a dependency edge between flow members. CAS-guarded on
graph_revision — stale rev returns Contention(StaleGraphRevision).
Sourcefn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Apply a staged dependency edge to its downstream child.
Sourcefn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Operator-initiated execution cancel (row 2).
Sourcefn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Re-score an execution’s eligibility priority (row 17).
Sourcefn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Replay a terminal execution (row 22). Variadic KEYS handling (inbound-edge pre-read) is hidden inside the Valkey impl per RFC-017 §4 row 3.
Sourcefn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Operator-initiated lease revoke (row 19).
Sourcefn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a budget definition (row 6).
Sourcefn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Reset a budget’s usage counters (row 10).
Sourcefn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a quota policy (row 7).
Sourcefn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read-only budget status for operator visibility (row 8).
Sourcefn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Admin-path report_usage (row 9 + RFC-017 §5 round-1 F4).
Distinct from the existing Self::report_usage which takes
a worker handle — the admin path has no lease context.
Sourcefn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Fetch the stored result payload for a completed execution
(row 4). Returns Ok(None) when the execution is missing, not
yet complete, or its payload was trimmed by retention policy.
Sourcefn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List the pending-or-active waitpoints for an execution, cursor
paginated (row 5 / §8). Stage A preserves the existing
PendingWaitpointInfo shape; Stage D ships the §8 HMAC
sanitisation + (token_kid, token_fingerprint) schema.
Sourcefn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Backend-level reachability probe (row 1). Valkey: PING;
Postgres: SELECT 1.
Sourcefn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
forwards to its ff_scheduler::Scheduler cursor; Postgres
forwards to PostgresScheduler’s FOR UPDATE SKIP LOCKED
path.
Backends that carry an embedded scheduler (e.g. ValkeyBackend
constructed via with_embedded_scheduler, or PostgresBackend
with its with_scanners sibling) route the claim through it.
Backends without a wired scheduler return
EngineError::Unavailable. HTTP consumers use
FlowFabricWorker::claim_via_server instead.
Sourcefn backend_label(&self) -> &'static str
fn backend_label(&self) -> &'static str
Static observability label identifying the backend family in
logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
returns "unknown" so legacy impl EngineBackend blocks that
have not upgraded keep compiling; every in-tree backend
overrides — ValkeyBackend → "valkey", PostgresBackend →
"postgres".
Sourcefn capabilities(&self) -> Capabilities
fn capabilities(&self) -> Capabilities
RFC-018 Stage A: snapshot of this backend’s identity + the
flat Supports surface it can actually service. Consumers use
this at startup to gate UI features / choose between alternative
code paths before dispatching. See
rfcs/RFC-018-backend-capability-discovery.md for the full
discovery contract and the four owner-adjudicated open
questions (granularity: coarse; version: struct; sync; no
event stream).
Default: returns a value tagged family = "unknown" with every
supports.* bool false, so pre-RFC-018 out-of-tree backends
keep compiling and consumers treat “all false” as “dispatch
and catch EngineError::Unavailable” (pre-RFC-018 behaviour).
Concrete in-tree backends (ValkeyBackend, PostgresBackend)
override to populate a real value.
Sync (no .await): backend-static info should not require a
probe on every query. Dynamic probes happen once at
connect* time and cache the result.
Sourcefn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Issue #281: run one-time backend-specific boot preparation.
Intended to run ONCE per deployment startup — NOT per request.
Idempotent and safe for consumers to call on every application
boot; backends that have nothing to do return
PrepareOutcome::NoOp without side effects.
Per-backend behaviour:
- Valkey — issues
FUNCTION LOAD REPLACEfor theflowfabricLua library (with bounded retry on transient transport faults; permanent compile errors surface asEngineError::Transportwithout retry). ReturnsPrepareOutcome::Appliedcarrying"FUNCTION LOAD (flowfabric lib v<N>)". - Postgres — returns
PrepareOutcome::NoOp. Schema migrations are applied out-of-band perrfcs/drafts/v0.7-migration-master.md §Q12; the backend runs a schema-version check at connect time and refuses to start on mismatch, so no boot-side prepare work remains. - Default impl — returns
PrepareOutcome::NoOpso out-of-tree backends without preparation work compile without boilerplate.
§Relationship to the in-tree boot path
ValkeyBackend::initialize_deployment (called from
Server::start_with_metrics) already invokes
ensure_library inline as
its step 4; that path is unchanged. prepare() exists as a
trait-surface entry point so consumers that construct an
Arc<dyn EngineBackend> outside of Server (e.g.
cairn-fabric’s boot path at cairn-fabric/src/boot.rs) can
run the same preparation without reaching into
backend-specific modules. The overlap is intentional: calling
both prepare() and initialize_deployment is safe because
FUNCTION LOAD REPLACE is idempotent under the version
check.
§Layer forwarding
Layer impls (HookedBackend, ff-sdk layers) do NOT forward
prepare today — consistent with backend_label / ping /
shutdown_prepare. Consumers that wrap a backend in layers
MUST call prepare() on the raw backend before wrapping, or
accept the default PrepareOutcome::NoOp.
Sourcefn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Drain-before-shutdown hook (RFC-017 §5.4). The server calls
this before draining its own background tasks so backend-
scoped primitives (Valkey stream semaphore, Postgres sqlx
pool, …) can close their gates and await in-flight work up to
grace.
Default impl returns Ok(()) — a no-op backend has nothing
backend-scoped to drain. Concrete backends whose data plane
owns resources (connection pools, semaphores, listeners)
override with a real body.
Sourcefn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
RFC-017 Stage E2: the “header” portion of cancel_flow — run the
atomic flow-state flip (Valkey: ff_cancel_flow FCALL; Postgres:
cancel_flow_once tx), decode policy + membership, and surface
the flow_already_terminal idempotency branch as a first-class
[CancelFlowHeader::AlreadyTerminal] so the Server can build
the wire CancelFlowResult without reaching for a raw
Client. Separate from the existing
EngineBackend::cancel_flow entry point (which takes the
enum-typed (policy, wait) split and returns the wait-collapsed
CancelFlowResult) because the Server owns its own
wait-dispatch + member-cancel machinery via
EngineBackend::cancel_execution + backlog ack.
Default impl returns EngineError::Unavailable so un-migrated
backends surface the miss loudly.
Sourcefn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
RFC-017 Stage E2: best-effort acknowledgement that one member of
a cancel_all flow has completed its per-member cancel. Drains
the member from the flow’s pending_cancels set and, if empty,
removes the flow from the partition-level cancel_backlog
(Valkey: ff_ack_cancel_member FCALL; Postgres: table write —
default Unavailable until Wave 9).
Failures are swallowed by the caller — the cancel-backlog reconciler is the authoritative drain — but a typed error here lets the caller log a backend-scoped context string.
Sourcefn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
RFC-017 Stage E2: full-shape execution read used by the
GET /v1/executions/{id} HTTP route. Returns the legacy
ExecutionInfo wire shape (not the decoupled
ExecutionSnapshot) so the existing HTTP response bytes stay
identical across the migration.
Ok(None) ⇒ no such execution. Default Unavailable because
the Valkey HGETALL-and-parse is backend-specific.
Sourcefn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
RFC-017 Stage E2: narrow public_state read used by the
GET /v1/executions/{id}/state HTTP route. Returns Ok(None)
when the execution is missing. Default Unavailable.
Sourcefn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to lease lifecycle events (acquired / renewed / expired / reclaimed / revoked) for the partition this backend is configured with.
Cross-partition fan-out is consumer-side merge: subscribe
per-partition backend instance and interleave on the read
side. Yields
Err(EngineError::StreamDisconnected { cursor }) on backend
disconnect; resume by calling this method again with the
returned cursor.
filter (#282): when filter.instance_tag is Some((k, v)),
only events whose execution carries tag k = v are yielded
(matching the crate::backend::ScannerFilter surface from
#122). Pass &ScannerFilter::default() for unfiltered
behaviour. Filtering happens inside the backend stream; the
crate::stream_events::LeaseHistorySubscription return type
is unchanged.
Sourcefn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to completion events (terminal state transitions).
- Postgres: wraps the
ff_completion_eventoutbox + LISTEN/NOTIFY machinery. Durable via event-id cursor. - Valkey: wraps the RESP3
ff:dag:completionspubsub subscriber. Pubsub is at-most-once over the live subscription window; the cursor is always the empty sentinel. If you need at-least-once replay with durable cursor resume, use the Postgres backend (seedocs/POSTGRES_PARITY_MATRIX.mdrowsubscribe_completion).
filter (#282): see Self::subscribe_lease_history. Valkey
reuses the subscribe_completions_filtered per-event HGET
gate; Postgres filters inline against the outbox’s denormalised
instance_tag column.
Sourcefn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to signal-delivery events (satisfied / buffered / deduped).
filter (#282): see Self::subscribe_lease_history.
Subscribe to instance-tag events (tag attached / cleared).
Producer wiring is deferred per #311 audit (“no concrete
demand”); the trait method exists for API uniformity across
the four families. Backends currently return
EngineError::Unavailable.