pub trait EngineBackend:
Send
+ Sync
+ 'static {
Show 73 methods
// Required methods
fn claim<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
lane: &'life1 LaneId,
capabilities: &'life2 CapabilitySet,
policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn renew<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn progress<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
percent: Option<u8>,
message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn append_frame<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn claim_from_resume_grant<'life0, 'async_trait>(
&'life0 self,
token: ResumeToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn read_execution_context<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<ExecutionContext, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
// Provided methods
fn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn issue_reclaim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueReclaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueReclaimGrantOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn reclaim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReclaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReclaimExecutionOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn read_current_attempt_index<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn read_total_attempt_count<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
_eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn list_flows<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn list_lanes<'life0, 'async_trait>(
&'life0 self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn list_suspended<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn list_executions<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn deliver_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn scan_eligible_executions<'life0, 'async_trait>(
&'life0 self,
_args: ScanEligibleArgs,
) -> Pin<Box<dyn Future<Output = Result<Vec<ExecutionId>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn issue_claim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueClaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueClaimGrantOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn block_route<'life0, 'async_trait>(
&'life0 self,
_args: BlockRouteArgs,
) -> Pin<Box<dyn Future<Output = Result<BlockRouteOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn claim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_downstream_execution_id: &'life2 ExecutionId,
_policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn backend_label(&self) -> &'static str { ... }
fn as_any(&self) -> &(dyn Any + 'static) { ... }
fn capabilities(&self) -> Capabilities { ... }
fn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_instance_tags<'life0, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
) -> Pin<Box<dyn Future<Output = Result<InstanceTagSubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
}

