Skip to main content

EngineBackend

Trait EngineBackend 

Source
pub trait EngineBackend:
    Send
    + Sync
    + 'static {
Show 115 methods // Required methods fn claim<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, lane: &'life1 LaneId, capabilities: &'life2 CapabilitySet, policy: ClaimPolicy, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn renew<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn progress<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, percent: Option<u8>, message: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn append_frame<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, frame: Frame, ) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn complete<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, payload: Option<Vec<u8>>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn fail<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: FailureReason, classification: FailureClass, ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn cancel<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn suspend<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, args: SuspendArgs, ) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, waitpoint_key: &'life2 str, expires_in: Duration, ) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; fn observe_signals<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn claim_from_resume_grant<'life0, 'async_trait>( &'life0 self, token: ResumeToken, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait; fn delay<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, delay_until: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn wait_children<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn describe_execution<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn read_execution_context<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<ExecutionContext, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn describe_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn cancel_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, policy: CancelFlowPolicy, wait: CancelFlowWait, ) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait; fn report_usage<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, budget: &'life2 BudgetId, dimensions: UsageDimensions, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait; // Provided methods fn suspend_by_triple<'life0, 'async_trait>( &'life0 self, exec_id: ExecutionId, triple: LeaseFence, args: SuspendArgs, ) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn read_waitpoint_token<'life0, 'life1, 'async_trait>( &'life0 self, _partition: PartitionKey, _waitpoint_id: &'life1 WaitpointId, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn issue_reclaim_grant<'life0, 'async_trait>( &'life0 self, _args: IssueReclaimGrantArgs, ) -> Pin<Box<dyn Future<Output = Result<IssueReclaimGrantOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn reclaim_execution<'life0, 'async_trait>( &'life0 self, _args: ReclaimExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ReclaimExecutionOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn read_current_attempt_index<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn read_total_attempt_count<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn set_execution_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _key: &'life2 str, _value: &'life3 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait { ... } fn set_flow_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _key: &'life2 str, _value: &'life3 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait { ... } fn get_execution_tag<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _key: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn get_execution_namespace<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn get_flow_tag<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _key: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn list_edges<'life0, 'life1, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _direction: EdgeDirection, ) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn describe_edge<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _edge_id: &'life2 EdgeId, ) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>( &'life0 self, _eid: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn list_flows<'life0, 'async_trait>( &'life0 self, _partition: PartitionKey, _cursor: Option<FlowId>, _limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn list_lanes<'life0, 'async_trait>( &'life0 self, _cursor: Option<LaneId>, _limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn list_suspended<'life0, 'async_trait>( &'life0 self, _partition: PartitionKey, _cursor: Option<ExecutionId>, _limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn list_executions<'life0, 'async_trait>( &'life0 self, _partition: PartitionKey, _cursor: Option<ExecutionId>, _limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn deliver_signal<'life0, 'async_trait>( &'life0 self, _args: DeliverSignalArgs, ) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn claim_resumed_execution<'life0, 'async_trait>( &'life0 self, _args: ClaimResumedExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn scan_eligible_executions<'life0, 'async_trait>( &'life0 self, _args: ScanEligibleArgs, ) -> Pin<Box<dyn Future<Output = Result<Vec<ExecutionId>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn issue_claim_grant<'life0, 'async_trait>( &'life0 self, _args: IssueClaimGrantArgs, ) -> Pin<Box<dyn Future<Output = Result<IssueClaimGrantOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn block_route<'life0, 'async_trait>( &'life0 self, _args: BlockRouteArgs, ) -> Pin<Box<dyn Future<Output = Result<BlockRouteOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn claim_execution<'life0, 'async_trait>( &'life0 self, _args: ClaimExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _downstream_execution_id: &'life2 ExecutionId, _policy: EdgeDependencyPolicy, ) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>( &'life0 self, _args: RotateWaitpointHmacSecretAllArgs, ) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn seed_waitpoint_hmac_secret<'life0, 'async_trait>( &'life0 self, _args: SeedWaitpointHmacSecretArgs, ) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn read_stream<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _attempt_index: AttemptIndex, _from: StreamCursor, _to: StreamCursor, _count_limit: u64, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn tail_stream<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _attempt_index: AttemptIndex, _after: StreamCursor, _block_ms: u64, _count_limit: u64, _visibility: TailVisibility, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn read_summary<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _attempt_index: AttemptIndex, ) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn create_execution<'life0, 'async_trait>( &'life0 self, _args: CreateExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn create_flow<'life0, 'async_trait>( &'life0 self, _args: CreateFlowArgs, ) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn add_execution_to_flow<'life0, 'async_trait>( &'life0 self, _args: AddExecutionToFlowArgs, ) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn stage_dependency_edge<'life0, 'async_trait>( &'life0 self, _args: StageDependencyEdgeArgs, ) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn apply_dependency_to_child<'life0, 'async_trait>( &'life0 self, _args: ApplyDependencyToChildArgs, ) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn resolve_dependency<'life0, 'async_trait>( &'life0 self, _args: ResolveDependencyArgs, ) -> Pin<Box<dyn Future<Output = Result<ResolveDependencyOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn cascade_completion<'life0, 'life1, 'async_trait>( &'life0 self, _payload: &'life1 CompletionPayload, ) -> Pin<Box<dyn Future<Output = Result<CascadeOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn cancel_execution<'life0, 'async_trait>( &'life0 self, _args: CancelExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn change_priority<'life0, 'async_trait>( &'life0 self, _args: ChangePriorityArgs, ) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn replay_execution<'life0, 'async_trait>( &'life0 self, _args: ReplayExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn revoke_lease<'life0, 'async_trait>( &'life0 self, _args: RevokeLeaseArgs, ) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn complete_execution<'life0, 'async_trait>( &'life0 self, _args: CompleteExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<CompleteExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn fail_execution<'life0, 'async_trait>( &'life0 self, _args: FailExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<FailExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn renew_lease<'life0, 'async_trait>( &'life0 self, _args: RenewLeaseArgs, ) -> Pin<Box<dyn Future<Output = Result<RenewLeaseResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn resume_execution<'life0, 'async_trait>( &'life0 self, _args: ResumeExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ResumeExecutionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn check_admission<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _quota_policy_id: &'life1 QuotaPolicyId, _dimension: &'life2 str, _args: CheckAdmissionArgs, ) -> Pin<Box<dyn Future<Output = Result<CheckAdmissionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn block_execution_for_admission<'life0, 'async_trait>( &'life0 self, _args: BlockExecutionForAdmissionArgs, ) -> Pin<Box<dyn Future<Output = Result<BlockExecutionForAdmissionOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn read_budget_usage_and_limits<'life0, 'life1, 'async_trait>( &'life0 self, _budget_id: &'life1 BudgetId, ) -> Pin<Box<dyn Future<Output = Result<BudgetUsageAndLimits, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn read_quota_policy_limits<'life0, 'life1, 'async_trait>( &'life0 self, _quota_policy_id: &'life1 QuotaPolicyId, ) -> Pin<Box<dyn Future<Output = Result<Option<QuotaPolicyLimits>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn evaluate_flow_eligibility<'life0, 'async_trait>( &'life0 self, _args: EvaluateFlowEligibilityArgs, ) -> Pin<Box<dyn Future<Output = Result<EvaluateFlowEligibilityResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn record_spend<'life0, 'async_trait>( &'life0 self, _args: RecordSpendArgs, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn release_budget<'life0, 'async_trait>( &'life0 self, _args: ReleaseBudgetArgs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn release_admission<'life0, 'async_trait>( &'life0 self, _args: ReleaseAdmissionArgs, ) -> Pin<Box<dyn Future<Output = Result<ReleaseAdmissionResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn deliver_approval_signal<'life0, 'async_trait>( &'life0 self, _args: DeliverApprovalSignalArgs, ) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn issue_grant_and_claim<'life0, 'async_trait>( &'life0 self, _args: IssueGrantAndClaimArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimGrantOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn create_budget<'life0, 'async_trait>( &'life0 self, _args: CreateBudgetArgs, ) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn reset_budget<'life0, 'async_trait>( &'life0 self, _args: ResetBudgetArgs, ) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn create_quota_policy<'life0, 'async_trait>( &'life0 self, _args: CreateQuotaPolicyArgs, ) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn get_budget_status<'life0, 'life1, 'async_trait>( &'life0 self, _id: &'life1 BudgetId, ) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn report_usage_admin<'life0, 'life1, 'async_trait>( &'life0 self, _budget: &'life1 BudgetId, _args: ReportUsageAdminArgs, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn get_execution_result<'life0, 'life1, 'async_trait>( &'life0 self, _id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn list_pending_waitpoints<'life0, 'async_trait>( &'life0 self, _args: ListPendingWaitpointsArgs, ) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn ping<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn register_worker<'life0, 'async_trait>( &'life0 self, _args: RegisterWorkerArgs, ) -> Pin<Box<dyn Future<Output = Result<RegisterWorkerOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn heartbeat_worker<'life0, 'async_trait>( &'life0 self, _args: HeartbeatWorkerArgs, ) -> Pin<Box<dyn Future<Output = Result<HeartbeatWorkerOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn mark_worker_dead<'life0, 'async_trait>( &'life0 self, _args: MarkWorkerDeadArgs, ) -> Pin<Box<dyn Future<Output = Result<MarkWorkerDeadOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn list_expired_leases<'life0, 'async_trait>( &'life0 self, _args: ListExpiredLeasesArgs, ) -> Pin<Box<dyn Future<Output = Result<ListExpiredLeasesResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn list_workers<'life0, 'async_trait>( &'life0 self, _args: ListWorkersArgs, ) -> Pin<Box<dyn Future<Output = Result<ListWorkersResult, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn claim_for_worker<'life0, 'async_trait>( &'life0 self, _args: ClaimForWorkerArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn backend_label(&self) -> &'static str { ... } fn as_any(&self) -> &(dyn Any + 'static) { ... } fn capabilities(&self) -> Capabilities { ... } fn prepare<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn shutdown_prepare<'life0, 'async_trait>( &'life0 self, _grace: Duration, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn cancel_flow_header<'life0, 'async_trait>( &'life0 self, _args: CancelFlowArgs, ) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _execution_id: &'life2 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn read_execution_info<'life0, 'life1, 'async_trait>( &'life0 self, _id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn read_execution_state<'life0, 'life1, 'async_trait>( &'life0 self, _id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn subscribe_lease_history<'life0, 'life1, 'async_trait>( &'life0 self, _cursor: StreamCursor, _filter: &'life1 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn subscribe_completion<'life0, 'life1, 'async_trait>( &'life0 self, _cursor: StreamCursor, _filter: &'life1 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn subscribe_signal_delivery<'life0, 'life1, 'async_trait>( &'life0 self, _cursor: StreamCursor, _filter: &'life1 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn subscribe_instance_tags<'life0, 'async_trait>( &'life0 self, _cursor: StreamCursor, ) -> Pin<Box<dyn Future<Output = Result<InstanceTagSubscription, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn mark_lease_expired_if_due<'life0, 'life1, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn promote_delayed<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _partition: Partition, _lane: &'life1 LaneId, _execution_id: &'life2 ExecutionId, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn close_waitpoint<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, _waitpoint_id: &'life2 str, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn expire_execution<'life0, 'life1, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, _phase: ExpirePhase, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn expire_suspension<'life0, 'life1, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn unblock_execution<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _partition: Partition, _lane_id: &'life1 LaneId, _execution_id: &'life2 ExecutionId, _expected_blocking_reason: &'life3 str, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait { ... } fn drain_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_partition: Partition, _flow_id: &'life1 FlowId, _downstream_eid: &'life2 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn reconcile_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_partition: Partition, _flow_id: &'life1 FlowId, _downstream_eid: &'life2 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<SiblingCancelReconcileAction, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn reconcile_execution_index<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _partition: Partition, _lanes: &'life1 [LaneId], _filter: &'life2 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn reconcile_budget_counters<'life0, 'async_trait>( &'life0 self, _partition: Partition, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn reconcile_quota_counters<'life0, 'async_trait>( &'life0 self, _partition: Partition, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... } fn project_flow_summary<'life0, 'life1, 'async_trait>( &'life0 self, _partition: Partition, _flow_id: &'life1 FlowId, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<bool, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait { ... } fn trim_retention<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _partition: Partition, _lane_id: &'life1 LaneId, _retention_ms: u64, _now_ms: TimestampMs, _batch_size: u32, _filter: &'life2 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<u32, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait { ... } fn read_exec_core_fields<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, _fields: &'life2 [&'life3 str], ) -> Pin<Box<dyn Future<Output = Result<HashMap<String, Option<String>>, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait { ... } fn server_time_ms<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<u64, EngineError>> + Send + 'async_trait>> where Self: 'async_trait, 'life0: 'async_trait { ... }
}
Expand description