Expand description
The engine write surface — a single trait a backend implementation
honours to serve a FlowFabricWorker.
See RFC-012 §3.1 for the inventory rationale and §3.3 for the
type-level shape. 16 methods (Round-7 added create_waitpoint;
append_frame return widened; report_usage return replaced —
RFC-012 §R7). Issue #150 added the two trigger-surface methods
(deliver_signal / claim_resumed_execution).
§Note on complete payload shape
The RFC §3.3 sketch uses Option<Bytes>; the Stage 1a trait uses
Option<Vec<u8>> to match the existing
ff_sdk::ClaimedTask::complete signature and avoid adding a
bytes public-type dep for zero consumer benefit. Round-4 §7.17
resolved the payload container debate to Box<[u8]> in the
public type (see HandleOpaque); Option<Vec<u8>> is the
zero-churn choice consistent with today’s code. Consumers that
need &[u8] can borrow via .as_deref() on the Option.
Required Methods§
Sourcefn claim<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
lane: &'life1 LaneId,
capabilities: &'life2 CapabilitySet,
policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn claim<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
lane: &'life1 LaneId,
capabilities: &'life2 CapabilitySet,
policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Fresh-work claim. Returns Ok(None) when no work is currently
available; Err only on transport or input-validation faults.
Sourcefn renew<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn renew<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Renew a held lease. Returns the updated expiry + epoch on
success; typed State::StaleLease / State::LeaseExpired
when the lease has been stolen or timed out.
Sourcefn progress<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
percent: Option<u8>,
message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn progress<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
percent: Option<u8>,
message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Numeric-progress heartbeat.
Writes scalar progress_percent / progress_message fields on
exec_core; each call overwrites the previous value. This does
NOT append to the output stream — stream-frame producers must use
append_frame instead.
Sourcefn append_frame<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn append_frame<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Append one stream frame. Distinct from progress
per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
id and post-append frame count (RFC-012 §R7.2.1).
Stream-frame producers (arbitrary frame_type + payload, consumed
via the read/tail surfaces) MUST use this method rather than
progress; the latter updates scalar fields on
exec_core and is invisible to stream consumers.
Sourcefn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Terminal success. Borrows handle (round-4 M-D2) so callers
can retry under EngineError::Transport without losing the
cookie. Payload is Option<Vec<u8>> per the note above.
Sourcefn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Terminal failure with classification. Returns FailOutcome
so the caller learns whether a retry was scheduled.
Sourcefn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Cooperative cancel by the worker holding the lease.
Sourcefn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Suspend the execution awaiting a typed resume condition (RFC-013 Stage 1d).
Borrows handle (round-4 M-D2). Terminal-looking behaviour is
expressed through SuspendOutcome:
- `SuspendOutcome::Suspended` — the pre-suspend handle is logically invalidated; the fresh `HandleKind::Suspended` handle inside the variant supersedes it. Runtime enforcement via the fence triple: subsequent ops against the stale handle surface as `Contention(LeaseConflict)`.
- `SuspendOutcome::AlreadySatisfied` — buffered signals on a pending waitpoint already matched the resume condition at suspension time. The lease is NOT released; the caller’s pre-suspend handle remains valid.
See RFC-013 §2 for the type shapes, §3 for the replay / idempotency contract, §4 for the error taxonomy.
Sourcefn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Issue a pending waitpoint for future signal delivery.
Waitpoints have two states in the Valkey wire contract:
pending (token issued, not yet backing a suspension) and
active (bound to a suspension). This method creates a
waitpoint in the pending state. A later suspend call
transitions a pending waitpoint to active (see Lua
use_pending_waitpoint ARGV flag at
flowfabric.lua:3603,3641,3690) — or, if buffered signals
already satisfy its condition, the suspend call returns
SuspendOutcome::AlreadySatisfied and the waitpoint activates
without ever releasing the lease.
Pending-waitpoint expiry is a first-class terminal error on
the wire (PendingWaitpointExpired at
ff-script/src/error.rs:170,403-408). The attempt retains its
lease while the waitpoint is pending; signals delivered to
this waitpoint are buffered server-side (RFC-012 §R7.2.2).
Sourcefn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Non-mutating observation of signals that satisfied the handle’s resume condition.
Sourcefn claim_from_resume_grant<'life0, 'async_trait>(
&'life0 self,
token: ResumeToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_from_resume_grant<'life0, 'async_trait>(
&'life0 self,
token: ResumeToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Consume a resume grant (via ResumeToken) to mint a
resumed-kind handle. Routes to ff_claim_resumed_execution on
Valkey / the epoch-bump reconciler on PG/SQLite. Returns
Ok(None) when the grant’s target execution is no longer
resumable (already reclaimed, terminal, etc.).
Renamed from claim_from_reclaim (RFC-024 PR-B+C). The
pre-rename name advertised “reclaim” but the semantic has
always been resume-after-suspend. The new lease-reclaim path
lives on Self::reclaim_execution.
Sourcefn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Park the execution until delay_until, releasing the lease.
Sourcefn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Mark the execution as waiting for its child flow to complete, releasing the lease.
Sourcefn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Snapshot an execution by id. Ok(None) ⇒ no such execution.
Sourcefn read_execution_context<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<ExecutionContext, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_execution_context<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<ExecutionContext, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Point-read of the execution-scoped (input_payload, execution_kind, tags) bundle used by the SDK worker when
assembling a ClaimedTask (see ff_sdk::ClaimedTask) after a
successful claim.
No default impl — every EngineBackend must answer this
explicitly. Distinct from Self::describe_execution
(read-model projection) because the SDK needs the raw payload
bytes + kind + tags immediately post-claim, and the snapshot
projection deliberately omits the payload bytes.
Per-backend shape:
- Valkey — pipelined `GET :payload` + `HGETALL :core` + `HGETALL :tags` on the execution’s partition (same pattern as `Self::describe_execution`).
- Postgres — single `SELECT payload, raw_fields` on `ff_exec_core` keyed by `(partition_key, execution_id)`; `execution_kind` + `tags` live in `raw_fields` JSONB.
- SQLite — identical shape to Postgres.
Returns EngineError::Validation { kind: ValidationKind::InvalidInput, .. }
when the execution does not exist — the SDK worker only calls
this after a successful claim, so a missing row is a loud
storage-tier invariant violation rather than a routine Ok(None).
Sourcefn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Snapshot a flow by id. Ok(None) ⇒ no such flow.
Sourcefn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Operator-initiated cancellation of a flow and (optionally) its member executions. See RFC-012 §3.1.1 for the policy / wait matrix.
Sourcefn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Report usage against a budget and check limits. Returns the
typed ReportUsageResult variant; backends enforce
idempotency via the caller-supplied
[UsageDimensions::dedup_key] (RFC-012 §R7.2.3 — replaces
the pre-Round-7 AdmissionDecision return).
Provided Methods§
Sourcefn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Suspend by execution id + lease fence triple, for service-layer
callers that hold a run record / lease-claim descriptor but no
worker Handle (cairn issue #322).
Semantics mirror Self::suspend exactly — the same
SuspendArgs validation, the same SuspendOutcome
lifecycle, the same RFC-013 §3 dedup / replay contract. The
only difference is the fencing source: instead of the
(lease_id, lease_epoch, attempt_id) fields embedded in a
Handle, the backend fences against the triple passed directly.
Attempt-index, lane, and worker-instance metadata that
Self::suspend reads from the handle payload are recovered
from the backend’s authoritative execution record (Valkey:
exec_core HGETs; Postgres: ff_attempt row lookup).
The default impl returns EngineError::Unavailable so
existing backend impls remain non-breaking. Production backends
(Valkey, Postgres) override.
Sourcefn issue_reclaim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueReclaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueReclaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn issue_reclaim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueReclaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueReclaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
in lease_expired_reclaimable or lease_revoked state to the
reclaim path; the returned IssueReclaimGrantOutcome::Granted
carries a crate::contracts::ReclaimGrant which is then fed
to Self::reclaim_execution to mint a fresh attempt.
Default impl returns EngineError::Unavailable — PR-D (PG),
PR-E (SQLite), and PR-F (Valkey) override with real bodies.
Sourcefn reclaim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReclaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReclaimExecutionOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reclaim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReclaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReclaimExecutionOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Consume a crate::contracts::ReclaimGrant to mint a fresh
attempt for a previously lease-expired / lease-revoked
execution (RFC-024 §3.2). Creates a new attempt row, bumps the
execution’s lease_reclaim_count, and mints a
crate::backend::HandleKind::Reclaimed handle.
Default impl returns EngineError::Unavailable — PR-D (PG),
PR-E (SQLite), and PR-F (Valkey) override with real bodies.
Sourcefn read_current_attempt_index<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_current_attempt_index<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Point-read of the execution’s current attempt-index pointer — the index of the currently-leased attempt row.
Distinct from Self::read_total_attempt_count: this method
names the attempt that already exists (pointer), whereas
read_total_attempt_count is the monotonic claim counter used
to compute the next fresh attempt index. See the sibling’s
rustdoc for the retry-path scenario that motivates the split.
Used on the SDK worker’s claim_from_resume_grant path —
specifically the private claim_resumed_execution helper —
immediately before dispatching Self::claim_resumed_execution.
The returned index is fed into
ClaimResumedExecutionArgs::current_attempt_index
so the backend’s script / transaction targets the correct
existing attempt row (KEYS[6] on Valkey; ff_attempt PK tuple
on PG/SQLite).
Per-backend shape:
- Valkey — `HGET {exec}:core current_attempt_index` on the execution’s partition. Single command. Both the missing-field case (`exec_core` present but `current_attempt_index` absent or empty-string, i.e. pre-claim state) and the missing-row case (no `exec_core` hash at all) read back as `AttemptIndex(0)`. This preserves the pre-PR-3 inline-`HGET` semantic and is safe because Valkey’s happy path requires `exec_core` to exist before this method is reached — the SDK only calls `read_current_attempt_index` post-grant, and grant issuance is gated on `exec_core` presence. A genuinely absent row would surface as the proper business-logic error (`NotAResumedExecution` / `ExecutionNotLeaseable`) on the downstream FCALL.
- Postgres — `SELECT attempt_index FROM ff_exec_core WHERE partition_key = $1 AND execution_id = $2`. The column is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0` (matching Valkey’s missing-field case). Missing row surfaces as `EngineError::Validation { kind: ValidationKind::InvalidInput, .. }` — diverges from Valkey’s missing-row → `0` mapping.
- SQLite — `SELECT attempt_index FROM ff_exec_core WHERE partition_key = ? AND execution_id = ?`; identical semantics to Postgres (missing-row → `InvalidInput`).
Cross-backend asymmetry on missing row is intentional. The
SDK happy path never observes it (grant issuance on Valkey
requires exec_core, and PG/SQLite currently return
Unavailable from claim_from_grant per
project_claim_from_grant_pg_sqlite_gap.md). Consumers writing
backend-agnostic tooling against this method directly must
treat the missing-row case as backend-dependent — match on
InvalidInput for PG/SQLite, and treat an unexpected 0 as
the Valkey equivalent signal.
The default impl returns EngineError::Unavailable so the
trait addition is non-breaking for out-of-tree backends (same
precedent as Self::read_execution_context landing in v0.12
PR-1).
Sourcefn read_total_attempt_count<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_total_attempt_count<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Point-read of the execution’s total attempt counter — the monotonic count of claims that have ever fired against this execution (including the in-flight one once claimed).
Used on the SDK worker’s claim_from_grant / claim_execution
path — the next attempt-index for a fresh claim is this
counter’s current value (so 1 on the second retry after the
first attempt failed terminally). This is semantically distinct
from Self::read_current_attempt_index, which is a pointer
at the currently-leased attempt row and is only meaningful on
the claim_from_resume_grant path (where a live attempt already
exists and we want to re-seat its lease rather than mint a new
attempt row).
Reading the pointer on the claim_from_grant path was a live
bug: on the retry-of-a-retry scenario the pointer still named
the previous terminal-failed attempt, so the newly-minted
attempt collided with it (Valkey KEYS[6]) or mis-targeted the
PG/SQLite ff_attempt PK tuple. This method fixes that by
reading the counter that Lua 5920 / PG ff_claim_execution /
SQLite claim_impl all already consult when computing the
next attempt index.
Per-backend shape:
- Valkey — `HGET {exec}:core total_attempt_count` on the execution’s partition. Single command; pre-claim read (field absent or empty) maps to `0`.
- Postgres — `SELECT raw_fields->>'total_attempt_count' FROM ff_exec_core WHERE (partition_key, execution_id) = ...`. The field lives in the JSONB `raw_fields` bag rather than a dedicated column (mirrors how `create_execution_impl` seeds it on row creation). Missing row → `InvalidInput`; missing field → `0`.
- SQLite — `SELECT CAST(json_extract(raw_fields, '$.total_attempt_count') AS INTEGER) FROM ff_exec_core WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the same `json_extract` idiom already employed in `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
The default impl returns EngineError::Unavailable so the
trait addition is non-breaking for out-of-tree backends (same
precedent as Self::read_current_attempt_index landing in
v0.12 PR-3).
Sourcefn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
List dependency edges adjacent to an execution. Read-only; the
backend resolves the subject execution’s flow, reads the
direction-specific adjacency SET, and decodes each member’s
flow-scoped edge:<edge_id> hash.
Returns an empty Vec when the subject has no edges on the
requested side — including standalone executions (no owning
flow). Ordering is unspecified: the underlying adjacency SET
is an unordered SMEMBERS read. Callers that need deterministic
order should sort by EdgeSnapshot::edge_id /
EdgeSnapshot::created_at themselves.
Parse failures on the edge hash surface as
EngineError::Validation { kind: ValidationKind::Corruption, .. }
— unknown fields, missing required fields, endpoint mismatches
against the adjacency SET all fail loud rather than silently
returning partial results.
Gated on the core feature — edge reads are part of the
minimal engine surface a Postgres-style backend must honour.
Sourcefn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Snapshot a single dependency edge by its owning flow + edge id.
Ok(None) when the edge hash is absent (never staged, or
staged under a different flow than flow_id). Parse failures
on a present edge hash surface as
EngineError::Validation { kind: ValidationKind::Corruption, .. }
— the stored flow_id field is cross-checked against the
caller’s expected flow_id so a wrong-key read fails loud
rather than returning an unrelated edge.
Gated on the core feature — single-edge reads are part of
the minimal snapshot surface an alternate backend must honour
alongside Self::describe_execution / Self::describe_flow
/ Self::list_edges.
Sourcefn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
_eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
_eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Resolve an execution’s owning flow id, if any.
Ok(None) when the execution’s core record is absent or has
no associated flow (standalone execution). A present-but-
malformed flow_id field surfaces as
EngineError::Validation { kind: ValidationKind::Corruption, .. }.
Gated on the core feature. Used by ff-sdk’s
list_outgoing_edges / list_incoming_edges to pivot from a
consumer-supplied ExecutionId to the FlowId required by
Self::list_edges. A Valkey backend serves this with a
single HGET exec_core flow_id; a Postgres backend serves it
with the equivalent single-column row lookup.
Sourcefn list_flows<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_flows<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List flows on a partition with cursor-based pagination (issue #185).
Returns a ListFlowsPage of FlowSummary
rows ordered by flow_id (UUID byte-lexicographic). cursor
is None for the first page; callers forward the returned
next_cursor verbatim to continue iteration, and the listing
is exhausted when next_cursor is None. limit is the
maximum number of rows to return on this page — implementations
MAY return fewer (end of partition) but MUST NOT exceed it.
Ordering rationale: flow ids are UUIDs, and both Valkey
(sort after-the-fact) and Postgres (ORDER BY flow_id) can
agree on byte-lexicographic order — the same order
FlowId::to_string() produces for canonical hyphenated UUIDs.
Mapping to cursor > flow_id keeps the contract backend-
independent.
§Postgres implementation pattern
A Postgres-backed implementation serves this directly with
SELECT flow_id, created_at_ms, public_flow_state
FROM ff_flow
WHERE partition_key = $1
AND ($2::uuid IS NULL OR flow_id > $2)
ORDER BY flow_id
LIMIT $3 + 1;
— reading one extra row to decide whether next_cursor should
be set to the last row’s flow_id. The Valkey implementation
maintains the ff:idx:{fp:N}:flow_index SET and performs the
sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
pipelining HGETALL flow_core for each row on the page.
Gated on the core feature — flow listing is part of the
minimal engine surface a Postgres-style backend must honour.
Sourcefn list_lanes<'life0, 'async_trait>(
&'life0 self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_lanes<'life0, 'async_trait>(
&'life0 self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Enumerate registered lanes with cursor-based pagination.
Lanes are global (not partition-scoped) — the backend serves
this from its lane registry and does NOT accept a
crate::partition::Partition argument. Results are sorted
by LaneId name so the ordering is stable across calls and
cursors address a deterministic position in the sort.
- `cursor` — exclusive lower bound. `None` starts from the first lane. To continue a walk, pass the previous page’s `ListLanesPage::next_cursor`.
- `limit` — hard cap on the number of lanes returned in the page. Backends MAY round this down when the registry size is smaller; they MUST NOT return more than `limit`.
ListLanesPage::next_cursor is Some(last_lane_in_page)
iff at least one more lane exists after the returned page,
and None on the final page. Callers loop until next_cursor
is None to read the full registry.
Gated on the core feature — lane enumeration is part of the
minimal snapshot surface an alternate backend must honour
alongside Self::describe_flow / Self::list_edges.
Sourcefn list_suspended<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_suspended<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List suspended executions in one partition, cursor-paginated,
with each entry’s suspension reason_code populated (issue
#183).
Consumer-facing “what’s blocked on what?” panels (ff-board’s
suspended-executions view, operator CLIs) need the reason in
the list response so the UI does not round-trip per row to
describe_execution for a field it knows it needs. reason
on [SuspendedExecutionEntry] carries the free-form
suspension:current.reason_code field — see the type rustdoc
for the String-not-enum rationale.
cursor is opaque to callers; pass None to start a fresh
scan and feed the returned ListSuspendedPage::next_cursor
back in on subsequent pages until it comes back None.
limit bounds the entries count; backends MAY return fewer
when the partition is exhausted.
Ordering is by ascending suspended_at_ms (the per-lane
suspended ZSET score == timeout_at or the no-timeout
sentinel) with execution id as a lex tiebreak, so cursor
continuation is deterministic across calls.
Gated on the core feature — suspended-list enumeration is
part of the minimal engine surface a Postgres-style backend
must honour.
Sourcefn list_executions<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_executions<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Forward-only paginated listing of the executions indexed under one partition.
Reads the partition-wide ff:idx:{p:N}:all_executions set,
sorts lexicographically on ExecutionId, and returns the page
of ids strictly greater than cursor (or starting from the
smallest id when cursor = None). The returned
ListExecutionsPage::next_cursor is the last id on the page
iff at least one more id exists past it; None signals
end-of-stream.
limit is the maximum number of ids returned on this page. A
limit of 0 returns an empty page with next_cursor = None.
Backends MAY cap limit internally (Valkey: 1000) and return
fewer ids than requested; callers continue paginating until
next_cursor == None.
Ordering is stable under concurrent inserts for already-emitted ids (an id less-than-or-equal-to the caller’s cursor is never re-emitted in later pages) but new inserts past the cursor WILL appear in subsequent pages — consistent with forward-only cursor semantics.
Gated on the core feature — partition-scoped listing is part
of the minimal engine surface every backend must honour.
Sourcefn deliver_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn deliver_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Deliver an external signal to a suspended execution’s waitpoint.
The backend atomically records the signal, evaluates the resume
condition, and — when satisfied — transitions the execution
from suspended to runnable (or buffers the signal when the
waitpoint is still pending). Duplicate delivery — same
idempotency_key + waitpoint — surfaces as
DeliverSignalResult::Duplicate with the pre-existing
signal_id rather than mutating state twice.
Input validation (HMAC token presence, payload size limits,
signal-name shape) is the backend’s responsibility; callers
pass a fully populated DeliverSignalArgs and receive typed
outcomes or typed errors (ScriptError::invalid_token,
ScriptError::token_expired, ScriptError::ExecutionNotFound
surfaced via EngineError::Transport on the Valkey backend).
Gated on the core feature — signal delivery is part of the
minimal trigger surface every backend must honour so ff-server
/ REST handlers can dispatch against Arc<dyn EngineBackend>
without knowing which backend is running underneath.
Sourcefn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Claim a resumed execution — a previously-suspended attempt that
has cleared its resume condition (e.g. via
Self::deliver_signal) and now needs a worker to pick up the
same attempt index.
Distinct from Self::claim (fresh work) and
Self::claim_from_resume_grant (grant-based ownership transfer
after a crash): the resumed-claim path re-binds an existing
attempt rather than minting a new one. The backend issues a
fresh lease_id + bumps the lease_epoch, preserving
attempt_id / attempt_index so stream frames and progress
updates continue on the same attempt.
Typed failures surface via ScriptError → EngineError:
NotAResumedExecution when the attempt state is not
attempt_interrupted, ExecutionNotLeaseable when the
lifecycle phase is not runnable, and InvalidClaimGrant
when the grant key is missing or was already consumed.
Gated on the core feature — resumed-claim is part of the
minimal trigger surface every backend must honour.
Sourcefn scan_eligible_executions<'life0, 'async_trait>(
&'life0 self,
_args: ScanEligibleArgs,
) -> Pin<Box<dyn Future<Output = Result<Vec<ExecutionId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn scan_eligible_executions<'life0, 'async_trait>(
&'life0 self,
_args: ScanEligibleArgs,
) -> Pin<Box<dyn Future<Output = Result<Vec<ExecutionId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Scan a lane’s eligible ZSET on one partition for highest-priority executions awaiting a worker (v0.12 PR-5).
Lifted from the SDK-side ZRANGEBYSCORE inline on
FlowFabricWorker::claim_next — the scheduler-bypass scanner
gated behind direct-valkey-claim. The trait method itself is
backend-agnostic; consumers that drive the scanner loop
(bench harnesses, single-tenant dev) compose it with
Self::issue_claim_grant + Self::claim_execution to
replicate the pre-PR-5 claim_next body.
§Backend coverage
- Valkey — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>` on the lane’s partition-scoped eligible key. Single command; no script round-trip. Wire shape is byte-for-byte identical to the pre-PR SDK inline call so bench traces match pre-PR without new `#[tracing::instrument]` span names.
- Postgres / SQLite — use the `Err(Unavailable)` default. PG/SQLite consumers drive work through the scheduler-routed `Self::claim_for_worker` path instead of the scanner primitives exposed here; lifting the scheduler itself onto the trait is RFC-024 follow-up scope. See `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
Default impl returns EngineError::Unavailable so the trait
addition is non-breaking for out-of-tree backends. Same
precedent as Self::claim_execution landing in v0.12 PR-4.
Sourcefn issue_claim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueClaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueClaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn issue_claim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueClaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueClaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Issue a claim grant — the scheduler’s admission write — for a single execution on a single lane (v0.12 PR-5).
Lifted from the SDK-side ff_issue_claim_grant inline helper
on FlowFabricWorker::claim_next. The backend atomically
writes the grant hash, appends to the per-worker grant index,
and removes the execution from the lane’s eligible ZSET.
Typed rejects surface via EngineError::Validation:
CapabilityMismatch when the worker’s capabilities do not
cover the execution’s required_capabilities, InvalidInput
for malformed args. Transport faults surface via
EngineError::Transport.
§Backend coverage
- Valkey — one `ff_issue_claim_grant` FCALL. KEYS/ARGV shape is byte-for-byte identical to the pre-PR SDK inline call; bench traces match pre-PR.
- Postgres / SQLite — `Err(Unavailable)` default; use `Self::claim_for_worker` instead. See `Self::scan_eligible_executions` for the cross-link rationale.
Sourcefn block_route<'life0, 'async_trait>(
&'life0 self,
_args: BlockRouteArgs,
) -> Pin<Box<dyn Future<Output = Result<BlockRouteOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn block_route<'life0, 'async_trait>(
&'life0 self,
_args: BlockRouteArgs,
) -> Pin<Box<dyn Future<Output = Result<BlockRouteOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Move an execution from a lane’s eligible ZSET into its blocked_route ZSET (v0.12 PR-5).
Lifted from the SDK-side ff_block_execution_for_admission
inline helper on FlowFabricWorker::claim_next. Called after
a Self::issue_claim_grant CapabilityMismatch reject —
without a block step, the inline scanner would re-pick the
same top-of-ZSET every tick (parity with
ff-scheduler::Scheduler::block_candidate).
The engine’s unblock scanner periodically promotes blocked_route back to eligible once a worker with matching caps registers.
§Backend coverage
- Valkey — one `ff_block_execution_for_admission` FCALL.
- Postgres / SQLite — `Err(Unavailable)` default; the scheduler-routed `Self::claim_for_worker` path handles admission rejects server-side.
Sourcefn claim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Consume a scheduler-issued claim grant to mint a fresh attempt.
The SDK’s grant-consumer path — paired with FlowFabricWorker::claim_from_grant
in ff-sdk — routes through this method. The scheduler has
already validated budget / quota / capabilities and written a
grant (Valkey claim_grant hash); this call atomically
consumes that grant and creates the attempt row, mints
lease_id + lease_epoch, and returns a
ClaimExecutionResult::Claimed carrying the minted lease
triple.
Distinct from Self::claim (the scheduler-bypass scanner
used by the direct-valkey-claim feature) — this method
assumes the grant already exists and skips capability / ZSET
scanning. The Valkey impl fires exactly one ff_claim_execution
FCALL.
Typed failures surface via ScriptError → EngineError:
UseClaimResumedExecution when the attempt is actually
attempt_interrupted (caller should retry via
Self::claim_resumed_execution — see ContentionKind at
ff_core::engine_error), InvalidClaimGrant when the grant is
missing / consumed / worker-mismatched, CapabilityMismatch
when the execution’s required_capabilities drifted after
grant issuance.
§Backend coverage
- Valkey — implemented in `ff-backend-valkey` (one `ff_claim_execution` FCALL).
- Postgres / SQLite — use the `Err(Unavailable)` default in this PR. Grants on PG / SQLite today flow through `PostgresScheduler::claim_for_worker` (a sibling struct, not an `EngineBackend` method); wiring the default-over-trait behaviour into a PG / SQLite `claim_execution` impl lands with a future RFC-024 grant-consumer extension.
The default impl returns EngineError::Unavailable so the
trait addition is non-breaking for out-of-tree backends. Same
precedent as Self::read_current_attempt_index landing in
v0.12 PR-3.
Sourcefn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_downstream_execution_id: &'life2 ExecutionId,
_policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_downstream_execution_id: &'life2 ExecutionId,
_policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
RFC-016 Stage A: set the inbound-edge-group policy for a
downstream execution. Must be called before the first
add_dependency(... -> downstream_execution_id) — the backend
rejects with EngineError::Conflict if edges have already
been staged for this group.
Stage A honours only
EdgeDependencyPolicy::AllOf;
the AnyOf / Quorum variants return
EngineError::Validation with
detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"
until Stage B’s resolver lands.
Sourcefn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Rotate the waitpoint HMAC signing kid cluster-wide.
v0.7 migration-master Q4 (adjudicated 2026-04-24). Additive trait surface so Valkey and Postgres backends can both expose the “rotate everywhere” semantic under one name.
- Valkey impl fans out an
`ff_rotate_waitpoint_hmac_secret` FCALL per execution partition. `entries.len() == num_flow_partitions`, and per-partition failures are surfaced as inner `Err` entries — the call as a whole does not fail when one partition’s FCALL fails, matching [ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions]’s partial-success contract. - Postgres impl (Wave 4) writes one row to
`ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a single-entry vec with `partition = 0`.
The default impl returns
EngineError::Unavailable with
op = "rotate_waitpoint_hmac_secret_all" so backends that
haven’t implemented the method surface the miss loudly rather
than silently no-op’ing. Both concrete backends override.
Sourcefn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Seed the initial waitpoint HMAC secret for a fresh deployment (issue #280).
Idempotent. If a current_kid (Valkey per-partition) or
an active kid row (Postgres) already exists with the given
kid, the method returns
SeedOutcome::AlreadySeeded without overwriting, reporting
whether the stored secret matches the caller-supplied one via
same_secret. Callers (cairn boot, operator tooling) invoke
this on every boot and let the backend decide whether to
install — removing the client-side “check then HSET” race that
cairn’s raw-HSET boot path silently tolerated.
For rotation of an already-seeded secret, use
Self::rotate_waitpoint_hmac_secret_all instead; seed is
install-only.
The default impl returns EngineError::Unavailable with
op = "seed_waitpoint_hmac_secret" so backends that haven’t
implemented the method surface the miss loudly.
Sourcefn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read frames from a completed or in-flight attempt’s stream.
from / to are StreamCursor values — StreamCursor::Start
/ StreamCursor::End are equivalent to XRANGE - / +, and
StreamCursor::At("<id>") reads from a concrete entry id.
Input validation (count_limit bounds, cursor shape) is the
caller’s responsibility — SDK-side wrappers in
ff-sdk enforce bounds before
forwarding. Backends MAY additionally reject out-of-range
input via EngineError::Validation.
Gated on the streaming feature — stream reads are part of
the stream-subset surface a backend without XREAD-like
primitives may omit.
Sourcefn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Tail a live attempt’s stream.
after is an exclusive StreamCursor — entries with id
strictly greater than after are returned. StreamCursor::Start
/ StreamCursor::End are NOT accepted here; callers MUST pass
a concrete id (or StreamCursor::from_beginning()). The SDK
wrapper rejects the open markers before reaching the backend.
block_ms == 0 → non-blocking peek. block_ms > 0 → blocks up
to that many ms for a new entry.
visibility (RFC-015 §6.1) filters the returned entries by
their stored StreamMode
mode field. Default
TailVisibility::All
preserves v1 behaviour.
Gated on the streaming feature — see read_stream.
Sourcefn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read the rolling summary document for an attempt (RFC-015 §6.3).
Returns Ok(None) when no StreamMode::DurableSummary
frame has ever been appended for the attempt. Non-blocking Hash
read; safe to call from any consumer without holding the lease.
Gated on the streaming feature — summary reads are part of
the stream-subset surface.
Sourcefn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create an execution. Ingress row 6 (RFC-017 §4). Wraps
ff_create_execution on Valkey; INSERT INTO ff_execution ...
on Postgres. The idempotency_key + backend-side default
dedup_ttl_ms = 86400000 make duplicate submissions idempotent.
Sourcefn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a flow header. Ingress row 5.
Sourcefn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Atomically add an execution to a flow (single-FCALL co-located commit on Valkey; single-transaction UPSERT on Postgres).
Sourcefn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Stage a dependency edge between flow members. CAS-guarded on
graph_revision — stale rev returns Contention(StaleGraphRevision).
Sourcefn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Apply a staged dependency edge to its downstream child.
Sourcefn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Operator-initiated execution cancel (row 2).
Sourcefn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Re-score an execution’s eligibility priority (row 17).
Sourcefn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Replay a terminal execution (row 22). Variadic KEYS handling (inbound-edge pre-read) is hidden inside the Valkey impl per RFC-017 §4 row 3.
Sourcefn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Operator-initiated lease revoke (row 19).
Sourcefn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a budget definition (row 6).
Sourcefn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Reset a budget’s usage counters (row 10).
Sourcefn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a quota policy (row 7).
Sourcefn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read-only budget status for operator visibility (row 8).
Sourcefn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Admin-path report_usage (row 9 + RFC-017 §5 round-1 F4).
Distinct from the existing Self::report_usage which takes
a worker handle — the admin path has no lease context.
Sourcefn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Fetch the stored result payload for a completed execution
(row 4). Returns Ok(None) when the execution is missing, not
yet complete, or its payload was trimmed by retention policy.
Sourcefn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List the pending-or-active waitpoints for an execution, cursor
paginated (row 5 / §8). Stage A preserves the existing
PendingWaitpointInfo shape; Stage D ships the §8 HMAC
sanitisation + (token_kid, token_fingerprint) schema.
Sourcefn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Backend-level reachability probe (row 1). Valkey: PING;
Postgres: SELECT 1.
Sourcefn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
forwards to its ff_scheduler::Scheduler cursor; Postgres
forwards to PostgresScheduler’s FOR UPDATE SKIP LOCKED
path.
Backends that carry an embedded scheduler (e.g. ValkeyBackend
constructed via with_embedded_scheduler, or PostgresBackend
with its with_scanners sibling) route the claim through it.
Backends without a wired scheduler return
EngineError::Unavailable. HTTP consumers use
FlowFabricWorker::claim_via_server instead.
Sourcefn backend_label(&self) -> &'static str
fn backend_label(&self) -> &'static str
Static observability label identifying the backend family in
logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
returns "unknown" so legacy impl EngineBackend blocks that
have not upgraded keep compiling; every in-tree backend
overrides — ValkeyBackend → "valkey", PostgresBackend →
"postgres".
Sourcefn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
Backend downcast escape hatch (v0.12 PR-7a transitional).
Scanner supervisors in ff-engine still dispatch through a
concrete ferriskey::Client; to keep the engine’s public
boundary backend-agnostic (Arc<dyn EngineBackend>) while the
scanner internals remain Valkey-shaped, the engine downcasts
via this method and reaches in for the embedded client. Every
backend that wants to be consumed by Engine::start_with_completions
overrides this to return self as &dyn Any; the default
returns a placeholder so a stray downcast_ref fails cleanly
rather than risking unsound behaviour.
v0.13 (PR-7b) will trait-ify individual scanners onto
EngineBackend and retire ff-engine’s dependence on this
downcast path. The method itself will remain on the trait
(likely deprecated) rather than be removed — removing a
public trait method is a breaking change for external
impl EngineBackend blocks.
Sourcefn capabilities(&self) -> Capabilities
fn capabilities(&self) -> Capabilities
RFC-018 Stage A: snapshot of this backend’s identity + the
flat Supports surface it can actually service. Consumers use
this at startup to gate UI features / choose between alternative
code paths before dispatching. See
rfcs/RFC-018-backend-capability-discovery.md for the full
discovery contract and the four owner-adjudicated open
questions (granularity: coarse; version: struct; sync; no
event stream).
Default: returns a value tagged family = "unknown" with every
supports.* bool false, so pre-RFC-018 out-of-tree backends
keep compiling and consumers treat “all false” as “dispatch
and catch EngineError::Unavailable” (pre-RFC-018 behaviour).
Concrete in-tree backends (ValkeyBackend, PostgresBackend)
override to populate a real value.
Sync (no .await): backend-static info should not require a
probe on every query. Dynamic probes happen once at
connect* time and cache the result.
Sourcefn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Issue #281: run one-time backend-specific boot preparation.
Intended to run ONCE per deployment startup — NOT per request.
Idempotent and safe for consumers to call on every application
boot; backends that have nothing to do return
PrepareOutcome::NoOp without side effects.
Per-backend behaviour:
- Valkey — issues
`FUNCTION LOAD REPLACE` for the `flowfabric` Lua library (with bounded retry on transient transport faults; permanent compile errors surface as `EngineError::Transport` without retry). Returns `PrepareOutcome::Applied` carrying `"FUNCTION LOAD (flowfabric lib v<N>)"`. - Postgres — returns
`PrepareOutcome::NoOp`. Schema migrations are applied out-of-band per `rfcs/drafts/v0.7-migration-master.md` §Q12; the backend runs a schema-version check at connect time and refuses to start on mismatch, so no boot-side prepare work remains. - Default impl — returns
`PrepareOutcome::NoOp` so out-of-tree backends without preparation work compile without boilerplate.
§Relationship to the in-tree boot path
ValkeyBackend::initialize_deployment (called from
Server::start_with_metrics) already invokes
ensure_library inline as
its step 4; that path is unchanged. prepare() exists as a
trait-surface entry point so consumers that construct an
Arc<dyn EngineBackend> outside of Server (e.g.
cairn-fabric’s boot path at cairn-fabric/src/boot.rs) can
run the same preparation without reaching into
backend-specific modules. The overlap is intentional: calling
both prepare() and initialize_deployment is safe because
FUNCTION LOAD REPLACE is idempotent under the version
check.
§Layer forwarding
Layer impls (HookedBackend, ff-sdk layers) do NOT forward
prepare today — consistent with backend_label / ping /
shutdown_prepare. Consumers that wrap a backend in layers
MUST call prepare() on the raw backend before wrapping, or
accept the default PrepareOutcome::NoOp.
Sourcefn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Drain-before-shutdown hook (RFC-017 §5.4). The server calls
this before draining its own background tasks so backend-
scoped primitives (Valkey stream semaphore, Postgres sqlx
pool, …) can close their gates and await in-flight work up to
grace.
Default impl returns Ok(()) — a no-op backend has nothing
backend-scoped to drain. Concrete backends whose data plane
owns resources (connection pools, semaphores, listeners)
override with a real body.
Sourcefn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
RFC-017 Stage E2: the “header” portion of cancel_flow — run the
atomic flow-state flip (Valkey: ff_cancel_flow FCALL; Postgres:
cancel_flow_once tx), decode policy + membership, and surface
the flow_already_terminal idempotency branch as a first-class
[CancelFlowHeader::AlreadyTerminal] so the Server can build
the wire CancelFlowResult without reaching for a raw
Client. Separate from the existing
EngineBackend::cancel_flow entry point (which takes the
enum-typed (policy, wait) split and returns the wait-collapsed
CancelFlowResult) because the Server owns its own
wait-dispatch + member-cancel machinery via
EngineBackend::cancel_execution + backlog ack.
Default impl returns EngineError::Unavailable so un-migrated
backends surface the miss loudly.
Sourcefn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
RFC-017 Stage E2: best-effort acknowledgement that one member of
a cancel_all flow has completed its per-member cancel. Drains
the member from the flow’s pending_cancels set and, if empty,
removes the flow from the partition-level cancel_backlog
(Valkey: ff_ack_cancel_member FCALL; Postgres: table write —
default Unavailable until Wave 9).
Failures are swallowed by the caller — the cancel-backlog reconciler is the authoritative drain — but a typed error here lets the caller log a backend-scoped context string.
Sourcefn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
RFC-017 Stage E2: full-shape execution read used by the
GET /v1/executions/{id} HTTP route. Returns the legacy
ExecutionInfo wire shape (not the decoupled
ExecutionSnapshot) so the existing HTTP response bytes stay
identical across the migration.
Ok(None) ⇒ no such execution. Default Unavailable because
the Valkey HGETALL-and-parse is backend-specific.
Sourcefn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
RFC-017 Stage E2: narrow public_state read used by the
GET /v1/executions/{id}/state HTTP route. Returns Ok(None)
when the execution is missing. Default Unavailable.
Sourcefn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to lease lifecycle events (acquired / renewed / expired / reclaimed / revoked) for the partition this backend is configured with.
Cross-partition fan-out is consumer-side merge: subscribe
per-partition backend instance and interleave on the read
side. Yields
Err(EngineError::StreamDisconnected { cursor }) on backend
disconnect; resume by calling this method again with the
returned cursor.
filter (#282): when filter.instance_tag is Some((k, v)),
only events whose execution carries tag k = v are yielded
(matching the crate::backend::ScannerFilter surface from
#122). Pass &ScannerFilter::default() for unfiltered
behaviour. Filtering happens inside the backend stream; the
crate::stream_events::LeaseHistorySubscription return type
is unchanged.
Sourcefn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to completion events (terminal state transitions).
- Postgres: wraps the
`ff_completion_event` outbox + LISTEN/NOTIFY machinery. Durable via event-id cursor. - Valkey: wraps the RESP3
`ff:dag:completions` pubsub subscriber. Pubsub is at-most-once over the live subscription window; the cursor is always the empty sentinel. If you need at-least-once replay with durable cursor resume, use the Postgres backend (see `docs/POSTGRES_PARITY_MATRIX.md` row `subscribe_completion`).
filter (#282): see Self::subscribe_lease_history. Valkey
reuses the subscribe_completions_filtered per-event HGET
gate; Postgres filters inline against the outbox’s denormalised
instance_tag column.
Sourcefn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to signal-delivery events (satisfied / buffered / deduped).
filter (#282): see Self::subscribe_lease_history.
Subscribe to instance-tag events (tag attached / cleared).
Producer wiring is deferred per #311 audit (“no concrete
demand”); the trait method exists for API uniformity across
the four families. Backends currently return
EngineError::Unavailable.