The engine write surface — a single trait a backend implementation honours to serve a FlowFabricWorker.

See RFC-012 §3.1 for the inventory rationale and §3.3 for the type-level shape. 16 methods (Round-7 added create_waitpoint; append_frame return widened; report_usage return replaced — RFC-012 §R7). Issue #150 added the two trigger-surface methods (deliver_signal / claim_resumed_execution).

§Note on complete payload shape

The RFC §3.3 sketch uses Option<Bytes>; the Stage 1a trait uses Option<Vec<u8>> to match the existing ff_sdk::ClaimedTask::complete signature and avoid adding a bytes public-type dep for zero consumer benefit. Round-4 §7.17 resolved the payload container debate to Box<[u8]> in the public type (see HandleOpaque); Option<Vec<u8>> is the zero-churn choice consistent with today’s code. Consumers that need &[u8] can borrow via .as_deref() on the Option.

Required Methods§

Source

fn claim<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, lane: &'life1 LaneId, capabilities: &'life2 CapabilitySet, policy: ClaimPolicy, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Fresh-work claim. Returns Ok(None) when no work is currently available; Err only on transport or input-validation faults.

Source

fn renew<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Renew a held lease. Returns the updated expiry + epoch on success; typed State::StaleLease / State::LeaseExpired when the lease has been stolen or timed out.

Source

fn progress<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, percent: Option<u8>, message: Option<String>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Numeric-progress heartbeat.

Writes scalar progress_percent / progress_message fields on exec_core; each call overwrites the previous value. This does NOT append to the output stream — stream-frame producers must use append_frame instead.

Source

fn append_frame<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, frame: Frame, ) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Append one stream frame. Distinct from progress per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry id and post-append frame count (RFC-012 §R7.2.1).

Stream-frame producers (arbitrary frame_type + payload, consumed via the read/tail surfaces) MUST use this method rather than progress; the latter updates scalar fields on exec_core and is invisible to stream consumers.

Source

fn complete<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, payload: Option<Vec<u8>>, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Terminal success. Borrows handle (round-4 M-D2) so callers can retry under EngineError::Transport without losing the cookie. Payload is Option<Vec<u8>> per the note above.

Source

fn fail<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: FailureReason, classification: FailureClass, ) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Terminal failure with classification. Returns FailOutcome so the caller learns whether a retry was scheduled.

Source

fn cancel<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, reason: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Cooperative cancel by the worker holding the lease.

Source

fn suspend<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, args: SuspendArgs, ) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Suspend the execution awaiting a typed resume condition (RFC-013 Stage 1d).

Borrows handle (round-4 M-D2). Terminal-looking behaviour is expressed through SuspendOutcome:

  • SuspendOutcome::Suspended — the pre-suspend handle is logically invalidated; the fresh HandleKind::Suspended handle inside the variant supersedes it. Runtime enforcement via the fence triple: subsequent ops against the stale handle surface as Contention(LeaseConflict).
  • SuspendOutcome::AlreadySatisfied — buffered signals on a pending waitpoint already matched the resume condition at suspension time. The lease is NOT released; the caller’s pre-suspend handle remains valid.

See RFC-013 §2 for the type shapes, §3 for the replay / idempotency contract, §4 for the error taxonomy.

Source

fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, waitpoint_key: &'life2 str, expires_in: Duration, ) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Issue a pending waitpoint for future signal delivery.

Waitpoints have two states in the Valkey wire contract: pending (token issued, not yet backing a suspension) and active (bound to a suspension). This method creates a waitpoint in the pending state. A later suspend call transitions a pending waitpoint to active (see Lua use_pending_waitpoint ARGV flag at flowfabric.lua:3603,3641,3690) — or, if buffered signals already satisfy its condition, the suspend call returns SuspendOutcome::AlreadySatisfied and the waitpoint activates without ever releasing the lease.

Pending-waitpoint expiry is a first-class terminal error on the wire (PendingWaitpointExpired at ff-script/src/error.rs:170,403-408). The attempt retains its lease while the waitpoint is pending; signals delivered to this waitpoint are buffered server-side (RFC-012 §R7.2.2).

Source

fn observe_signals<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Non-mutating observation of signals that satisfied the handle’s resume condition.

Source

fn claim_from_resume_grant<'life0, 'async_trait>( &'life0 self, token: ResumeToken, ) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Consume a resume grant (via ResumeToken) to mint a resumed-kind handle. Routes to ff_claim_resumed_execution on Valkey / the epoch-bump reconciler on PG/SQLite. Returns Ok(None) when the grant’s target execution is no longer resumable (already reclaimed, terminal, etc.).

Renamed from claim_from_reclaim (RFC-024 PR-B+C). The pre-rename name advertised “reclaim” but the semantic has always been resume-after-suspend. The new lease-reclaim path lives on Self::reclaim_execution.

Source

fn delay<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, delay_until: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Park the execution until delay_until, releasing the lease.

Source

fn wait_children<'life0, 'life1, 'async_trait>( &'life0 self, handle: &'life1 Handle, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Mark the execution as waiting for its child flow to complete, releasing the lease.

Source

fn describe_execution<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Snapshot an execution by id. Ok(None) ⇒ no such execution.

Source

fn read_execution_context<'life0, 'life1, 'async_trait>( &'life0 self, execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<ExecutionContext, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Point-read of the execution-scoped (input_payload, execution_kind, tags) bundle used by the SDK worker when assembling a ClaimedTask (see ff_sdk::ClaimedTask) after a successful claim.

No default impl — every EngineBackend must answer this explicitly. Distinct from Self::describe_execution (read-model projection) because the SDK needs the raw payload bytes + kind + tags immediately post-claim, and the snapshot projection deliberately omits the payload bytes.

Per-backend shape:

  • Valkey — pipelined GET :payload + HGETALL :core
  • Postgres — single SELECT payload, raw_fields on ff_exec_core keyed by (partition_key, execution_id); execution_kind + tags live in raw_fields JSONB.
  • SQLite — identical shape to Postgres.

Returns EngineError::Validation { kind: ValidationKind::InvalidInput, .. } when the execution does not exist — the SDK worker only calls this after a successful claim, so a missing row is a loud storage-tier invariant violation rather than a routine Ok(None).

Source

fn describe_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Snapshot a flow by id. Ok(None) ⇒ no such flow.

Source

fn cancel_flow<'life0, 'life1, 'async_trait>( &'life0 self, id: &'life1 FlowId, policy: CancelFlowPolicy, wait: CancelFlowWait, ) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Operator-initiated cancellation of a flow and (optionally) its member executions. See RFC-012 §3.1.1 for the policy /wait matrix.

Source

fn report_usage<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, handle: &'life1 Handle, budget: &'life2 BudgetId, dimensions: UsageDimensions, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Report usage against a budget and check limits. Returns the typed ReportUsageResult variant; backends enforce idempotency via the caller-supplied [UsageDimensions::dedup_key] (RFC-012 §R7.2.3 — replaces the pre-Round-7 AdmissionDecision return).

Provided Methods§

Source

fn suspend_by_triple<'life0, 'async_trait>( &'life0 self, exec_id: ExecutionId, triple: LeaseFence, args: SuspendArgs, ) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Suspend by execution id + lease fence triple, for service-layer callers that hold a run record / lease-claim descriptor but no worker Handle (cairn issue #322).

Semantics mirror Self::suspend exactly — the same SuspendArgs validation, the same SuspendOutcome lifecycle, the same RFC-013 §3 dedup / replay contract. The only difference is the fencing source: instead of the (lease_id, lease_epoch, attempt_id) fields embedded in a Handle, the backend fences against the triple passed directly. Attempt-index, lane, and worker-instance metadata that Self::suspend reads from the handle payload are recovered from the backend’s authoritative execution record (Valkey: exec_core HGETs; Postgres: ff_attempt row lookup).

The default impl returns EngineError::Unavailable so existing backend impls remain non-breaking. Production backends (Valkey, Postgres) override.

Source

fn read_waitpoint_token<'life0, 'life1, 'async_trait>( &'life0 self, _partition: PartitionKey, _waitpoint_id: &'life1 WaitpointId, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read the HMAC token stored on a waitpoint record, keyed by (partition, waitpoint_id).

Returns Ok(Some(token)) when the waitpoint exists and carries a token, Ok(None) when the waitpoint does not exist or no token field is present. A missing waitpoint is not an error — signals can legitimately arrive after a waitpoint has been consumed or expired, and the signal-bridge authenticates on the presence of a matching token, not on the waitpoint’s liveness.

§Use case

Control-plane signal delivery (cairn signal-bridge): at signal-resume time the bridge reads the token off the waitpoint hash / row to authenticate the resume request it subsequently issues. Previously implemented as direct ferriskey::Client::hget(waitpoint_key, "waitpoint_token") — Valkey-only. This method routes the read through the trait so the pattern works on Postgres + SQLite as well.

§Per-backend shape
  • ValkeyHGET ff:wp:<tag>:<waitpoint_id> waitpoint_token on the waitpoint’s partition. Empty string / missing field maps to None.
  • PostgresSELECT token FROM ff_waitpoint_pending WHERE partition_key = $1 AND waitpoint_id = $2 LIMIT 1. Row-absent → None; empty tokenNone.
  • SQLite — same shape as Postgres.

The partition argument is the opaque PartitionKey produced by FlowFabric (typically extracted from the Handle / ResumeToken the waitpoint was minted against).

§Default impl

Returns EngineError::Unavailable with op = "read_waitpoint_token" so out-of-tree backends and in-tree backends not yet overriding this method continue to compile. Mirrors the precedent used by Self::issue_reclaim_grant / Self::reclaim_execution.

Source

fn issue_reclaim_grant<'life0, 'async_trait>( &'life0 self, _args: IssueReclaimGrantArgs, ) -> Pin<Box<dyn Future<Output = Result<IssueReclaimGrantOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions in lease_expired_reclaimable or lease_revoked state to the reclaim path; the returned IssueReclaimGrantOutcome::Granted carries a crate::contracts::ReclaimGrant which is then fed to Self::reclaim_execution to mint a fresh attempt.

Default impl returns EngineError::Unavailable — PR-D (PG), PR-E (SQLite), and PR-F (Valkey) override with real bodies.

Source

fn reclaim_execution<'life0, 'async_trait>( &'life0 self, _args: ReclaimExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ReclaimExecutionOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Consume a crate::contracts::ReclaimGrant to mint a fresh attempt for a previously lease-expired / lease-revoked execution (RFC-024 §3.2). Creates a new attempt row, bumps the execution’s lease_reclaim_count, and mints a crate::backend::HandleKind::Reclaimed handle.

Default impl returns EngineError::Unavailable — PR-D (PG), PR-E (SQLite), and PR-F (Valkey) override with real bodies.

Source

fn read_current_attempt_index<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Point-read of the execution’s current attempt-index pointer — the index of the currently-leased attempt row.

Distinct from Self::read_total_attempt_count: this method names the attempt that already exists (pointer), whereas read_total_attempt_count is the monotonic claim counter used to compute the next fresh attempt index. See the sibling’s rustdoc for the retry-path scenario that motivates the split.

Used on the SDK worker’s claim_from_resume_grant path — specifically the private claim_resumed_execution helper — immediately before dispatching Self::claim_resumed_execution. The returned index is fed into ClaimResumedExecutionArgs::current_attempt_index so the backend’s script / transaction targets the correct existing attempt row (KEYS[6] on Valkey; ff_attempt PK tuple on PG/SQLite).

Per-backend shape:

  • ValkeyHGET {exec}:core current_attempt_index on the execution’s partition. Single command. Both the missing-field case (exec_core present but current_attempt_index absent or empty-string, i.e. pre-claim state) and the missing-row case (no exec_core hash at all) read back as AttemptIndex(0). This preserves the pre-PR-3 inline-HGET semantic and is safe because Valkey’s happy path requires exec_core to exist before this method is reached — the SDK only calls read_current_attempt_index post-grant, and grant issuance is gated on exec_core presence. A genuinely absent row would surface as the proper business-logic error (NotAResumedExecution / ExecutionNotLeaseable) on the downstream FCALL.
  • PostgresSELECT attempt_index FROM ff_exec_core WHERE partition_key = $1 AND execution_id = $2. The column is NOT NULL DEFAULT 0 so a pre-claim row reads back as 0 (matching Valkey’s missing-field case). Missing row surfaces as EngineError::Validation { kind: ValidationKind::InvalidInput, .. } — diverges from Valkey’s missing-row → 0 mapping.
  • SQLiteSELECT attempt_index FROM ff_exec_core WHERE partition_key = ? AND execution_id = ?; identical semantics to Postgres (missing-row → InvalidInput).

Cross-backend asymmetry on missing row is intentional. The SDK happy path never observes it (grant issuance on Valkey requires exec_core, and PG/SQLite currently return Unavailable from claim_from_grant per project_claim_from_grant_pg_sqlite_gap.md). Consumers writing backend-agnostic tooling against this method directly must treat the missing-row case as backend-dependent — match on InvalidInput for PG/SQLite, and treat an unexpected 0 as the Valkey equivalent signal.

The default impl returns EngineError::Unavailable so the trait addition is non-breaking for out-of-tree backends (same precedent as Self::read_execution_context landing in v0.12 PR-1).

Source

fn read_total_attempt_count<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Point-read of the execution’s total attempt counter — the monotonic count of claims that have ever fired against this execution (including the in-flight one once claimed).

Used on the SDK worker’s claim_from_grant / claim_execution path — the next attempt-index for a fresh claim is this counter’s current value (so 1 on the second retry after the first attempt failed terminally). This is semantically distinct from Self::read_current_attempt_index, which is a pointer at the currently-leased attempt row and is only meaningful on the claim_from_resume_grant path (where a live attempt already exists and we want to re-seat its lease rather than mint a new attempt row).

Reading the pointer on the claim_from_grant path was a live bug: on the retry-of-a-retry scenario the pointer still named the previous terminal-failed attempt, so the newly-minted attempt collided with it (Valkey KEYS[6]) or mis-targeted the PG/SQLite ff_attempt PK tuple. This method fixes that by reading the counter that Lua 5920 / PG ff_claim_execution / SQLite claim_impl all already consult when computing the next attempt index.

Per-backend shape:

  • ValkeyHGET {exec}:core total_attempt_count on the execution’s partition. Single command; pre-claim read (field absent or empty) maps to 0.
  • PostgresSELECT raw_fields->>'total_attempt_count' FROM ff_exec_core WHERE (partition_key, execution_id) = .... The field lives in the JSONB raw_fields bag rather than a dedicated column (mirrors how create_execution_impl seeds it on row creation). Missing row → InvalidInput; missing field → 0.
  • SQLiteSELECT CAST(json_extract(raw_fields, '$.total_attempt_count') AS INTEGER) FROM ff_exec_core WHERE .... Same JSON-in-raw_fields shape as PG; uses the same json_extract idiom already employed in ff-backend-sqlite/src/queries/operator.rs for replay_count.

The default impl returns EngineError::Unavailable so the trait addition is non-breaking for out-of-tree backends (same precedent as Self::read_current_attempt_index landing in v0.12 PR-3).

Source

fn set_execution_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _key: &'life2 str, _value: &'life3 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Set a single namespaced tag on an execution. Tag key MUST match the reserved caller-namespace pattern ^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$ — i.e. <caller>.<field> — or the call returns EngineError::Validation { kind: ValidationKind::InvalidInput, .. } with the offending key in detail. value is arbitrary UTF-8.

The namespace prefix is carried inline in key (e.g. "cairn.session_id") — there is no separate namespace arg. This matches the existing ff_set_execution_tags wire shape and the flow-tag projection in ExecutionSnapshot::tags.

Validation is performed by each overriding backend impl via validate_tag_key before the wire hop so PG / SQLite / Valkey reject the same set of keys. The default trait impl returns EngineError::Unavailable without running validation — there is no meaningful storage to validate against on an unsupported backend, and surfacing Unavailable before Validation matches the precedence used elsewhere on the trait. Backends MAY additionally validate on the storage tier (Valkey’s Lua path does, with a more permissive prefix-only check).

Per-backend shape:

  • Valkeyff_set_execution_tags FCALL with a single {key → value} pair. Routes through the existing Lua contract (no new wire format).
  • PostgresUPDATE ff_exec_core SET raw_fields = jsonb_set( coalesce(raw_fields, '{}'::jsonb), '{tags,<key>}', to_jsonb($value)) WHERE (partition_key, execution_id) = .... Same storage shape read by Self::describe_execution / Self::read_execution_context.
  • SQLiteUPDATE ff_exec_core SET raw_fields = json_set( coalesce(raw_fields, '{}'), '$.tags."<key>"', $value) WHERE .... The key is quoted in the JSON path so dots inside the namespaced key (e.g. cairn.session_id) are treated as a single literal member name rather than JSON-path separators — yielding the same flat raw_fields.tags shape as PG.

Missing execution surfaces as EngineError::NotFound { entity: "execution" } — matches the Valkey FCALL’s execution_not_found mapping and the existing ScriptError::ExecutionNotFoundEngineError conversion (ff_script::engine_error_ext).

The default impl returns EngineError::Unavailable so the trait addition is non-breaking for out-of-tree backends.

Source

fn set_flow_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _key: &'life2 str, _value: &'life3 str, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Set a single namespaced tag on a flow. Same namespace rule as Self::set_execution_tag: key MUST match ^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$.

Per-backend shape:

  • Valkeyff_set_flow_tags FCALL with a single pair. Tags land on the dedicated ff:flow:{fp:N}:<flow_id>:tags hash, not on the flow_core hash (diverges from the execution shape — execution tags live on ff:exec:...:tags by the same split). Lazy migration on first write: the Lua (ff_script::flowfabric.lua, ff_set_flow_tags) scans flow_core once per flow for pre-58.4 inline namespaced fields (anything matching ^[a-z][a-z0-9_]*\.), HSETs them onto :tags, HDELs them from flow_core, and stamps tags_migrated=1 on flow_core so subsequent calls short-circuit to O(1). This heals flows created before RFC-058.4 landed; well-formed flows pay the migration cost only on their very first tag write. Callers MUST read tags via Self::get_flow_tag (HGET :tags <key>) — direct HGETALL against flow_core will not see post-migration values.

    Cross-backend parity caveat on describe_flow: the pre-existing ValkeyBackend::describe_flow / FlowSnapshot::tags read path snapshots flow_core fields only and does NOT today merge the :tags sub-hash, whereas Postgres describe_flow DOES surface flow tags via ff_backend_postgres::flow::extract_tags (which reads them off raw_fields — the same store set_flow_tag writes on PG). Trait consumers MUST NOT assume a tag written here will be visible via describe_flow on every backend: on Valkey, callers that need the full tag set should complement the snapshot with per-key Self::get_flow_tag reads. Extending Valkey describe_flow to merge :tags is additive and out of scope for this trait addition.

  • PostgresUPDATE ff_flow_core SET raw_fields = jsonb_set(..., '{<key>}', ...) — flow tags are stored as top-level raw_fields keys (matches ff_backend_postgres::flow::extract_tags). No tags nesting on flows, which diverges from the execution shape.

  • SQLite — mirrors PG: UPDATE ff_flow_core SET raw_fields = json_set(..., '$."<key>"', $value) WHERE .... The key is quoted so the dotted namespaced key lands as a single flat top-level member of raw_fields.

Missing flow surfaces as EngineError::NotFound { entity: "flow" } (matches the Valkey FCALL’s flow_not_found mapping).

The default impl returns EngineError::Unavailable.

Source

fn get_execution_tag<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _key: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Read a single namespaced execution tag. Returns Ok(None) when the tag is absent or the execution row does not exist — the two cases are not distinguished on the read path. Callers that need to distinguish should call Self::describe_execution first (an Ok(None) from that method proves the execution is absent). This matches Valkey’s native HGET semantics and keeps the read path at a single round-trip on every backend.

key must pass validate_tag_key — a malformed key can never be present in storage so the call short-circuits with EngineError::Validation { kind: ValidationKind::InvalidInput, .. } rather than round-tripping.

Per-backend shape:

  • ValkeyHGET :tags <key> on the execution’s partition.
  • PostgresSELECT raw_fields->'tags'->><key> FROM ff_exec_core WHERE ... with fetch_optional → missing row collapses to None.
  • SQLiteSELECT json_extract(raw_fields, '$.tags."<key>"') FROM ff_exec_core WHERE ... with the same collapse. The key is quoted in the JSON path so dotted namespaced keys resolve to the flat literal member written by set_execution_tag.

The default impl returns EngineError::Unavailable.

Source

fn get_execution_namespace<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read an execution’s namespace scalar. Returns Ok(None) when the row is absent or the field is unset. Dedicated point-read used by the scanner per-candidate filter (should_skip_candidate) to preserve the 1-HGET cost contract documented in ff_engine::scanner::should_skip_candidatedescribe_execution is heavier (HGETALL / full snapshot) and unnecessary when only the namespace scalar is needed.

Per-backend shape:

  • ValkeyHGET :core namespace on the execution’s partition (single field read on the already-hot exec_core hash).
  • PostgresSELECT raw_fields->>'namespace' FROM ff_exec_core WHERE partition_key = $1 AND execution_id = $2.
  • SQLiteSELECT json_extract(raw_fields, '$.namespace') FROM ff_exec_core WHERE ....

The default impl returns EngineError::Unavailable.

Source

fn get_flow_tag<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _key: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Read a single namespaced flow tag. Returns Ok(None) when the tag is absent or the flow row does not exist (same collapse semantics as Self::get_execution_tag). Symmetry partner — consumers like cairn read cairn.session_id off flows for archival.

key must pass validate_tag_key.

Per-backend shape:

  • ValkeyHGET :tags <key> on the flow’s partition.
  • PostgresSELECT raw_fields->><key> FROM ff_flow_core WHERE ... (top-level raw_fields key, matches the flow-tag storage shape).
  • SQLiteSELECT json_extract(raw_fields, '$."<key>"') FROM ff_flow_core WHERE ... (quoted key — see set_flow_tag).

The default impl returns EngineError::Unavailable.

Source

fn list_edges<'life0, 'life1, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _direction: EdgeDirection, ) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

List dependency edges adjacent to an execution. Read-only; the backend resolves the subject execution’s flow, reads the direction-specific adjacency SET, and decodes each member’s flow-scoped edge:<edge_id> hash.

Returns an empty Vec when the subject has no edges on the requested side — including standalone executions (no owning flow). Ordering is unspecified: the underlying adjacency SET is an unordered SMEMBERS read. Callers that need deterministic order should sort by EdgeSnapshot::edge_id / EdgeSnapshot::created_at themselves.

Parse failures on the edge hash surface as EngineError::Validation { kind: ValidationKind::Corruption, .. } — unknown fields, missing required fields, endpoint mismatches against the adjacency SET all fail loud rather than silently returning partial results.

Gated on the core feature — edge reads are part of the minimal engine surface a Postgres-style backend must honour.

Source

fn describe_edge<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _edge_id: &'life2 EdgeId, ) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Snapshot a single dependency edge by its owning flow + edge id.

Ok(None) when the edge hash is absent (never staged, or staged under a different flow than flow_id). Parse failures on a present edge hash surface as EngineError::Validation { kind: ValidationKind::Corruption, .. } — the stored flow_id field is cross-checked against the caller’s expected flow_id so a wrong-key read fails loud rather than returning an unrelated edge.

Gated on the core feature — single-edge reads are part of the minimal snapshot surface an alternate backend must honour alongside Self::describe_execution / Self::describe_flow / Self::list_edges.

Source

fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>( &'life0 self, _eid: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Resolve an execution’s owning flow id, if any.

Ok(None) when the execution’s core record is absent or has no associated flow (standalone execution). A present-but- malformed flow_id field surfaces as EngineError::Validation { kind: ValidationKind::Corruption, .. }.

Gated on the core feature. Used by ff-sdk’s list_outgoing_edges / list_incoming_edges to pivot from a consumer-supplied ExecutionId to the FlowId required by Self::list_edges. A Valkey backend serves this with a single HGET exec_core flow_id; a Postgres backend serves it with the equivalent single-column row lookup.

Source

fn list_flows<'life0, 'async_trait>( &'life0 self, _partition: PartitionKey, _cursor: Option<FlowId>, _limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List flows on a partition with cursor-based pagination (issue #185).

Returns a ListFlowsPage of FlowSummary rows ordered by flow_id (UUID byte-lexicographic). cursor is None for the first page; callers forward the returned next_cursor verbatim to continue iteration, and the listing is exhausted when next_cursor is None. limit is the maximum number of rows to return on this page — implementations MAY return fewer (end of partition) but MUST NOT exceed it.

Ordering rationale: flow ids are UUIDs, and both Valkey (sort after-the-fact) and Postgres (ORDER BY flow_id) can agree on byte-lexicographic order — the same order FlowId::to_string() produces for canonical hyphenated UUIDs. Mapping to cursor > flow_id keeps the contract backend- independent.

§Postgres implementation pattern

A Postgres-backed implementation serves this directly with

SELECT flow_id, created_at_ms, public_flow_state
  FROM ff_flow
 WHERE partition_key = $1
   AND ($2::uuid IS NULL OR flow_id > $2)
 ORDER BY flow_id
 LIMIT $3 + 1;

— reading one extra row to decide whether next_cursor should be set to the last row’s flow_id. The Valkey implementation maintains the ff:idx:{fp:N}:flow_index SET and performs the sort + slice client-side (SMEMBERS then sort-by-UUID-bytes), pipelining HGETALL flow_core for each row on the page.

Gated on the core feature — flow listing is part of the minimal engine surface a Postgres-style backend must honour.

Source

fn list_lanes<'life0, 'async_trait>( &'life0 self, _cursor: Option<LaneId>, _limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enumerate registered lanes with cursor-based pagination.

Lanes are global (not partition-scoped) — the backend serves this from its lane registry and does NOT accept a crate::partition::Partition argument. Results are sorted by LaneId name so the ordering is stable across calls and cursors address a deterministic position in the sort.

  • cursor — exclusive lower bound. None starts from the first lane. To continue a walk, pass the previous page’s ListLanesPage::next_cursor.
  • limit — hard cap on the number of lanes returned in the page. Backends MAY round this down when the registry size is smaller; they MUST NOT return more than limit.

ListLanesPage::next_cursor is Some(last_lane_in_page) iff at least one more lane exists after the returned page, and None on the final page. Callers loop until next_cursor is None to read the full registry.

Gated on the core feature — lane enumeration is part of the minimal snapshot surface an alternate backend must honour alongside Self::describe_flow / Self::list_edges.

Source

fn list_suspended<'life0, 'async_trait>( &'life0 self, _partition: PartitionKey, _cursor: Option<ExecutionId>, _limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List suspended executions in one partition, cursor-paginated, with each entry’s suspension reason_code populated (issue #183).

Consumer-facing “what’s blocked on what?” panels (ff-board’s suspended-executions view, operator CLIs) need the reason in the list response so the UI does not round-trip per row to describe_execution for a field it knows it needs. reason on [SuspendedExecutionEntry] carries the free-form suspension:current.reason_code field — see the type rustdoc for the String-not-enum rationale.

cursor is opaque to callers; pass None to start a fresh scan and feed the returned ListSuspendedPage::next_cursor back in on subsequent pages until it comes back None. limit bounds the entries count; backends MAY return fewer when the partition is exhausted.

Ordering is by ascending suspended_at_ms (the per-lane suspended ZSET score == timeout_at or the no-timeout sentinel) with execution id as a lex tiebreak, so cursor continuation is deterministic across calls.

Gated on the core feature — suspended-list enumeration is part of the minimal engine surface a Postgres-style backend must honour.

Source

fn list_executions<'life0, 'async_trait>( &'life0 self, _partition: PartitionKey, _cursor: Option<ExecutionId>, _limit: usize, ) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Forward-only paginated listing of the executions indexed under one partition.

Reads the partition-wide ff:idx:{p:N}:all_executions set, sorts lexicographically on ExecutionId, and returns the page of ids strictly greater than cursor (or starting from the smallest id when cursor = None). The returned ListExecutionsPage::next_cursor is the last id on the page iff at least one more id exists past it; None signals end-of-stream.

limit is the maximum number of ids returned on this page. A limit of 0 returns an empty page with next_cursor = None. Backends MAY cap limit internally (Valkey: 1000) and return fewer ids than requested; callers continue paginating until next_cursor == None.

Ordering is stable under concurrent inserts for already-emitted ids (an id less-than-or-equal-to the caller’s cursor is never re-emitted in later pages) but new inserts past the cursor WILL appear in subsequent pages — consistent with forward-only cursor semantics.

Gated on the core feature — partition-scoped listing is part of the minimal engine surface every backend must honour.

Source

fn deliver_signal<'life0, 'async_trait>( &'life0 self, _args: DeliverSignalArgs, ) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Deliver an external signal to a suspended execution’s waitpoint.

The backend atomically records the signal, evaluates the resume condition, and — when satisfied — transitions the execution from suspended to runnable (or buffers the signal when the waitpoint is still pending). Duplicate delivery — same idempotency_key + waitpoint — surfaces as DeliverSignalResult::Duplicate with the pre-existing signal_id rather than mutating state twice.

Input validation (HMAC token presence, payload size limits, signal-name shape) is the backend’s responsibility; callers pass a fully populated DeliverSignalArgs and receive typed outcomes or typed errors (ScriptError::invalid_token, ScriptError::token_expired, ScriptError::ExecutionNotFound surfaced via EngineError::Transport on the Valkey backend).

Gated on the core feature — signal delivery is part of the minimal trigger surface every backend must honour so ff-server / REST handlers can dispatch against Arc<dyn EngineBackend> without knowing which backend is running underneath.

Source

fn claim_resumed_execution<'life0, 'async_trait>( &'life0 self, _args: ClaimResumedExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Claim a resumed execution — a previously-suspended attempt that has cleared its resume condition (e.g. via Self::deliver_signal) and now needs a worker to pick up the same attempt index.

Distinct from Self::claim (fresh work) and Self::claim_from_resume_grant (grant-based ownership transfer after a crash): the resumed-claim path re-binds an existing attempt rather than minting a new one. The backend issues a fresh lease_id + bumps the lease_epoch, preserving attempt_id / attempt_index so stream frames and progress updates continue on the same attempt.

Typed failures surface via ScriptErrorEngineError: NotAResumedExecution when the attempt state is not attempt_interrupted, ExecutionNotLeaseable when the lifecycle phase is not runnable, and InvalidClaimGrant when the grant key is missing or was already consumed.

Gated on the core feature — resumed-claim is part of the minimal trigger surface every backend must honour.

Source

fn scan_eligible_executions<'life0, 'async_trait>( &'life0 self, _args: ScanEligibleArgs, ) -> Pin<Box<dyn Future<Output = Result<Vec<ExecutionId>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Scan a lane’s eligible ZSET on one partition for highest-priority executions awaiting a worker (v0.12 PR-5).

Lifted from the SDK-side ZRANGEBYSCORE inline on FlowFabricWorker::claim_next — the scheduler-bypass scanner gated behind direct-valkey-claim. The trait method itself is backend-agnostic; consumers that drive the scanner loop (bench harnesses, single-tenant dev) compose it with Self::issue_claim_grant + Self::claim_execution to replicate the pre-PR-5 claim_next body.

§Backend coverage
  • ValkeyZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit> on the lane’s partition-scoped eligible key. Single command; no script round-trip. Wire shape is byte-for-byte identical to the pre-PR SDK inline call so bench traces match pre-PR without new #[tracing::instrument] span names.
  • Postgres / SQLite — use the Err(Unavailable) default. PG/SQLite consumers drive work through the scheduler-routed Self::claim_for_worker path instead of the scanner primitives exposed here; lifting the scheduler itself onto the trait is RFC-024 follow-up scope. See project_claim_from_grant_pg_sqlite_gap.md for motivation.

Default impl returns EngineError::Unavailable so the trait addition is non-breaking for out-of-tree backends. Same precedent as Self::claim_execution landing in v0.12 PR-4.

Source

fn issue_claim_grant<'life0, 'async_trait>( &'life0 self, _args: IssueClaimGrantArgs, ) -> Pin<Box<dyn Future<Output = Result<IssueClaimGrantOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Issue a claim grant — the scheduler’s admission write — for a single execution on a single lane (v0.12 PR-5).

Lifted from the SDK-side ff_issue_claim_grant inline helper on FlowFabricWorker::claim_next. The backend atomically writes the grant hash, appends to the per-worker grant index, and removes the execution from the lane’s eligible ZSET.

Typed rejects surface via EngineError::Validation: CapabilityMismatch when the worker’s capabilities do not cover the execution’s required_capabilities, InvalidInput for malformed args. Transport faults surface via EngineError::Transport.

§Backend coverage
  • Valkey — one ff_issue_claim_grant FCALL. KEYS/ARGV shape is byte-for-byte identical to the pre-PR SDK inline call; bench traces match pre-PR.
  • Postgres / SQLiteErr(Unavailable) default; use Self::claim_for_worker instead. See Self::scan_eligible_executions for the cross-link rationale.
Source

fn block_route<'life0, 'async_trait>( &'life0 self, _args: BlockRouteArgs, ) -> Pin<Box<dyn Future<Output = Result<BlockRouteOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Move an execution from a lane’s eligible ZSET into its blocked_route ZSET (v0.12 PR-5).

Lifted from the SDK-side ff_block_execution_for_admission inline helper on FlowFabricWorker::claim_next. Called after a Self::issue_claim_grant CapabilityMismatch reject — without a block step, the inline scanner would re-pick the same top-of-ZSET every tick (parity with ff-scheduler::Scheduler::block_candidate).

The engine’s unblock scanner periodically promotes blocked_route back to eligible once a worker with matching caps registers.

§Backend coverage
  • Valkey — one ff_block_execution_for_admission FCALL.
  • Postgres / SQLiteErr(Unavailable) default; the scheduler-routed Self::claim_for_worker path handles admission rejects server-side.
Source

fn claim_execution<'life0, 'async_trait>( &'life0 self, _args: ClaimExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Consume a scheduler-issued claim grant to mint a fresh attempt.

The SDK’s grant-consumer path — paired with FlowFabricWorker::claim_from_grant in ff-sdk — routes through this method. The scheduler has already validated budget / quota / capabilities and written a grant (Valkey claim_grant hash); this call atomically consumes that grant and creates the attempt row, mints lease_id + lease_epoch, and returns a ClaimExecutionResult::Claimed carrying the minted lease triple.

Distinct from Self::claim (the scheduler-bypass scanner used by the direct-valkey-claim feature) — this method assumes the grant already exists and skips capability / ZSET scanning. The Valkey impl fires exactly one ff_claim_execution FCALL.

Typed failures surface via ScriptErrorEngineError: UseClaimResumedExecution when the attempt is actually attempt_interrupted (caller should retry via Self::claim_resumed_execution — see ContentionKind at ff_core::engine_error), InvalidClaimGrant when the grant is missing / consumed / worker-mismatched, CapabilityMismatch when the execution’s required_capabilities drifted after grant issuance.

§Backend coverage
  • Valkey — implemented in ff-backend-valkey (one ff_claim_execution FCALL).
  • Postgres / SQLite — use the Err(Unavailable) default in this PR. Grants on PG / SQLite today flow through PostgresScheduler::claim_for_worker (a sibling struct, not an EngineBackend method); wiring the default-over-trait behaviour into a PG / SQLite claim_execution impl lands with a future RFC-024 grant-consumer extension.

The default impl returns EngineError::Unavailable so the trait addition is non-breaking for out-of-tree backends. Same precedent as Self::read_current_attempt_index landing in v0.12 PR-3.

Source

fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _downstream_execution_id: &'life2 ExecutionId, _policy: EdgeDependencyPolicy, ) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

RFC-016 Stage A: set the inbound-edge-group policy for a downstream execution. Must be called before the first add_dependency(... -> downstream_execution_id) — the backend rejects with EngineError::Conflict if edges have already been staged for this group.

Stage A honours only EdgeDependencyPolicy::AllOf; the AnyOf / Quorum variants return EngineError::Validation with detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B" until Stage B’s resolver lands.

Source

fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>( &'life0 self, _args: RotateWaitpointHmacSecretAllArgs, ) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Rotate the waitpoint HMAC signing kid cluster-wide.

v0.7 migration-master Q4 (adjudicated 2026-04-24). Additive trait surface so Valkey and Postgres backends can both expose the “rotate everywhere” semantic under one name.

  • Valkey impl fans out an ff_rotate_waitpoint_hmac_secret FCALL per execution partition. entries.len() == num_flow_partitions and per-partition failures are surfaced as inner Err entries — the call as a whole does not fail when one partition’s FCALL fails, matching [ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions]’s partial-success contract.
  • Postgres impl (Wave 4) writes one row to ff_waitpoint_hmac(kid, secret, rotated_at) and returns a single-entry vec with partition = 0.

The default impl returns EngineError::Unavailable with op = "rotate_waitpoint_hmac_secret_all" so backends that haven’t implemented the method surface the miss loudly rather than silently no-op’ing. Both concrete backends override.

Source

fn seed_waitpoint_hmac_secret<'life0, 'async_trait>( &'life0 self, _args: SeedWaitpointHmacSecretArgs, ) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Seed the initial waitpoint HMAC secret for a fresh deployment (issue #280).

Idempotent. If a current_kid (Valkey per-partition) or an active kid row (Postgres) already exists with the given kid, the method returns SeedOutcome::AlreadySeeded without overwriting, reporting whether the stored secret matches the caller-supplied one via same_secret. Callers (cairn boot, operator tooling) invoke this on every boot and let the backend decide whether to install — removing the client-side “check then HSET” race that cairn’s raw-HSET boot path silently tolerated.

For rotation of an already-seeded secret, use Self::rotate_waitpoint_hmac_secret_all instead; seed is install-only.

The default impl returns EngineError::Unavailable with op = "seed_waitpoint_hmac_secret" so backends that haven’t implemented the method surface the miss loudly.

Source

fn read_stream<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _attempt_index: AttemptIndex, _from: StreamCursor, _to: StreamCursor, _count_limit: u64, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read frames from a completed or in-flight attempt’s stream.

from / to are StreamCursor values — StreamCursor::Start / StreamCursor::End are equivalent to XRANGE - / +, and StreamCursor::At("<id>") reads from a concrete entry id.

Input validation (count_limit bounds, cursor shape) is the caller’s responsibility — SDK-side wrappers in ff-sdk enforce bounds before forwarding. Backends MAY additionally reject out-of-range input via EngineError::Validation.

Gated on the streaming feature — stream reads are part of the stream-subset surface a backend without XREAD-like primitives may omit.

Source

fn tail_stream<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _attempt_index: AttemptIndex, _after: StreamCursor, _block_ms: u64, _count_limit: u64, _visibility: TailVisibility, ) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Tail a live attempt’s stream.

after is an exclusive StreamCursor — entries with id strictly greater than after are returned. StreamCursor::Start / StreamCursor::End are NOT accepted here; callers MUST pass a concrete id (or StreamCursor::from_beginning()). The SDK wrapper rejects the open markers before reaching the backend.

block_ms == 0 → non-blocking peek. block_ms > 0 → blocks up to that many ms for a new entry.

visibility (RFC-015 §6.1) filters the returned entries by their stored StreamMode mode field. Default TailVisibility::All preserves v1 behaviour.

Gated on the streaming feature — see read_stream.

Source

fn read_summary<'life0, 'life1, 'async_trait>( &'life0 self, _execution_id: &'life1 ExecutionId, _attempt_index: AttemptIndex, ) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read the rolling summary document for an attempt (RFC-015 §6.3).

Returns Ok(None) when no StreamMode::DurableSummary frame has ever been appended for the attempt. Non-blocking Hash read; safe to call from any consumer without holding the lease.

Gated on the streaming feature — summary reads are part of the stream-subset surface.

Source

fn create_execution<'life0, 'async_trait>( &'life0 self, _args: CreateExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create an execution. Ingress row 6 (RFC-017 §4). Wraps ff_create_execution on Valkey; INSERT INTO ff_execution ... on Postgres. The idempotency_key + backend-side default dedup_ttl_ms = 86400000 make duplicate submissions idempotent.

Source

fn create_flow<'life0, 'async_trait>( &'life0 self, _args: CreateFlowArgs, ) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create a flow header. Ingress row 5.

Source

fn add_execution_to_flow<'life0, 'async_trait>( &'life0 self, _args: AddExecutionToFlowArgs, ) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Atomically add an execution to a flow (single-FCALL co-located commit on Valkey; single-transaction UPSERT on Postgres).

Source

fn stage_dependency_edge<'life0, 'async_trait>( &'life0 self, _args: StageDependencyEdgeArgs, ) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Stage a dependency edge between flow members. CAS-guarded on graph_revision — stale rev returns Contention(StaleGraphRevision).

Source

fn apply_dependency_to_child<'life0, 'async_trait>( &'life0 self, _args: ApplyDependencyToChildArgs, ) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Apply a staged dependency edge to its downstream child.

Source

fn resolve_dependency<'life0, 'async_trait>( &'life0 self, _args: ResolveDependencyArgs, ) -> Pin<Box<dyn Future<Output = Result<ResolveDependencyOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Resolve one dependency edge after its upstream reached a terminal outcome — satisfy on “success”, mark impossible otherwise. Idempotent (AlreadyResolved on replay).

PR-7b Step 0 overlap-resolver: lifted here so cluster 2 (scanner/dependency_reconciler) could trait-route through Arc<dyn EngineBackend> without a merge conflict with cluster 4. Cluster 4 (completion_listener::spawn_dispatch_loop) ultimately routed through the coarser Self::cascade_completion (per-payload) instead of looping over this per-edge method, because the Postgres cascade is outbox-driven rather than per-edge — see cascade_completion rustdoc “Timing semantics” for details.

§Backend status
  • Valkey: wraps ff_resolve_dependency (RFC-016 Stage C signature). Atomic single-slot FCALL.
  • Postgres: Unavailable. PG’s post-completion cascade is not per-edge; it runs via ff_backend_postgres::dispatch::dispatch_completion(event_id) keyed on the ff_completion_event outbox row. The Valkey- shaped per-edge resolve does not map cleanly to that model; PG’s dependency_reconciler already calls dispatch_completion directly. The engine’s PR-7b/final integration test expects Unsupported logs from Valkey-shaped scanners on a PG deployment — this surface honours that contract.
  • SQLite: Unavailable for the same reason (mirrors PG).

The default impl returns EngineError::Unavailable so a backend that has not been migrated surfaces a typed Unsupported-grade error rather than a panic.

Source

fn cascade_completion<'life0, 'life1, 'async_trait>( &'life0 self, _payload: &'life1 CompletionPayload, ) -> Pin<Box<dyn Future<Output = Result<CascadeOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Cascade a terminal-execution completion into its downstream edges. Consumed by ff-engine::completion_listener::spawn_dispatch_loop (PR-7b Cluster 4) to trait-route the post-completion DAG-promotion path through Arc<dyn EngineBackend>.

Distinct from Self::resolve_dependency: that method is per-edge (one ff_resolve_dependency FCALL); this method is per-completion and orchestrates the full outgoing-edge walk plus child_skipped recursion (Valkey) or outbox-event dispatch (Postgres).

§Timing semantics

Backends diverge on when the caller observes cascade work. See CascadeOutcome for the full contract; the short form:

  • Valkey: synchronous. FCALL-driven walk completes inline; child_skipped descendants are recursively cascaded up to the internal MAX_CASCADE_DEPTH cap before return.
  • Postgres: asynchronous via the ff_completion_event outbox. The call resolves payload to its event_id, runs ff_backend_postgres::dispatch::dispatch_completion, and returns when the outbox row has been claimed + its direct hops advanced. Further-descendant cascades ride their own outbox events (emitted by the per-hop tx) — NOT this call.

Consumers that depend on synchronous cascade must either target Valkey explicitly or observe PG’s dispatched_at_ms clearance via the dependency_reconciler partial index to verify drain.

The default impl returns EngineError::Unavailable so backends that have not been migrated surface a typed error rather than a panic.

Source

fn cancel_execution<'life0, 'async_trait>( &'life0 self, _args: CancelExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Operator-initiated execution cancel (row 2).

Source

fn change_priority<'life0, 'async_trait>( &'life0 self, _args: ChangePriorityArgs, ) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Re-score an execution’s eligibility priority (row 17).

Source

fn replay_execution<'life0, 'async_trait>( &'life0 self, _args: ReplayExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Replay a terminal execution (row 22). Variadic KEYS handling (inbound-edge pre-read) is hidden inside the Valkey impl per RFC-017 §4 row 3.

Source

fn revoke_lease<'life0, 'async_trait>( &'life0 self, _args: RevokeLeaseArgs, ) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Operator-initiated lease revoke (row 19).

Source

fn complete_execution<'life0, 'async_trait>( &'life0 self, _args: CompleteExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<CompleteExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Service-layer complete_execution — peer of Self::complete that takes a fence triple instead of a worker Handle. See the group preamble above for cairn-migration context.

Source

fn fail_execution<'life0, 'async_trait>( &'life0 self, _args: FailExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<FailExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Service-layer fail_execution — peer of Self::fail that takes a fence triple instead of a worker Handle.

Source

fn renew_lease<'life0, 'async_trait>( &'life0 self, _args: RenewLeaseArgs, ) -> Pin<Box<dyn Future<Output = Result<RenewLeaseResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Service-layer renew_lease — peer of Self::renew that takes a fence triple instead of a worker Handle.

Source

fn resume_execution<'life0, 'async_trait>( &'life0 self, _args: ResumeExecutionArgs, ) -> Pin<Box<dyn Future<Output = Result<ResumeExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Service-layer resume_execution — transitions a suspended execution back to runnable. Distinct from Self::claim_from_resume_grant (which mints a worker handle against an already-eligible resumed execution): this method is the lifecycle transition primitive the control plane calls when an operator / auto-resume policy moves a suspended execution forward.

The Valkey impl pre-reads current_waitpoint_id + lane_id from exec_core so callers only need the execution id + the trigger type — same ergonomics as revoke_lease reading current_worker_instance_id when callers omit it.

Source

fn check_admission<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _quota_policy_id: &'life1 QuotaPolicyId, _dimension: &'life2 str, _args: CheckAdmissionArgs, ) -> Pin<Box<dyn Future<Output = Result<CheckAdmissionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Service-layer check_admission_and_record — atomic admission check against a quota policy. Callers supply the policy id + dimension (quota keys live on their own {q:<policy>} partition that cannot be derived from execution_id, so these travel outside CheckAdmissionArgs). dimension defaults to "default" inside the Valkey body when the caller passes an empty string — matches cairn’s pre-migration default.

Source

fn block_execution_for_admission<'life0, 'async_trait>( &'life0 self, _args: BlockExecutionForAdmissionArgs, ) -> Pin<Box<dyn Future<Output = Result<BlockExecutionForAdmissionOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Generalised admission block — covers budget / quota / capability denial paths. BlockingReason selects both the eligibility state written to exec_core and the target blocked_<reason> lane index. Valkey wraps the existing ff_block_execution_for_admission FCALL (KEYS=3); PG/SQLite write the equivalent row transition.

Companion to Self::block_routeblock_route stays as the capability-mismatch shorthand; this method is the primitive the scheduler reaches for when the reason is known at the call site (budget denial, quota denial, etc.). FF #511 Phase 2b.

Source

fn read_budget_usage_and_limits<'life0, 'life1, 'async_trait>( &'life0 self, _budget_id: &'life1 BudgetId, ) -> Pin<Box<dyn Future<Output = Result<BudgetUsageAndLimits, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

FF #511 Phase 3 — typed snapshot of a budget’s usage + limits hashes. Replaces the scheduler’s Valkey-shaped HGETALL/HGET pattern on ff:budget:{K}:{id}:limits + ff:budget:{K}:{id}:usage. Returns BudgetUsageAndLimits::empty when the limits hash is absent (“no limits configured” — not an error).

Source

fn read_quota_policy_limits<'life0, 'life1, 'async_trait>( &'life0 self, _quota_policy_id: &'life1 QuotaPolicyId, ) -> Pin<Box<dyn Future<Output = Result<Option<QuotaPolicyLimits>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read the admission-relevant fields of a quota policy (rate limit, window, concurrency cap, jitter). Replaces the Valkey-shaped 4-HGET pattern on ff:quota:{K}:def that ff_scheduler used pre-FF #511. Returns None when the policy row is absent; absence is a well-defined “no admission configured” signal, not an error.

Source

fn evaluate_flow_eligibility<'life0, 'async_trait>( &'life0 self, _args: EvaluateFlowEligibilityArgs, ) -> Pin<Box<dyn Future<Output = Result<EvaluateFlowEligibilityResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Service-layer evaluate_flow_eligibility — read-only check that returns the execution’s current eligibility state (eligible, blocked_by_dependencies, or a backend-specific status string). Called by cairn’s dependency-resolution path to decide whether a downstream execution can proceed.

Source

fn record_spend<'life0, 'async_trait>( &'life0 self, _args: RecordSpendArgs, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Per-execution budget spend with tenant-open dimensions.

Cairn #454 Q1/Q2: takes an open-set BTreeMap<String, u64> of deltas (distinct from the fixed-shape Self::report_usage/Self::report_usage_admin which use [UsageDimensions]). Return shape reuses ReportUsageResult — same Ok / SoftBreach / HardBreach / AlreadyApplied variants cairn’s UI already branches on.

The default impl returns EngineError::Unavailable so the trait remains additive for backends that have not landed the #454 body yet.

Source

fn release_budget<'life0, 'async_trait>( &'life0 self, _args: ReleaseBudgetArgs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Per-execution budget attribution release.

Called on execution termination to reverse this execution’s contribution to a budget counter. Per cairn #454 clarification this is per-execution, not a whole-budget flush — the budget persists across executions.

The default impl returns EngineError::Unavailable.

Source

fn release_admission<'life0, 'async_trait>( &'life0 self, _args: ReleaseAdmissionArgs, ) -> Pin<Box<dyn Future<Output = Result<ReleaseAdmissionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Release a quota-admission slot that was recorded via Self::check_admission / ff_check_admission_and_record but for which issue_claim_grant subsequently failed. Idempotent: releasing an already-released slot is a no-op.

Valkey wraps the existing ff_release_admission FCALL (DEL guard + SREM admitted_set + DECR-if-positive concurrency counter). PG/SQLite write to their admission-tracking rows.

Used by ff_scheduler::Scheduler on the claim-fail rollback path (FF #511). The default impl returns EngineError::Unavailable.

Source

fn deliver_approval_signal<'life0, 'async_trait>( &'life0 self, _args: DeliverApprovalSignalArgs, ) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Operator-driven approval-signal delivery.

Pre-shaped variant of Self::deliver_signal where the caller does not carry the waitpoint token. The backend reads the token from ff_waitpoint_pending, HMAC-verifies server-side, and dispatches. Operator API never sees the token bytes.

Cairn #454 Q3 — signal_name is a flat string (conventional values "approved" / "rejected"); audit metadata lives in cairn’s audit log, not on the FF surface.

The default impl returns EngineError::Unavailable.

Source

fn issue_grant_and_claim<'life0, 'async_trait>( &'life0 self, _args: IssueGrantAndClaimArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimGrantOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Backend-atomic issue_claim_grant + claim_execution.

Cairn #454 Q4 — composing the two primitives caller-side risks leaking a grant if claim_execution fails after issue_claim_grant succeeded. This method’s contract is that the composition is backend-atomic: Valkey fuses them in one FCALL; PG/SQLite fuse them in one tx.

The default impl must not be a chained call — it returns EngineError::Unavailable so consumers cannot accidentally use a non-atomic fallback.

Source

fn create_budget<'life0, 'async_trait>( &'life0 self, _args: CreateBudgetArgs, ) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create a budget definition (row 6).

Source

fn reset_budget<'life0, 'async_trait>( &'life0 self, _args: ResetBudgetArgs, ) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Reset a budget’s usage counters (row 10).

Source

fn create_quota_policy<'life0, 'async_trait>( &'life0 self, _args: CreateQuotaPolicyArgs, ) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Create a quota policy (row 7).

Source

fn get_budget_status<'life0, 'life1, 'async_trait>( &'life0 self, _id: &'life1 BudgetId, ) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Read-only budget status for operator visibility (row 8).

Source

fn report_usage_admin<'life0, 'life1, 'async_trait>( &'life0 self, _budget: &'life1 BudgetId, _args: ReportUsageAdminArgs, ) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Admin-path report_usage (row 9 + RFC-017 §5 round-1 F4). Distinct from the existing Self::report_usage which takes a worker handle — the admin path has no lease context.

Source

fn get_execution_result<'life0, 'life1, 'async_trait>( &'life0 self, _id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Fetch the stored result payload for a completed execution (row 4). Returns Ok(None) when the execution is missing, not yet complete, or its payload was trimmed by retention policy.

Source

fn list_pending_waitpoints<'life0, 'async_trait>( &'life0 self, _args: ListPendingWaitpointsArgs, ) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

List the pending-or-active waitpoints for an execution, cursor paginated (row 5 / §8). Stage A preserves the existing PendingWaitpointInfo shape; Stage D ships the §8 HMAC sanitisation + (token_kid, token_fingerprint) schema.

Source

fn ping<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Backend-level reachability probe (row 1). Valkey: PING; Postgres: SELECT 1.

Source

fn register_worker<'life0, 'async_trait>( &'life0 self, _args: RegisterWorkerArgs, ) -> Pin<Box<dyn Future<Output = Result<RegisterWorkerOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Register (or idempotently refresh) a worker instance. See RFC-025 §4.

Source

fn heartbeat_worker<'life0, 'async_trait>( &'life0 self, _args: HeartbeatWorkerArgs, ) -> Pin<Box<dyn Future<Output = Result<HeartbeatWorkerOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Refresh the worker-instance liveness TTL.

Source

fn mark_worker_dead<'life0, 'async_trait>( &'life0 self, _args: MarkWorkerDeadArgs, ) -> Pin<Box<dyn Future<Output = Result<MarkWorkerDeadOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Operator-driven worker death (distinct from passive TTL expiry).

Source

fn list_expired_leases<'life0, 'async_trait>( &'life0 self, _args: ListExpiredLeasesArgs, ) -> Pin<Box<dyn Future<Output = Result<ListExpiredLeasesResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enumerate expired leases for reclaim-decision tooling.

Source

fn list_workers<'life0, 'async_trait>( &'life0 self, _args: ListWorkersArgs, ) -> Pin<Box<dyn Future<Output = Result<ListWorkersResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Enumerate live workers (RFC-025 Phase 6, §9.4).

Source

fn claim_for_worker<'life0, 'async_trait>( &'life0 self, _args: ClaimForWorkerArgs, ) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey forwards to its ff_scheduler::Scheduler cursor; Postgres forwards to PostgresScheduler’s FOR UPDATE SKIP LOCKED path.

Backends that carry an embedded scheduler (e.g. ValkeyBackend constructed via with_embedded_scheduler, or PostgresBackend with its with_scanners sibling) route the claim through it. Backends without a wired scheduler return EngineError::Unavailable. HTTP consumers use FlowFabricWorker::claim_via_server instead.

Source

fn backend_label(&self) -> &'static str

Static observability label identifying the backend family in logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl returns "unknown" so legacy impl EngineBackend blocks that have not upgraded keep compiling; every in-tree backend overrides — ValkeyBackend"valkey", PostgresBackend"postgres".

Source

fn as_any(&self) -> &(dyn Any + 'static)

Backend downcast escape hatch (v0.12 PR-7a transitional).

Scanner supervisors in ff-engine still dispatch through a concrete ferriskey::Client; to keep the engine’s public boundary backend-agnostic (Arc<dyn EngineBackend>) while the scanner internals remain Valkey-shaped, the engine downcasts via this method and reaches in for the embedded client. Every backend that wants to be consumed by Engine::start_with_completions overrides this to return self as &dyn Any; the default returns a placeholder so a stray downcast_ref fails cleanly rather than risking unsound behaviour.

v0.13 (PR-7b) will trait-ify individual scanners onto EngineBackend and retire ff-engine’s dependence on this downcast path. The method itself will remain on the trait (likely deprecated) rather than be removed — removing a public trait method is a breaking change for external impl EngineBackend blocks.

Source

fn capabilities(&self) -> Capabilities

RFC-018 Stage A: snapshot of this backend’s identity + the flat Supports surface it can actually service. Consumers use this at startup to gate UI features / choose between alternative code paths before dispatching. See rfcs/RFC-018-backend-capability-discovery.md for the full discovery contract and the four owner-adjudicated open questions (granularity: coarse; version: struct; sync; no event stream).

Default: returns a value tagged family = "unknown" with every supports.* bool false, so pre-RFC-018 out-of-tree backends keep compiling and consumers treat “all false” as “dispatch and catch EngineError::Unavailable” (pre-RFC-018 behaviour). Concrete in-tree backends (ValkeyBackend, PostgresBackend) override to populate a real value.

Sync (no .await): backend-static info should not require a probe on every query. Dynamic probes happen once at connect* time and cache the result.

Source

fn prepare<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Issue #281: run one-time backend-specific boot preparation.

Intended to run ONCE per deployment startup — NOT per request. Idempotent and safe for consumers to call on every application boot; backends that have nothing to do return PrepareOutcome::NoOp without side effects.

Per-backend behaviour:

  • Valkey — issues FUNCTION LOAD REPLACE for the flowfabric Lua library (with bounded retry on transient transport faults; permanent compile errors surface as EngineError::Transport without retry). Returns PrepareOutcome::Applied carrying "FUNCTION LOAD (flowfabric lib v<N>)".
  • Postgres — returns PrepareOutcome::NoOp. Schema migrations are applied out-of-band per rfcs/drafts/v0.7-migration-master.md §Q12; the backend runs a schema-version check at connect time and refuses to start on mismatch, so no boot-side prepare work remains.
  • Default impl — returns PrepareOutcome::NoOp so out-of-tree backends without preparation work compile without boilerplate.
§Relationship to the in-tree boot path

ValkeyBackend::initialize_deployment (called from Server::start_with_metrics) already invokes ensure_library inline as its step 4; that path is unchanged. prepare() exists as a trait-surface entry point so consumers that construct an Arc<dyn EngineBackend> outside of Server (e.g. cairn-fabric’s boot path at cairn-fabric/src/boot.rs) can run the same preparation without reaching into backend-specific modules. The overlap is intentional: calling both prepare() and initialize_deployment is safe because FUNCTION LOAD REPLACE is idempotent under the version check.

§Layer forwarding

Layer impls (HookedBackend, ff-sdk layers) do NOT forward prepare today — consistent with backend_label / ping / shutdown_prepare. Consumers that wrap a backend in layers MUST call prepare() on the raw backend before wrapping, or accept the default PrepareOutcome::NoOp.

Source

fn shutdown_prepare<'life0, 'async_trait>( &'life0 self, _grace: Duration, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Drain-before-shutdown hook (RFC-017 §5.4). The server calls this before draining its own background tasks so backend- scoped primitives (Valkey stream semaphore, Postgres sqlx pool, …) can close their gates and await in-flight work up to grace.

Default impl returns Ok(()) — a no-op backend has nothing backend-scoped to drain. Concrete backends whose data plane owns resources (connection pools, semaphores, listeners) override with a real body.

Source

fn cancel_flow_header<'life0, 'async_trait>( &'life0 self, _args: CancelFlowArgs, ) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

RFC-017 Stage E2: the “header” portion of cancel_flow — run the atomic flow-state flip (Valkey: ff_cancel_flow FCALL; Postgres: cancel_flow_once tx), decode policy + membership, and surface the flow_already_terminal idempotency branch as a first-class [CancelFlowHeader::AlreadyTerminal] so the Server can build the wire CancelFlowResult without reaching for a raw Client. Separate from the existing EngineBackend::cancel_flow entry point (which takes the enum-typed (policy, wait) split and returns the wait-collapsed CancelFlowResult) because the Server owns its own wait-dispatch + member-cancel machinery via EngineBackend::cancel_execution + backlog ack.

Default impl returns EngineError::Unavailable so un-migrated backends surface the miss loudly.

Source

fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_id: &'life1 FlowId, _execution_id: &'life2 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

RFC-017 Stage E2: best-effort acknowledgement that one member of a cancel_all flow has completed its per-member cancel. Drains the member from the flow’s pending_cancels set and, if empty, removes the flow from the partition-level cancel_backlog (Valkey: ff_ack_cancel_member FCALL; Postgres: table write — default Unavailable until Wave 9).

Failures are swallowed by the caller — the cancel-backlog reconciler is the authoritative drain — but a typed error here lets the caller log a backend-scoped context string.

Source

fn read_execution_info<'life0, 'life1, 'async_trait>( &'life0 self, _id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

RFC-017 Stage E2: full-shape execution read used by the GET /v1/executions/{id} HTTP route. Returns the legacy ExecutionInfo wire shape (not the decoupled ExecutionSnapshot) so the existing HTTP response bytes stay identical across the migration.

Ok(None) ⇒ no such execution. Default Unavailable because the Valkey HGETALL-and-parse is backend-specific.

Source

fn read_execution_state<'life0, 'life1, 'async_trait>( &'life0 self, _id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

RFC-017 Stage E2: narrow public_state read used by the GET /v1/executions/{id}/state HTTP route. Returns Ok(None) when the execution is missing. Default Unavailable.

Source

fn subscribe_lease_history<'life0, 'life1, 'async_trait>( &'life0 self, _cursor: StreamCursor, _filter: &'life1 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Subscribe to lease lifecycle events (acquired / renewed / expired / reclaimed / revoked) for the partition this backend is configured with.

Cross-partition fan-out is consumer-side merge: subscribe per-partition backend instance and interleave on the read side. Yields Err(EngineError::StreamDisconnected { cursor }) on backend disconnect; resume by calling this method again with the returned cursor.

filter (#282): when filter.instance_tag is Some((k, v)), only events whose execution carries tag k = v are yielded (matching the crate::backend::ScannerFilter surface from #122). Pass &ScannerFilter::default() for unfiltered behaviour. Filtering happens inside the backend stream; the crate::stream_events::LeaseHistorySubscription return type is unchanged.

Source

fn subscribe_completion<'life0, 'life1, 'async_trait>( &'life0 self, _cursor: StreamCursor, _filter: &'life1 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Subscribe to completion events (terminal state transitions).

  • Postgres: wraps the ff_completion_event outbox + LISTEN/NOTIFY machinery. Durable via event-id cursor.
  • Valkey: wraps the RESP3 ff:dag:completions pubsub subscriber. Pubsub is at-most-once over the live subscription window; the cursor is always the empty sentinel. If you need at-least-once replay with durable cursor resume, use the Postgres backend (see docs/POSTGRES_PARITY_MATRIX.md row subscribe_completion).

filter (#282): see Self::subscribe_lease_history. Valkey reuses the subscribe_completions_filtered per-event HGET gate; Postgres filters inline against the outbox’s denormalised instance_tag column.

Source

fn subscribe_signal_delivery<'life0, 'life1, 'async_trait>( &'life0 self, _cursor: StreamCursor, _filter: &'life1 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Subscribe to signal-delivery events (satisfied / buffered / deduped).

filter (#282): see Self::subscribe_lease_history.

Source

fn subscribe_instance_tags<'life0, 'async_trait>( &'life0 self, _cursor: StreamCursor, ) -> Pin<Box<dyn Future<Output = Result<InstanceTagSubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Subscribe to instance-tag events (tag attached / cleared).

Producer wiring is deferred per #311 audit (“no concrete demand”); the trait method exists for API uniformity across the four families. Backends currently return EngineError::Unavailable.

Source

fn mark_lease_expired_if_due<'life0, 'life1, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Lease-expiry scanner hook — mark an expired lease as reclaimable so another worker can redeem the execution. Atomic per call.

Valkey: FCALL ff_mark_lease_expired_if_due. Postgres: single-row tx on ff_attempt + ff_exec_core (see ff_backend_postgres::reconcilers::lease_expiry).

Source

fn promote_delayed<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _partition: Partition, _lane: &'life1 LaneId, _execution_id: &'life2 ExecutionId, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Delayed-promoter scanner hook — promote a delayed execution to eligible_now once its delay_until has passed.

Valkey: FCALL ff_promote_delayed. Postgres: Wave 9 schema scope (no current impl).

Source

fn close_waitpoint<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, _waitpoint_id: &'life2 str, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Pending-waitpoint-expiry scanner hook — close a pending waitpoint whose deadline has passed (wake the suspended execution with a timeout signal).

Valkey: FCALL ff_close_waitpoint. Postgres: Wave 9 schema scope (no current impl).

Source

fn expire_execution<'life0, 'life1, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, _phase: ExpirePhase, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Shared hook for the attempt-timeout and execution-deadline scanners — terminate or retry an execution whose wall-clock budget has elapsed. phase discriminates which of the two scanner paths is calling so the backend can preserve diagnostic breadcrumbs without forking the surface.

Valkey: FCALL ff_expire_execution (with phase passed through as an ARGV discriminator). Postgres: single-row tx mirror of the Lua semantic for AttemptTimeout; ExecutionDeadline is Wave 9 schema scope.

Source

fn expire_suspension<'life0, 'life1, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Suspension-timeout scanner hook — expire a suspended execution whose suspension deadline has passed (wake with timeout).

Valkey: FCALL ff_expire_suspension. Postgres: single-row tx on ff_suspend + ff_exec_core.

Source

fn unblock_execution<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _partition: Partition, _lane_id: &'life1 LaneId, _execution_id: &'life2 ExecutionId, _expected_blocking_reason: &'life3 str, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Unblock-scanner hook — move an execution from a blocked ZSET back to eligible_now once its blocking condition has cleared (budget under limit, quota window drained, or a capable worker has come online). expected_blocking_reason discriminates which of the blocked:{budget,quota,route} sets the execution is leaving and also fences against a stale unblock (Lua rejects if the core’s blocking_reason no longer matches).

§Backend status
  • Valkey: wraps ff_unblock_execution (RFC-010 §6). Atomic single-slot FCALL on {p:N}.
  • Postgres: Unavailable (structural). PG does not persist a per-reason blocked:{budget,quota,route} index — scheduler eligibility is re-evaluated live via SQL predicates on ff_exec_core + budget / quota tables (see ff_backend_postgres::scheduler). Nothing to reconcile, so the engine’s PG scanner loop does not run this path; callers on a PG deployment receive the typed Unavailable per PR-7b/final contract.
  • SQLite: Unavailable for the same reason as Postgres.
Source

fn drain_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_partition: Partition, _flow_id: &'life1 FlowId, _downstream_eid: &'life2 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

RFC-016 Stage C — sibling-cancel group drain.

After edge_cancel_dispatcher has issued EngineBackend::cancel_execution against every listed sibling of a (flow_id, downstream_eid) group, this trait method atomically removes the tuple from the partition-local pending-cancel-groups index and clears the per-group flag + members state.

Valkey: FCALL ff_drain_sibling_cancel_group (SREM + HDEL in one Lua unit). Postgres: Unavailable — PG’s Wave-6b reconcilers::edge_cancel_dispatcher::dispatcher_tick owns the equivalent row drain inside its own per-group transaction; the Valkey-shaped per-tuple call is not used on the PG scanner path. SQLite: Unavailable (mirrors PG; RFC-023 scope).

Source

fn reconcile_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _flow_partition: Partition, _flow_id: &'life1 FlowId, _downstream_eid: &'life2 ExecutionId, ) -> Pin<Box<dyn Future<Output = Result<SiblingCancelReconcileAction, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

RFC-016 Stage D — sibling-cancel group reconcile (Invariant Q6 safety net).

Re-examines a (flow_id, downstream_eid) tuple in the partition-local pending-cancel-groups index and returns one of three atomic dispositions — see SiblingCancelReconcileAction — so the crash-recovery reconciler can drain stale or completed tuples without fighting the Stage-C dispatcher.

Valkey: FCALL ff_reconcile_sibling_cancel_group. Postgres: Unavailable — PG’s reconcilers::edge_cancel_reconciler::reconciler_tick owns the row-level reconcile inside its own batched tx. SQLite: Unavailable (mirrors PG).

Source

fn reconcile_execution_index<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _partition: Partition, _lanes: &'life1 [LaneId], _filter: &'life2 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Index-reconciler pass — walks ff:idx:{p:N}:all_executions and verifies each execution appears in the correct scheduling sorted set (eligible / delayed / blocked:* / active / suspended / terminal) for its current (lifecycle_phase, eligibility_state, ownership_state) triple. Phase 1 is log-only; auto-fix is deferred to a later phase (RFC-010 §6.14).

Source

fn reconcile_budget_counters<'life0, 'async_trait>( &'life0 self, _partition: Partition, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Budget-reconciler pass — walks ff:budget:{b:M}:policies_idx, reads each budget’s definition / usage / limits, and reconciles the breached_at marker against hard limits. Resetting budgets (non-zero reset_interval_ms) are skipped — they are handled by the budget_reset scanner (cluster 2). Drops index entries for budgets whose definition hash has been deleted (retention purge / manual). RFC-008 §Budget Reconciliation, RFC-010 §6.5.

Source

fn reconcile_quota_counters<'life0, 'async_trait>( &'life0 self, _partition: Partition, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Quota-reconciler pass — walks ff:quota:{q:M}:policies_idx, trims expired entries from rate-limit sliding windows, and recomputes each policy’s concurrency counter by walking its admitted_set and pruning entries whose admission guard key has TTLed out. RFC-008 §Quota Reconciliation, RFC-010 §6.6.

Source

fn project_flow_summary<'life0, 'life1, 'async_trait>( &'life0 self, _partition: Partition, _flow_id: &'life1 FlowId, _now_ms: TimestampMs, ) -> Pin<Box<dyn Future<Output = Result<bool, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Flow-projector scanner hook — sample flow members, derive an aggregate public_flow_state + per-state counts, and write them to the flow summary projection. Returns Ok(true) when the summary was updated, Ok(false) when the flow had no members or the index entry was defensively pruned (core missing).

The derived public_flow_state written here is distinct from ff_flow_core.public_flow_state — the former is a rollup dashboard field, the latter is the authoritative mutation-guard state owned by create_flow / cancel_flow. See ff_engine::scanner::flow_projector module doc for the two-sources contract.

§Backend status
  • Valkey: lifts the pre-PR-7b Rust-composed SCARD + SRANDMEMBER + per-member HGET + HSET summary pattern. No new Lua function; the aggregation is inherently multi-round-trip (cross-partition member reads) and atomicity is neither required nor achievable against 256 partitions.
  • Postgres: SELECT aggregates from ff_exec_core grouped by public_state for the flow’s members, INSERT … ON CONFLICT DO UPDATE into ff_flow_summary (migration 0019). One query per flow; partition-local aggregation.
  • SQLite: Unavailable per RFC-023 Phase 3.5 (flow summary projection is a shared-deployment dashboard feature; local single-tenant SQLite deployments don’t need it).
Source

fn trim_retention<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, _partition: Partition, _lane_id: &'life1 LaneId, _retention_ms: u64, _now_ms: TimestampMs, _batch_size: u32, _filter: &'life2 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<u32, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Retention-trimmer scanner hook — delete terminal executions and all their subordinate keys/rows once they are older than the configured retention window. Returns the number of executions actually purged in this call (so the scanner can loop when it hits batch_size).

Retention trimming is inherently a scan-and-delete loop over time; the trait surface exists to remove engine-side Valkey coupling, not to atomise the operation into a single round-trip. Implementations may issue multiple round-trips (e.g. per-execution cascade across sibling tables); the trait contract is “make progress, bounded by batch_size”.

§Backend status
  • Valkey: lifts the pre-PR-7b ZRANGEBYSCORE + per-exec cascade-delete loop verbatim. Cluster-safe (all keys for one execution live on the same {p:N} slot).
  • Postgres: DELETE FROM ff_exec_core for terminal rows past the cutoff plus explicit cascade DELETEs on every execution-scoped sibling table (no FK CASCADE in the schema). Single transaction per batch.
  • SQLite: Unavailable per RFC-023 Phase 3.5. Single-tenant local deployments typically manage their own DB lifecycle and do not need a retention scanner.
Source

fn read_exec_core_fields<'life0, 'life1, 'life2, 'life3, 'async_trait>( &'life0 self, _partition: Partition, _execution_id: &'life1 ExecutionId, _fields: &'life2 [&'life3 str], ) -> Pin<Box<dyn Future<Output = Result<HashMap<String, Option<String>>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait,

Point-read N fields from the exec_core hash for a given execution. Returns a map of field-name → Option (None for fields absent or stored as NULL). Scanner call sites formerly issuing raw HGET/HMGET on ExecKeyContext::core() route through this trait method (cairn #436 / PR-7b Wave 0a).

Field values are coerced to String at the trait boundary for wire compatibility with the Valkey HGET shape. Consumers parse specific fields (lane_id, current_attempt_index, etc.) from the returned strings as needed.

  • Valkey: HMGET exec_core_key f1 f2 ..., zipping names to values.
  • Postgres: SELECT against ff_exec_core with dynamic column extraction; fields in raw_fields JSONB are extracted via ->>.
  • SQLite: equivalent with json_extract(raw_fields, '$.field').

Default body returns Unavailable so non-v0.13 backends remain compile-compatible.

Source

fn server_time_ms<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<u64, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Returns the backend’s current wall-clock epoch milliseconds.

Used by 15 scanners to compute “due” thresholds before issuing per-partition due-scans. Previously a Valkey-only helper in ff-engine’s scanner::lease_expiry issuing TIME; this trait method is the backend-agnostic replacement (cairn #436 / PR-7b Wave 0a).

Every in-tree backend overrides. The default falls back to SystemTime::now() so out-of-tree EngineBackend impls (e.g. cairn mocks, test doubles) stay source-compatible across v0.12 → v0.13.

  • Valkey: TIME command.
  • Postgres: SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint (not now()now() is the transaction start timestamp, which is stale under any long-running tx; scanners need the true wall-clock read).
  • SQLite: SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER).

Implementors§