pub trait EngineBackend:
Send
+ Sync
+ 'static {
Show 106 methods
// Required methods
fn claim<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
lane: &'life1 LaneId,
capabilities: &'life2 CapabilitySet,
policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn renew<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn progress<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
percent: Option<u8>,
message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn append_frame<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
fn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn claim_from_resume_grant<'life0, 'async_trait>(
&'life0 self,
token: ResumeToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait;
fn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn read_execution_context<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<ExecutionContext, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait;
fn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait;
// Provided methods
fn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn read_waitpoint_token<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_waitpoint_id: &'life1 WaitpointId,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn issue_reclaim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueReclaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueReclaimGrantOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn reclaim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReclaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReclaimExecutionOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn read_current_attempt_index<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn read_total_attempt_count<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn set_execution_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_key: &'life2 str,
_value: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait { ... }
fn set_flow_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_key: &'life2 str,
_value: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait { ... }
fn get_execution_tag<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_key: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn get_execution_namespace<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn get_flow_tag<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_key: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
_eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn list_flows<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn list_lanes<'life0, 'async_trait>(
&'life0 self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn list_suspended<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn list_executions<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn deliver_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn scan_eligible_executions<'life0, 'async_trait>(
&'life0 self,
_args: ScanEligibleArgs,
) -> Pin<Box<dyn Future<Output = Result<Vec<ExecutionId>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn issue_claim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueClaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueClaimGrantOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn block_route<'life0, 'async_trait>(
&'life0 self,
_args: BlockRouteArgs,
) -> Pin<Box<dyn Future<Output = Result<BlockRouteOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn claim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_downstream_execution_id: &'life2 ExecutionId,
_policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn resolve_dependency<'life0, 'async_trait>(
&'life0 self,
_args: ResolveDependencyArgs,
) -> Pin<Box<dyn Future<Output = Result<ResolveDependencyOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn cascade_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_payload: &'life1 CompletionPayload,
) -> Pin<Box<dyn Future<Output = Result<CascadeOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn complete_execution<'life0, 'async_trait>(
&'life0 self,
_args: CompleteExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CompleteExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn fail_execution<'life0, 'async_trait>(
&'life0 self,
_args: FailExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<FailExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn renew_lease<'life0, 'async_trait>(
&'life0 self,
_args: RenewLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RenewLeaseResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn resume_execution<'life0, 'async_trait>(
&'life0 self,
_args: ResumeExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ResumeExecutionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn check_admission<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_quota_policy_id: &'life1 QuotaPolicyId,
_dimension: &'life2 str,
_args: CheckAdmissionArgs,
) -> Pin<Box<dyn Future<Output = Result<CheckAdmissionResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn evaluate_flow_eligibility<'life0, 'async_trait>(
&'life0 self,
_args: EvaluateFlowEligibilityArgs,
) -> Pin<Box<dyn Future<Output = Result<EvaluateFlowEligibilityResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn record_spend<'life0, 'async_trait>(
&'life0 self,
_args: RecordSpendArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn release_budget<'life0, 'async_trait>(
&'life0 self,
_args: ReleaseBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn deliver_approval_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverApprovalSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn issue_grant_and_claim<'life0, 'async_trait>(
&'life0 self,
_args: IssueGrantAndClaimArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimGrantOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn backend_label(&self) -> &'static str { ... }
fn as_any(&self) -> &(dyn Any + 'static) { ... }
fn capabilities(&self) -> Capabilities { ... }
fn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn subscribe_instance_tags<'life0, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
) -> Pin<Box<dyn Future<Output = Result<InstanceTagSubscription, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn mark_lease_expired_if_due<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn promote_delayed<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane: &'life1 LaneId,
_execution_id: &'life2 ExecutionId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn close_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_waitpoint_id: &'life2 str,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn expire_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_phase: ExpirePhase,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn expire_suspension<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn unblock_execution<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane_id: &'life1 LaneId,
_execution_id: &'life2 ExecutionId,
_expected_blocking_reason: &'life3 str,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait { ... }
fn drain_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_partition: Partition,
_flow_id: &'life1 FlowId,
_downstream_eid: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn reconcile_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_partition: Partition,
_flow_id: &'life1 FlowId,
_downstream_eid: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<SiblingCancelReconcileAction, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn reconcile_execution_index<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lanes: &'life1 [LaneId],
_filter: &'life2 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn reconcile_budget_counters<'life0, 'async_trait>(
&'life0 self,
_partition: Partition,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn reconcile_quota_counters<'life0, 'async_trait>(
&'life0 self,
_partition: Partition,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
fn project_flow_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_flow_id: &'life1 FlowId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<bool, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait { ... }
fn trim_retention<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane_id: &'life1 LaneId,
_retention_ms: u64,
_now_ms: TimestampMs,
_batch_size: u32,
_filter: &'life2 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<u32, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait { ... }
fn read_exec_core_fields<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_fields: &'life2 [&'life3 str],
) -> Pin<Box<dyn Future<Output = Result<HashMap<String, Option<String>>, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait { ... }
fn server_time_ms<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<u64, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait,
'life0: 'async_trait { ... }
}

The engine write surface — a single trait a backend implementation
honours to serve a `FlowFabricWorker`.

See RFC-012 §3.1 for the inventory rationale and §3.3 for the
type-level shape. 16 methods (Round-7 added `create_waitpoint`;
`append_frame` return widened; `report_usage` return replaced —
RFC-012 §R7). Issue #150 added the two trigger-surface methods
(`deliver_signal` / `claim_resumed_execution`).
## Note on `complete` payload shape
The RFC §3.3 sketch uses `Option<Bytes>`; the Stage 1a trait uses
`Option<Vec<u8>>` to match the existing
`ff_sdk::ClaimedTask::complete` signature and avoid adding a
`bytes` public-type dep for zero consumer benefit. Round-4 §7.17
resolved the payload container debate to `Box<[u8]>` in the
public type (see `HandleOpaque`); `Option<Vec<u8>>` is the
zero-churn choice consistent with today’s code. Consumers that
need `&[u8]` can borrow via `.as_deref()` on the `Option`.
## Required Methods
fn claim<'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    lane: &'life1 LaneId,
    capabilities: &'life2 CapabilitySet,
    policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,

Fresh-work claim. Returns `Ok(None)` when no work is currently
available; `Err` only on transport or input-validation faults.
fn renew<'life0, 'life1, 'async_trait>(
    &'life0 self,
    handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,

Renew a held lease. Returns the updated expiry + epoch on
success; typed `State::StaleLease` / `State::LeaseExpired`
when the lease has been stolen or timed out.
fn progress<'life0, 'life1, 'async_trait>(
    &'life0 self,
    handle: &'life1 Handle,
    percent: Option<u8>,
    message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,

Numeric-progress heartbeat.

Writes scalar `progress_percent` / `progress_message` fields on
`exec_core`; each call overwrites the previous value. This does
NOT append to the output stream — stream-frame producers must use
`append_frame` instead.
fn append_frame<'life0, 'life1, 'async_trait>(
    &'life0 self,
    handle: &'life1 Handle,
    frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,

Append one stream frame. Distinct from `progress`
per RFC-012 §3.1.1 K#6. Returns the backend-assigned stream entry
id and post-append frame count (RFC-012 §R7.2.1).

Stream-frame producers (arbitrary `frame_type` + payload, consumed
via the read/tail surfaces) MUST use this method rather than
`progress`; the latter updates scalar fields on
`exec_core` and is invisible to stream consumers.
Sourcefn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Terminal success. Borrows handle (round-4 M-D2) so callers
can retry under EngineError::Transport without losing the
cookie. Payload is Option<Vec<u8>> per the note above.
Sourcefn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Terminal failure with classification. Returns FailOutcome
so the caller learns whether a retry was scheduled.
Sourcefn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Cooperative cancel by the worker holding the lease.
Sourcefn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Suspend the execution awaiting a typed resume condition (RFC-013 Stage 1d).
Borrows handle (round-4 M-D2). Terminal-looking behaviour is
expressed through SuspendOutcome:
- `SuspendOutcome::Suspended` — the pre-suspend handle is logically invalidated; the fresh `HandleKind::Suspended` handle inside the variant supersedes it. Runtime enforcement via the fence triple: subsequent ops against the stale handle surface as `Contention(LeaseConflict)`.
- `SuspendOutcome::AlreadySatisfied` — buffered signals on a pending waitpoint already matched the resume condition at suspension time. The lease is NOT released; the caller’s pre-suspend handle remains valid.
See RFC-013 §2 for the type shapes, §3 for the replay / idempotency contract, §4 for the error taxonomy.
Sourcefn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
waitpoint_key: &'life2 str,
expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Issue a pending waitpoint for future signal delivery.
Waitpoints have two states in the Valkey wire contract:
pending (token issued, not yet backing a suspension) and
active (bound to a suspension). This method creates a
waitpoint in the pending state. A later suspend call
transitions a pending waitpoint to active (see Lua
use_pending_waitpoint ARGV flag at
flowfabric.lua:3603,3641,3690) — or, if buffered signals
already satisfy its condition, the suspend call returns
SuspendOutcome::AlreadySatisfied and the waitpoint activates
without ever releasing the lease.
Pending-waitpoint expiry is a first-class terminal error on
the wire (PendingWaitpointExpired at
ff-script/src/error.rs:170,403-408). The attempt retains its
lease while the waitpoint is pending; signals delivered to
this waitpoint are buffered server-side (RFC-012 §R7.2.2).
Sourcefn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Non-mutating observation of signals that satisfied the handle’s resume condition.
Sourcefn claim_from_resume_grant<'life0, 'async_trait>(
&'life0 self,
token: ResumeToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_from_resume_grant<'life0, 'async_trait>(
&'life0 self,
token: ResumeToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Consume a resume grant (via ResumeToken) to mint a
resumed-kind handle. Routes to ff_claim_resumed_execution on
Valkey / the epoch-bump reconciler on PG/SQLite. Returns
Ok(None) when the grant’s target execution is no longer
resumable (already reclaimed, terminal, etc.).
Renamed from claim_from_reclaim (RFC-024 PR-B+C). The
pre-rename name advertised “reclaim” but the semantic has
always been resume-after-suspend. The new lease-reclaim path
lives on Self::reclaim_execution.
Sourcefn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Park the execution until delay_until, releasing the lease.
Sourcefn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Mark the execution as waiting for its child flow to complete, releasing the lease.
Sourcefn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Snapshot an execution by id. Ok(None) ⇒ no such execution.
Sourcefn read_execution_context<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<ExecutionContext, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_execution_context<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<ExecutionContext, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Point-read of the execution-scoped (input_payload, execution_kind, tags) bundle used by the SDK worker when
assembling a ClaimedTask (see ff_sdk::ClaimedTask) after a
successful claim.
No default impl — every EngineBackend must answer this
explicitly. Distinct from Self::describe_execution
(read-model projection) because the SDK needs the raw payload
bytes + kind + tags immediately post-claim, and the snapshot
projection deliberately omits the payload bytes.
Per-backend shape:
- Valkey — pipelined `GET :payload` + `HGETALL :core` + `HGETALL :tags` on the execution’s partition (same pattern as `Self::describe_execution`).
- Postgres — single `SELECT payload, raw_fields` on `ff_exec_core` keyed by `(partition_key, execution_id)`; `execution_kind` + `tags` live in `raw_fields` JSONB.
- SQLite — identical shape to Postgres.
Returns EngineError::Validation { kind: ValidationKind::InvalidInput, .. }
when the execution does not exist — the SDK worker only calls
this after a successful claim, so a missing row is a loud
storage-tier invariant violation rather than a routine Ok(None).
Sourcefn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Snapshot a flow by id. Ok(None) ⇒ no such flow.
Sourcefn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Operator-initiated cancellation of a flow and (optionally) its member executions. See RFC-012 §3.1.1 for the policy / wait matrix.
Sourcefn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Report usage against a budget and check limits. Returns the
typed ReportUsageResult variant; backends enforce
idempotency via the caller-supplied
UsageDimensions::dedup_key (RFC-012 §R7.2.3 — replaces
the pre-Round-7 AdmissionDecision return).
Provided Methods§
Sourcefn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn suspend_by_triple<'life0, 'async_trait>(
&'life0 self,
exec_id: ExecutionId,
triple: LeaseFence,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Suspend by execution id + lease fence triple, for service-layer
callers that hold a run record / lease-claim descriptor but no
worker Handle (cairn issue #322).
Semantics mirror Self::suspend exactly — the same
SuspendArgs validation, the same SuspendOutcome
lifecycle, the same RFC-013 §3 dedup / replay contract. The
only difference is the fencing source: instead of the
(lease_id, lease_epoch, attempt_id) fields embedded in a
Handle, the backend fences against the triple passed directly.
Attempt-index, lane, and worker-instance metadata that
Self::suspend reads from the handle payload are recovered
from the backend’s authoritative execution record (Valkey:
exec_core HGETs; Postgres: ff_attempt row lookup).
The default impl returns EngineError::Unavailable so
existing backend impls remain non-breaking. Production backends
(Valkey, Postgres) override.
Sourcefn read_waitpoint_token<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_waitpoint_id: &'life1 WaitpointId,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_waitpoint_token<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_waitpoint_id: &'life1 WaitpointId,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read the HMAC token stored on a waitpoint record, keyed by
(partition, waitpoint_id).
Returns Ok(Some(token)) when the waitpoint exists and carries
a token, Ok(None) when the waitpoint does not exist or no
token field is present. A missing waitpoint is not an error —
signals can legitimately arrive after a waitpoint has been
consumed or expired, and the signal-bridge authenticates on the
presence of a matching token, not on the waitpoint’s liveness.
§Use case
Control-plane signal delivery (cairn signal-bridge): at
signal-resume time the bridge reads the token off the
waitpoint hash / row to authenticate the resume request it
subsequently issues. Previously implemented as direct
ferriskey::Client::hget(waitpoint_key, "waitpoint_token") —
Valkey-only. This method routes the read through the trait so
the pattern works on Postgres + SQLite as well.
§Per-backend shape
- Valkey — `HGET ff:wp:<tag>:<waitpoint_id> waitpoint_token` on the waitpoint’s partition. Empty string / missing field maps to `None`.
- Postgres — `SELECT token FROM ff_waitpoint_pending WHERE partition_key = $1 AND waitpoint_id = $2 LIMIT 1`. Row-absent → `None`; empty `token` → `None`.
- SQLite — same shape as Postgres.
The partition argument is the opaque PartitionKey
produced by FlowFabric (typically extracted from the
Handle / ResumeToken the waitpoint was minted against).
§Default impl
Returns EngineError::Unavailable with
op = "read_waitpoint_token" so out-of-tree backends and
in-tree backends not yet overriding this method continue to
compile. Mirrors the precedent used by
Self::issue_reclaim_grant / Self::reclaim_execution.
Sourcefn issue_reclaim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueReclaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueReclaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn issue_reclaim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueReclaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueReclaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Issue a lease-reclaim grant (RFC-024 §3.2). Admits executions
in lease_expired_reclaimable or lease_revoked state to the
reclaim path; the returned IssueReclaimGrantOutcome::Granted
carries a crate::contracts::ReclaimGrant which is then fed
to Self::reclaim_execution to mint a fresh attempt.
Default impl returns EngineError::Unavailable — PR-D (PG),
PR-E (SQLite), and PR-F (Valkey) override with real bodies.
Sourcefn reclaim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReclaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReclaimExecutionOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reclaim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReclaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReclaimExecutionOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Consume a crate::contracts::ReclaimGrant to mint a fresh
attempt for a previously lease-expired / lease-revoked
execution (RFC-024 §3.2). Creates a new attempt row, bumps the
execution’s lease_reclaim_count, and mints a
crate::backend::HandleKind::Reclaimed handle.
Default impl returns EngineError::Unavailable — PR-D (PG),
PR-E (SQLite), and PR-F (Valkey) override with real bodies.
Sourcefn read_current_attempt_index<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_current_attempt_index<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Point-read of the execution’s current attempt-index pointer — the index of the currently-leased attempt row.
Distinct from Self::read_total_attempt_count: this method
names the attempt that already exists (pointer), whereas
read_total_attempt_count is the monotonic claim counter used
to compute the next fresh attempt index. See the sibling’s
rustdoc for the retry-path scenario that motivates the split.
Used on the SDK worker’s claim_from_resume_grant path —
specifically the private claim_resumed_execution helper —
immediately before dispatching Self::claim_resumed_execution.
The returned index is fed into
ClaimResumedExecutionArgs::current_attempt_index
so the backend’s script / transaction targets the correct
existing attempt row (KEYS[6] on Valkey; ff_attempt PK tuple
on PG/SQLite).
Per-backend shape:
- Valkey — `HGET {exec}:core current_attempt_index` on the execution’s partition. Single command. Both the missing-field case (`exec_core` present but `current_attempt_index` absent or empty-string, i.e. pre-claim state) and the missing-row case (no `exec_core` hash at all) read back as `AttemptIndex(0)`. This preserves the pre-PR-3 inline-`HGET` semantic and is safe because Valkey’s happy path requires `exec_core` to exist before this method is reached — the SDK only calls `read_current_attempt_index` post-grant, and grant issuance is gated on `exec_core` presence. A genuinely absent row would surface as the proper business-logic error (`NotAResumedExecution` / `ExecutionNotLeaseable`) on the downstream FCALL.
- Postgres — `SELECT attempt_index FROM ff_exec_core WHERE partition_key = $1 AND execution_id = $2`. The column is `NOT NULL DEFAULT 0` so a pre-claim row reads back as `0` (matching Valkey’s missing-field case). Missing row surfaces as `EngineError::Validation { kind: ValidationKind::InvalidInput, .. }` — diverges from Valkey’s missing-row `→ 0` mapping.
- SQLite — `SELECT attempt_index FROM ff_exec_core WHERE partition_key = ? AND execution_id = ?`; identical semantics to Postgres (missing-row → `InvalidInput`).
Cross-backend asymmetry on missing row is intentional. The
SDK happy path never observes it (grant issuance on Valkey
requires exec_core, and PG/SQLite currently return
Unavailable from claim_from_grant per
project_claim_from_grant_pg_sqlite_gap.md). Consumers writing
backend-agnostic tooling against this method directly must
treat the missing-row case as backend-dependent — match on
InvalidInput for PG/SQLite, and treat an unexpected 0 as
the Valkey equivalent signal.
The default impl returns EngineError::Unavailable so the
trait addition is non-breaking for out-of-tree backends (same
precedent as Self::read_execution_context landing in v0.12
PR-1).
Sourcefn read_total_attempt_count<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_total_attempt_count<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<AttemptIndex, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Point-read of the execution’s total attempt counter — the monotonic count of claims that have ever fired against this execution (including the in-flight one once claimed).
Used on the SDK worker’s claim_from_grant / claim_execution
path — the next attempt-index for a fresh claim is this
counter’s current value (so 1 on the second retry after the
first attempt failed terminally). This is semantically distinct
from Self::read_current_attempt_index, which is a pointer
at the currently-leased attempt row and is only meaningful on
the claim_from_resume_grant path (where a live attempt already
exists and we want to re-seat its lease rather than mint a new
attempt row).
Reading the pointer on the claim_from_grant path was a live
bug: on the retry-of-a-retry scenario the pointer still named
the previous terminal-failed attempt, so the newly-minted
attempt collided with it (Valkey KEYS[6]) or mis-targeted the
PG/SQLite ff_attempt PK tuple. This method fixes that by
reading the counter that Lua 5920 / PG ff_claim_execution /
SQLite claim_impl all already consult when computing the
next attempt index.
Per-backend shape:
- Valkey — `HGET {exec}:core total_attempt_count` on the execution’s partition. Single command; pre-claim read (field absent or empty) maps to `0`.
- Postgres — `SELECT raw_fields->>'total_attempt_count' FROM ff_exec_core WHERE (partition_key, execution_id) = ...`. The field lives in the JSONB `raw_fields` bag rather than a dedicated column (mirrors how `create_execution_impl` seeds it on row creation). Missing row → `InvalidInput`; missing field → `0`.
- SQLite — `SELECT CAST(json_extract(raw_fields, '$.total_attempt_count') AS INTEGER) FROM ff_exec_core WHERE ...`. Same JSON-in-`raw_fields` shape as PG; uses the same `json_extract` idiom already employed in `ff-backend-sqlite/src/queries/operator.rs` for replay_count.
The default impl returns EngineError::Unavailable so the
trait addition is non-breaking for out-of-tree backends (same
precedent as Self::read_current_attempt_index landing in
v0.12 PR-3).
Sourcefn set_execution_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_key: &'life2 str,
_value: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
fn set_execution_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_key: &'life2 str,
_value: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Set a single namespaced tag on an execution. Tag key MUST match
the reserved caller-namespace pattern ^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$ —
i.e. <caller>.<field> — or the call returns
EngineError::Validation { kind: ValidationKind::InvalidInput, .. }
with the offending key in detail. value is arbitrary UTF-8.
The namespace prefix is carried inline in key (e.g.
"cairn.session_id") — there is no separate namespace arg.
This matches the existing ff_set_execution_tags wire shape and
the flow-tag projection in ExecutionSnapshot::tags.
Validation is performed by each overriding backend impl via
validate_tag_key before the wire hop so PG / SQLite /
Valkey reject the same set of keys. The default trait impl
returns EngineError::Unavailable without running validation
— there is no meaningful storage to validate against on an
unsupported backend, and surfacing Unavailable before
Validation matches the precedence used elsewhere on the trait.
Backends MAY additionally validate on the storage tier (Valkey’s
Lua path does, with a more permissive prefix-only check).
Per-backend shape:
- Valkey — `ff_set_execution_tags` FCALL with a single `{key → value}` pair. Routes through the existing Lua contract (no new wire format).
- Postgres — `UPDATE ff_exec_core SET raw_fields = jsonb_set( coalesce(raw_fields, '{}'::jsonb), '{tags,<key>}', to_jsonb($value)) WHERE (partition_key, execution_id) = ...`. Same storage shape read by `Self::describe_execution` / `Self::read_execution_context`.
- SQLite — `UPDATE ff_exec_core SET raw_fields = json_set( coalesce(raw_fields, '{}'), '$.tags."<key>"', $value) WHERE ...`. The key is quoted in the JSON path so dots inside the namespaced key (e.g. `cairn.session_id`) are treated as a single literal member name rather than JSON-path separators — yielding the same flat `raw_fields.tags` shape as PG.
Missing execution surfaces as
EngineError::NotFound { entity: "execution" }
— matches the Valkey FCALL’s execution_not_found mapping and
the existing ScriptError::ExecutionNotFound → EngineError
conversion (ff_script::engine_error_ext).
The default impl returns EngineError::Unavailable so the
trait addition is non-breaking for out-of-tree backends.
Sourcefn set_flow_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_key: &'life2 str,
_value: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
fn set_flow_tag<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_key: &'life2 str,
_value: &'life3 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Set a single namespaced tag on a flow. Same namespace rule as
Self::set_execution_tag: key MUST match
^[a-z][a-z0-9_]*\.[a-z0-9_][a-z0-9_.]*$.
Per-backend shape:
- Valkey — `ff_set_flow_tags` FCALL with a single pair. Tags land on the dedicated `ff:flow:{fp:N}:<flow_id>:tags` hash, not on the `flow_core` hash (diverges from the execution shape — execution tags live on `ff:exec:...:tags` by the same split). Lazy migration on first write: the Lua (`ff_script::flowfabric.lua`, `ff_set_flow_tags`) scans `flow_core` once per flow for pre-58.4 inline namespaced fields (anything matching `^[a-z][a-z0-9_]*\.`), HSETs them onto `:tags`, HDELs them from `flow_core`, and stamps `tags_migrated=1` on `flow_core` so subsequent calls short-circuit to O(1). This heals flows created before RFC-058.4 landed; well-formed flows pay the migration cost only on their very first tag write. Callers MUST read tags via `Self::get_flow_tag` (`HGET :tags <key>`) — direct `HGETALL` against `flow_core` will not see post-migration values.

  Cross-backend parity caveat on `describe_flow`: the pre-existing `ValkeyBackend::describe_flow` / `FlowSnapshot::tags` read path snapshots `flow_core` fields only and does NOT today merge the `:tags` sub-hash, whereas Postgres `describe_flow` DOES surface flow tags via `ff_backend_postgres::flow::extract_tags` (which reads them off `raw_fields` — the same store `set_flow_tag` writes on PG). Trait consumers MUST NOT assume a tag written here will be visible via `describe_flow` on every backend: on Valkey, callers that need the full tag set should complement the snapshot with per-key `Self::get_flow_tag` reads. Extending Valkey `describe_flow` to merge `:tags` is additive and out of scope for this trait addition.
- Postgres — `UPDATE ff_flow_core SET raw_fields = jsonb_set(..., '{<key>}', ...)` — flow tags are stored as top-level `raw_fields` keys (matches `ff_backend_postgres::flow::extract_tags`). No `tags` nesting on flows, which diverges from the execution shape.
- SQLite — mirrors PG: `UPDATE ff_flow_core SET raw_fields = json_set(..., '$."<key>"', $value) WHERE ...`. The key is quoted so the dotted namespaced key lands as a single flat top-level member of `raw_fields`.
Missing flow surfaces as
EngineError::NotFound { entity: "flow" }
(matches the Valkey FCALL’s flow_not_found mapping).
The default impl returns EngineError::Unavailable.
Sourcefn get_execution_tag<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_key: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn get_execution_tag<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_key: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Read a single namespaced execution tag. Returns Ok(None) when
the tag is absent or the execution row does not exist —
the two cases are not distinguished on the read path. Callers
that need to distinguish should call Self::describe_execution
first (an Ok(None) from that method proves the execution is
absent). This matches Valkey’s native HGET semantics and
keeps the read path at a single round-trip on every backend.
key must pass validate_tag_key — a malformed key can
never be present in storage so the call short-circuits with
EngineError::Validation { kind: ValidationKind::InvalidInput, .. }
rather than round-tripping.
Per-backend shape:
- Valkey — `HGET :tags <key>` on the execution’s partition.
- Postgres — `SELECT raw_fields->'tags'->><key> FROM ff_exec_core WHERE ...` with `fetch_optional` → missing row collapses to `None`.
- SQLite — `SELECT json_extract(raw_fields, '$.tags."<key>"') FROM ff_exec_core WHERE ...` with the same collapse. The key is quoted in the JSON path so dotted namespaced keys resolve to the flat literal member written by `set_execution_tag`.
The default impl returns EngineError::Unavailable.
Sourcefn get_execution_namespace<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_execution_namespace<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read an execution’s namespace scalar. Returns Ok(None) when
the row is absent or the field is unset. Dedicated point-read
used by the scanner per-candidate filter (should_skip_candidate)
to preserve the 1-HGET cost contract documented in
ff_engine::scanner::should_skip_candidate — describe_execution
is heavier (HGETALL / full snapshot) and unnecessary when only
the namespace scalar is needed.
Per-backend shape:
- Valkey — `HGET :core namespace` on the execution’s partition (single field read on the already-hot exec_core hash).
- Postgres — `SELECT raw_fields->>'namespace' FROM ff_exec_core WHERE partition_key = $1 AND execution_id = $2`.
- SQLite — `SELECT json_extract(raw_fields, '$.namespace') FROM ff_exec_core WHERE ...`.
The default impl returns EngineError::Unavailable.
Sourcefn get_flow_tag<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_key: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn get_flow_tag<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_key: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Option<String>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Read a single namespaced flow tag. Returns Ok(None) when
the tag is absent or the flow row does not exist (same
collapse semantics as Self::get_execution_tag). Symmetry
partner — consumers like cairn read cairn.session_id off
flows for archival.
key must pass validate_tag_key.
Per-backend shape:
- Valkey — `HGET :tags <key>` on the flow’s partition.
- Postgres — `SELECT raw_fields->><key> FROM ff_flow_core WHERE ...` (top-level `raw_fields` key, matches the flow-tag storage shape).
- SQLite — `SELECT json_extract(raw_fields, '$."<key>"') FROM ff_flow_core WHERE ...` (quoted key — see `set_flow_tag`).
The default impl returns EngineError::Unavailable.
Sourcefn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
List dependency edges adjacent to an execution. Read-only; the
backend resolves the subject execution’s flow, reads the
direction-specific adjacency SET, and decodes each member’s
flow-scoped edge:<edge_id> hash.
Returns an empty Vec when the subject has no edges on the
requested side — including standalone executions (no owning
flow). Ordering is unspecified: the underlying adjacency SET
is an unordered SMEMBERS read. Callers that need deterministic
order should sort by EdgeSnapshot::edge_id /
EdgeSnapshot::created_at themselves.
Parse failures on the edge hash surface as
EngineError::Validation { kind: ValidationKind::Corruption, .. }
— unknown fields, missing required fields, endpoint mismatches
against the adjacency SET all fail loud rather than silently
returning partial results.
Gated on the core feature — edge reads are part of the
minimal engine surface a Postgres-style backend must honour.
Sourcefn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Snapshot a single dependency edge by its owning flow + edge id.
Ok(None) when the edge hash is absent (never staged, or
staged under a different flow than flow_id). Parse failures
on a present edge hash surface as
EngineError::Validation { kind: ValidationKind::Corruption, .. }
— the stored flow_id field is cross-checked against the
caller’s expected flow_id so a wrong-key read fails loud
rather than returning an unrelated edge.
Gated on the core feature — single-edge reads are part of
the minimal snapshot surface an alternate backend must honour
alongside Self::describe_execution / Self::describe_flow
/ Self::list_edges.
Sourcefn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
_eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
_eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Resolve an execution’s owning flow id, if any.
Ok(None) when the execution’s core record is absent or has
no associated flow (standalone execution). A present-but-
malformed flow_id field surfaces as
EngineError::Validation { kind: ValidationKind::Corruption, .. }.
Gated on the core feature. Used by ff-sdk’s
list_outgoing_edges / list_incoming_edges to pivot from a
consumer-supplied ExecutionId to the FlowId required by
Self::list_edges. A Valkey backend serves this with a
single HGET exec_core flow_id; a Postgres backend serves it
with the equivalent single-column row lookup.
Sourcefn list_flows<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_flows<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<FlowId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List flows on a partition with cursor-based pagination (issue #185).
Returns a ListFlowsPage of FlowSummary
rows ordered by flow_id (UUID byte-lexicographic). cursor
is None for the first page; callers forward the returned
next_cursor verbatim to continue iteration, and the listing
is exhausted when next_cursor is None. limit is the
maximum number of rows to return on this page — implementations
MAY return fewer (end of partition) but MUST NOT exceed it.
Ordering rationale: flow ids are UUIDs, and both Valkey
(sort after-the-fact) and Postgres (ORDER BY flow_id) can
agree on byte-lexicographic order — the same order
FlowId::to_string() produces for canonical hyphenated UUIDs.
Mapping to cursor > flow_id keeps the contract backend-
independent.
§Postgres implementation pattern
A Postgres-backed implementation serves this directly with
SELECT flow_id, created_at_ms, public_flow_state
FROM ff_flow
WHERE partition_key = $1
AND ($2::uuid IS NULL OR flow_id > $2)
ORDER BY flow_id
LIMIT $3 + 1;
— reading one extra row to decide whether next_cursor should
be set to the last row’s flow_id. The Valkey implementation
maintains the ff:idx:{fp:N}:flow_index SET and performs the
sort + slice client-side (SMEMBERS then sort-by-UUID-bytes),
pipelining HGETALL flow_core for each row on the page.
Gated on the core feature — flow listing is part of the
minimal engine surface a Postgres-style backend must honour.
Sourcefn list_lanes<'life0, 'async_trait>(
&'life0 self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_lanes<'life0, 'async_trait>(
&'life0 self,
_cursor: Option<LaneId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Enumerate registered lanes with cursor-based pagination.
Lanes are global (not partition-scoped) — the backend serves
this from its lane registry and does NOT accept a
crate::partition::Partition argument. Results are sorted
by LaneId name so the ordering is stable across calls and
cursors address a deterministic position in the sort.
- `cursor` — exclusive lower bound. `None` starts from the first lane. To continue a walk, pass the previous page’s `ListLanesPage::next_cursor`.
- `limit` — hard cap on the number of lanes returned in the page. Backends MAY round this down when the registry size is smaller; they MUST NOT return more than `limit`.
ListLanesPage::next_cursor is Some(last_lane_in_page)
iff at least one more lane exists after the returned page,
and None on the final page. Callers loop until next_cursor
is None to read the full registry.
Gated on the core feature — lane enumeration is part of the
minimal snapshot surface an alternate backend must honour
alongside Self::describe_flow / Self::list_edges.
Sourcefn list_suspended<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_suspended<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List suspended executions in one partition, cursor-paginated,
with each entry’s suspension reason_code populated (issue
#183).
Consumer-facing “what’s blocked on what?” panels (ff-board’s
suspended-executions view, operator CLIs) need the reason in
the list response so the UI does not round-trip per row to
describe_execution for a field it knows it needs. reason
on [SuspendedExecutionEntry] carries the free-form
suspension:current.reason_code field — see the type rustdoc
for the String-not-enum rationale.
cursor is opaque to callers; pass None to start a fresh
scan and feed the returned ListSuspendedPage::next_cursor
back in on subsequent pages until it comes back None.
limit bounds the entries count; backends MAY return fewer
when the partition is exhausted.
Ordering is by ascending suspended_at_ms (the per-lane
suspended ZSET score == timeout_at or the no-timeout
sentinel) with execution id as a lex tiebreak, so cursor
continuation is deterministic across calls.
Gated on the core feature — suspended-list enumeration is
part of the minimal engine surface a Postgres-style backend
must honour.
Sourcefn list_executions<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_executions<'life0, 'async_trait>(
&'life0 self,
_partition: PartitionKey,
_cursor: Option<ExecutionId>,
_limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Forward-only paginated listing of the executions indexed under one partition.
Reads the partition-wide ff:idx:{p:N}:all_executions set,
sorts lexicographically on ExecutionId, and returns the page
of ids strictly greater than cursor (or starting from the
smallest id when cursor = None). The returned
ListExecutionsPage::next_cursor is the last id on the page
iff at least one more id exists past it; None signals
end-of-stream.
limit is the maximum number of ids returned on this page. A
limit of 0 returns an empty page with next_cursor = None.
Backends MAY cap limit internally (Valkey: 1000) and return
fewer ids than requested; callers continue paginating until
next_cursor == None.
Ordering is stable under concurrent inserts for already-emitted ids (an id less-than-or-equal-to the caller’s cursor is never re-emitted in later pages) but new inserts past the cursor WILL appear in subsequent pages — consistent with forward-only cursor semantics.
Gated on the core feature — partition-scoped listing is part
of the minimal engine surface every backend must honour.
Sourcefn deliver_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn deliver_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Deliver an external signal to a suspended execution’s waitpoint.
The backend atomically records the signal, evaluates the resume
condition, and — when satisfied — transitions the execution
from suspended to runnable (or buffers the signal when the
waitpoint is still pending). Duplicate delivery — same
idempotency_key + waitpoint — surfaces as
DeliverSignalResult::Duplicate with the pre-existing
signal_id rather than mutating state twice.
Input validation (HMAC token presence, payload size limits,
signal-name shape) is the backend’s responsibility; callers
pass a fully populated DeliverSignalArgs and receive typed
outcomes or typed errors (ScriptError::invalid_token,
ScriptError::token_expired, ScriptError::ExecutionNotFound
surfaced via EngineError::Transport on the Valkey backend).
Gated on the core feature — signal delivery is part of the
minimal trigger surface every backend must honour so ff-server
/ REST handlers can dispatch against Arc<dyn EngineBackend>
without knowing which backend is running underneath.
Sourcefn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Claim a resumed execution — a previously-suspended attempt that
has cleared its resume condition (e.g. via
Self::deliver_signal) and now needs a worker to pick up the
same attempt index.
Distinct from Self::claim (fresh work) and
Self::claim_from_resume_grant (grant-based ownership transfer
after a crash): the resumed-claim path re-binds an existing
attempt rather than minting a new one. The backend issues a
fresh lease_id + bumps the lease_epoch, preserving
attempt_id / attempt_index so stream frames and progress
updates continue on the same attempt.
Typed failures surface via ScriptError → EngineError:
NotAResumedExecution when the attempt state is not
attempt_interrupted, ExecutionNotLeaseable when the
lifecycle phase is not runnable, and InvalidClaimGrant
when the grant key is missing or was already consumed.
Gated on the core feature — resumed-claim is part of the
minimal trigger surface every backend must honour.
Sourcefn scan_eligible_executions<'life0, 'async_trait>(
&'life0 self,
_args: ScanEligibleArgs,
) -> Pin<Box<dyn Future<Output = Result<Vec<ExecutionId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn scan_eligible_executions<'life0, 'async_trait>(
&'life0 self,
_args: ScanEligibleArgs,
) -> Pin<Box<dyn Future<Output = Result<Vec<ExecutionId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Scan a lane’s eligible ZSET on one partition for highest-priority executions awaiting a worker (v0.12 PR-5).
Lifted from the SDK-side ZRANGEBYSCORE inline on
FlowFabricWorker::claim_next — the scheduler-bypass scanner
gated behind direct-valkey-claim. The trait method itself is
backend-agnostic; consumers that drive the scanner loop
(bench harnesses, single-tenant dev) compose it with
Self::issue_claim_grant + Self::claim_execution to
replicate the pre-PR-5 claim_next body.
§Backend coverage
- Valkey — `ZRANGEBYSCORE eligible_zset -inf +inf LIMIT 0 <limit>` on the lane’s partition-scoped eligible key. Single command; no script round-trip. Wire shape is byte-for-byte identical to the pre-PR SDK inline call so bench traces match pre-PR without new `#[tracing::instrument]` span names.
- Postgres / SQLite — use the `Err(Unavailable)` default. PG/SQLite consumers drive work through the scheduler-routed `Self::claim_for_worker` path instead of the scanner primitives exposed here; lifting the scheduler itself onto the trait is RFC-024 follow-up scope. See `project_claim_from_grant_pg_sqlite_gap.md` for motivation.
Default impl returns EngineError::Unavailable so the trait
addition is non-breaking for out-of-tree backends. Same
precedent as Self::claim_execution landing in v0.12 PR-4.
Sourcefn issue_claim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueClaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueClaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn issue_claim_grant<'life0, 'async_trait>(
&'life0 self,
_args: IssueClaimGrantArgs,
) -> Pin<Box<dyn Future<Output = Result<IssueClaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Issue a claim grant — the scheduler’s admission write — for a single execution on a single lane (v0.12 PR-5).
Lifted from the SDK-side ff_issue_claim_grant inline helper
on FlowFabricWorker::claim_next. The backend atomically
writes the grant hash, appends to the per-worker grant index,
and removes the execution from the lane’s eligible ZSET.
Typed rejects surface via EngineError::Validation:
CapabilityMismatch when the worker’s capabilities do not
cover the execution’s required_capabilities, InvalidInput
for malformed args. Transport faults surface via
EngineError::Transport.
§Backend coverage
- Valkey — one `ff_issue_claim_grant` FCALL. KEYS/ARGV shape is byte-for-byte identical to the pre-PR SDK inline call; bench traces match pre-PR.
- Postgres / SQLite — `Err(Unavailable)` default; use `Self::claim_for_worker` instead. See `Self::scan_eligible_executions` for the cross-link rationale.
Sourcefn block_route<'life0, 'async_trait>(
&'life0 self,
_args: BlockRouteArgs,
) -> Pin<Box<dyn Future<Output = Result<BlockRouteOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn block_route<'life0, 'async_trait>(
&'life0 self,
_args: BlockRouteArgs,
) -> Pin<Box<dyn Future<Output = Result<BlockRouteOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Move an execution from a lane’s eligible ZSET into its blocked_route ZSET (v0.12 PR-5).
Lifted from the SDK-side ff_block_execution_for_admission
inline helper on FlowFabricWorker::claim_next. Called after
a Self::issue_claim_grant CapabilityMismatch reject —
without a block step, the inline scanner would re-pick the
same top-of-ZSET every tick (parity with
ff-scheduler::Scheduler::block_candidate).
The engine’s unblock scanner periodically promotes blocked_route back to eligible once a worker with matching caps registers.
§Backend coverage
- Valkey — one `ff_block_execution_for_admission` FCALL.
- Postgres / SQLite — `Err(Unavailable)` default; the scheduler-routed `Self::claim_for_worker` path handles admission rejects server-side.
Sourcefn claim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_execution<'life0, 'async_trait>(
&'life0 self,
_args: ClaimExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Consume a scheduler-issued claim grant to mint a fresh attempt.
The SDK’s grant-consumer path — paired with FlowFabricWorker::claim_from_grant
in ff-sdk — routes through this method. The scheduler has
already validated budget / quota / capabilities and written a
grant (Valkey claim_grant hash); this call atomically
consumes that grant and creates the attempt row, mints
lease_id + lease_epoch, and returns a
ClaimExecutionResult::Claimed carrying the minted lease
triple.
Distinct from Self::claim (the scheduler-bypass scanner
used by the direct-valkey-claim feature) — this method
assumes the grant already exists and skips capability / ZSET
scanning. The Valkey impl fires exactly one ff_claim_execution
FCALL.
Typed failures surface via ScriptError → EngineError:
UseClaimResumedExecution when the attempt is actually
attempt_interrupted (caller should retry via
Self::claim_resumed_execution — see ContentionKind at
ff_core::engine_error), InvalidClaimGrant when the grant is
missing / consumed / worker-mismatched, CapabilityMismatch
when the execution’s required_capabilities drifted after
grant issuance.
§Backend coverage
- Valkey — implemented in `ff-backend-valkey` (one `ff_claim_execution` FCALL).
- Postgres / SQLite — use the `Err(Unavailable)` default in this PR. Grants on PG / SQLite today flow through `PostgresScheduler::claim_for_worker` (a sibling struct, not an `EngineBackend` method); wiring the default-over-trait behaviour into a PG / SQLite `claim_execution` impl lands with a future RFC-024 grant-consumer extension.
The default impl returns EngineError::Unavailable so the
trait addition is non-breaking for out-of-tree backends. Same
precedent as Self::read_current_attempt_index landing in
v0.12 PR-3.
Sourcefn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_downstream_execution_id: &'life2 ExecutionId,
_policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_downstream_execution_id: &'life2 ExecutionId,
_policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
RFC-016 Stage A: set the inbound-edge-group policy for a
downstream execution. Must be called before the first
add_dependency(... -> downstream_execution_id) — the backend
rejects with EngineError::Conflict if edges have already
been staged for this group.
Stage A honours only
EdgeDependencyPolicy::AllOf;
the AnyOf / Quorum variants return
EngineError::Validation with
detail = "stage A supports AllOf only; AnyOf/Quorum land in stage B"
until Stage B’s resolver lands.
Sourcefn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
_args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Rotate the waitpoint HMAC signing kid cluster-wide.
v0.7 migration-master Q4 (adjudicated 2026-04-24). Additive trait surface so Valkey and Postgres backends can both expose the “rotate everywhere” semantic under one name.
- Valkey impl fans out an `ff_rotate_waitpoint_hmac_secret` FCALL per execution partition. `entries.len() == num_flow_partitions` and per-partition failures are surfaced as inner `Err` entries — the call as a whole does not fail when one partition’s FCALL fails, matching [`ff_sdk::admin::rotate_waitpoint_hmac_secret_all_partitions`]’s partial-success contract.
- Postgres impl (Wave 4) writes one row to `ff_waitpoint_hmac(kid, secret, rotated_at)` and returns a single-entry vec with `partition = 0`.
The default impl returns
EngineError::Unavailable with
op = "rotate_waitpoint_hmac_secret_all" so backends that
haven’t implemented the method surface the miss loudly rather
than silently no-op’ing. Both concrete backends override.
Sourcefn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn seed_waitpoint_hmac_secret<'life0, 'async_trait>(
&'life0 self,
_args: SeedWaitpointHmacSecretArgs,
) -> Pin<Box<dyn Future<Output = Result<SeedOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Seed the initial waitpoint HMAC secret for a fresh deployment (issue #280).
Idempotent. If a current_kid (Valkey per-partition) or
an active kid row (Postgres) already exists with the given
kid, the method returns
SeedOutcome::AlreadySeeded without overwriting, reporting
whether the stored secret matches the caller-supplied one via
same_secret. Callers (cairn boot, operator tooling) invoke
this on every boot and let the backend decide whether to
install — removing the client-side “check then HSET” race that
cairn’s raw-HSET boot path silently tolerated.
For rotation of an already-seeded secret, use
Self::rotate_waitpoint_hmac_secret_all instead; seed is
install-only.
The default impl returns EngineError::Unavailable with
op = "seed_waitpoint_hmac_secret" so backends that haven’t
implemented the method surface the miss loudly.
Sourcefn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_from: StreamCursor,
_to: StreamCursor,
_count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read frames from a completed or in-flight attempt’s stream.
from / to are StreamCursor values — StreamCursor::Start
/ StreamCursor::End are equivalent to XRANGE - / +, and
StreamCursor::At("<id>") reads from a concrete entry id.
Input validation (count_limit bounds, cursor shape) is the
caller’s responsibility — SDK-side wrappers in
ff-sdk enforce bounds before
forwarding. Backends MAY additionally reject out-of-range
input via EngineError::Validation.
Gated on the streaming feature — stream reads are part of
the stream-subset surface a backend without XREAD-like
primitives may omit.
Sourcefn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
_after: StreamCursor,
_block_ms: u64,
_count_limit: u64,
_visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Tail a live attempt’s stream.
after is an exclusive StreamCursor — entries with id
strictly greater than after are returned. StreamCursor::Start
/ StreamCursor::End are NOT accepted here; callers MUST pass
a concrete id (or StreamCursor::from_beginning()). The SDK
wrapper rejects the open markers before reaching the backend.
block_ms == 0 → non-blocking peek. block_ms > 0 → blocks up
to that many ms for a new entry.
visibility (RFC-015 §6.1) filters the returned entries by
their stored StreamMode
mode field. Default
TailVisibility::All
preserves v1 behaviour.
Gated on the streaming feature — see read_stream.
Sourcefn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_execution_id: &'life1 ExecutionId,
_attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read the rolling summary document for an attempt (RFC-015 §6.3).
Returns Ok(None) when no StreamMode::DurableSummary
frame has ever been appended for the attempt. Non-blocking Hash
read; safe to call from any consumer without holding the lease.
Gated on the streaming feature — summary reads are part of
the stream-subset surface.
Sourcefn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_execution<'life0, 'async_trait>(
&'life0 self,
_args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create an execution. Ingress row 6 (RFC-017 §4). Wraps
ff_create_execution on Valkey; INSERT INTO ff_execution ...
on Postgres. The idempotency_key + backend-side default
dedup_ttl_ms = 86400000 make duplicate submissions idempotent.
Sourcefn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_flow<'life0, 'async_trait>(
&'life0 self,
_args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a flow header. Ingress row 5.
Sourcefn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
_args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Atomically add an execution to a flow (single-FCALL co-located commit on Valkey; single-transaction UPSERT on Postgres).
Sourcefn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
_args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Stage a dependency edge between flow members. CAS-guarded on
graph_revision — stale rev returns Contention(StaleGraphRevision).
Sourcefn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
_args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Apply a staged dependency edge to its downstream child.
Sourcefn resolve_dependency<'life0, 'async_trait>(
&'life0 self,
_args: ResolveDependencyArgs,
) -> Pin<Box<dyn Future<Output = Result<ResolveDependencyOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn resolve_dependency<'life0, 'async_trait>(
&'life0 self,
_args: ResolveDependencyArgs,
) -> Pin<Box<dyn Future<Output = Result<ResolveDependencyOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Resolve one dependency edge after its upstream reached a
terminal outcome — satisfy on “success”, mark impossible
otherwise. Idempotent (AlreadyResolved on replay).
PR-7b Step 0 overlap-resolver: lifted here so cluster 2
(scanner/dependency_reconciler) could trait-route through
Arc<dyn EngineBackend> without a merge conflict with cluster
4. Cluster 4 (completion_listener::spawn_dispatch_loop)
ultimately routed through the coarser
Self::cascade_completion (per-payload) instead of looping
over this per-edge method, because the Postgres cascade is
outbox-driven rather than per-edge — see cascade_completion
rustdoc “Timing semantics” for details.
§Backend status
- Valkey: wraps ff_resolve_dependency (RFC-016 Stage C signature). Atomic single-slot FCALL.
- Postgres: Unavailable. PG’s post-completion cascade is not per-edge; it runs via ff_backend_postgres::dispatch::dispatch_completion(event_id) keyed on the ff_completion_event outbox row. The Valkey-shaped per-edge resolve does not map cleanly to that model; PG’s dependency_reconciler already calls dispatch_completion directly. The engine’s PR-7b/final integration test expects Unsupported logs from Valkey-shaped scanners on a PG deployment — this surface honours that contract.
- SQLite: Unavailable for the same reason (mirrors PG).
The default impl returns EngineError::Unavailable so a
backend that has not been migrated surfaces a typed
Unsupported-grade error rather than a panic.
Sourcefn cascade_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_payload: &'life1 CompletionPayload,
) -> Pin<Box<dyn Future<Output = Result<CascadeOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn cascade_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_payload: &'life1 CompletionPayload,
) -> Pin<Box<dyn Future<Output = Result<CascadeOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Cascade a terminal-execution completion into its downstream
edges. Consumed by
ff-engine::completion_listener::spawn_dispatch_loop (PR-7b
Cluster 4) to trait-route the post-completion DAG-promotion
path through Arc<dyn EngineBackend>.
Distinct from Self::resolve_dependency: that method is
per-edge (one ff_resolve_dependency FCALL); this method is
per-completion and orchestrates the full outgoing-edge walk
plus child_skipped recursion (Valkey) or outbox-event
dispatch (Postgres).
§Timing semantics
Backends diverge on when the caller observes cascade work.
See CascadeOutcome for the full contract; the short form:
- Valkey: synchronous. FCALL-driven walk completes inline; child_skipped descendants are recursively cascaded up to the internal MAX_CASCADE_DEPTH cap before return.
- Postgres: asynchronous via the ff_completion_event outbox. The call resolves payload to its event_id, runs ff_backend_postgres::dispatch::dispatch_completion, and returns when the outbox row has been claimed + its direct hops advanced. Further-descendant cascades ride their own outbox events (emitted by the per-hop tx) — NOT this call.
Consumers that depend on synchronous cascade must either target
Valkey explicitly or observe PG’s dispatched_at_ms clearance
via the dependency_reconciler partial index to verify drain.
The default impl returns EngineError::Unavailable so
backends that have not been migrated surface a typed error
rather than a panic.
Sourcefn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Operator-initiated execution cancel (row 2).
Sourcefn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Re-score an execution’s eligibility priority (row 17).
Sourcefn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Replay a terminal execution (row 22). Variadic KEYS handling (inbound-edge pre-read) is hidden inside the Valkey impl per RFC-017 §4 row 3.
Sourcefn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Operator-initiated lease revoke (row 19).
Sourcefn complete_execution<'life0, 'async_trait>(
&'life0 self,
_args: CompleteExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CompleteExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn complete_execution<'life0, 'async_trait>(
&'life0 self,
_args: CompleteExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CompleteExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Service-layer complete_execution — peer of Self::complete
that takes a fence triple instead of a worker Handle. See
the group preamble above for cairn-migration context.
Sourcefn fail_execution<'life0, 'async_trait>(
&'life0 self,
_args: FailExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<FailExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn fail_execution<'life0, 'async_trait>(
&'life0 self,
_args: FailExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<FailExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Service-layer fail_execution — peer of Self::fail that
takes a fence triple instead of a worker Handle.
Sourcefn renew_lease<'life0, 'async_trait>(
&'life0 self,
_args: RenewLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RenewLeaseResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn renew_lease<'life0, 'async_trait>(
&'life0 self,
_args: RenewLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RenewLeaseResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Service-layer renew_lease — peer of Self::renew that
takes a fence triple instead of a worker Handle.
Sourcefn resume_execution<'life0, 'async_trait>(
&'life0 self,
_args: ResumeExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ResumeExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn resume_execution<'life0, 'async_trait>(
&'life0 self,
_args: ResumeExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ResumeExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Service-layer resume_execution — transitions a suspended
execution back to runnable. Distinct from
Self::claim_from_resume_grant (which mints a worker handle
against an already-eligible resumed execution): this method is
the lifecycle transition primitive the control plane calls
when an operator / auto-resume policy moves a suspended
execution forward.
The Valkey impl pre-reads current_waitpoint_id + lane_id
from exec_core so callers only need the execution id + the
trigger type — same ergonomics as revoke_lease reading
current_worker_instance_id when callers omit it.
Sourcefn check_admission<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_quota_policy_id: &'life1 QuotaPolicyId,
_dimension: &'life2 str,
_args: CheckAdmissionArgs,
) -> Pin<Box<dyn Future<Output = Result<CheckAdmissionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn check_admission<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_quota_policy_id: &'life1 QuotaPolicyId,
_dimension: &'life2 str,
_args: CheckAdmissionArgs,
) -> Pin<Box<dyn Future<Output = Result<CheckAdmissionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Service-layer check_admission_and_record — atomic admission
check against a quota policy. Callers supply the policy id +
dimension (quota keys live on their own {q:<policy>}
partition that cannot be derived from execution_id, so these
travel outside CheckAdmissionArgs). dimension defaults
to "default" inside the Valkey body when the caller passes
an empty string — matches cairn’s pre-migration default.
Sourcefn evaluate_flow_eligibility<'life0, 'async_trait>(
&'life0 self,
_args: EvaluateFlowEligibilityArgs,
) -> Pin<Box<dyn Future<Output = Result<EvaluateFlowEligibilityResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn evaluate_flow_eligibility<'life0, 'async_trait>(
&'life0 self,
_args: EvaluateFlowEligibilityArgs,
) -> Pin<Box<dyn Future<Output = Result<EvaluateFlowEligibilityResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Service-layer evaluate_flow_eligibility — read-only check
that returns the execution’s current eligibility state
(eligible, blocked_by_dependencies, or a backend-specific
status string). Called by cairn’s dependency-resolution path
to decide whether a downstream execution can proceed.
Sourcefn record_spend<'life0, 'async_trait>(
&'life0 self,
_args: RecordSpendArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn record_spend<'life0, 'async_trait>(
&'life0 self,
_args: RecordSpendArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Per-execution budget spend with tenant-open dimensions.
Cairn #454 Q1/Q2: takes an open-set BTreeMap<String, u64> of
deltas (distinct from the fixed-shape
Self::report_usage/Self::report_usage_admin which use
[UsageDimensions]). Return shape reuses ReportUsageResult
— same Ok / SoftBreach / HardBreach / AlreadyApplied
variants cairn’s UI already branches on.
The default impl returns EngineError::Unavailable so the
trait remains additive for backends that have not landed the
#454 body yet.
Sourcefn release_budget<'life0, 'async_trait>(
&'life0 self,
_args: ReleaseBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn release_budget<'life0, 'async_trait>(
&'life0 self,
_args: ReleaseBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Per-execution budget attribution release.
Called on execution termination to reverse this execution’s contribution to a budget counter. Per cairn #454 clarification this is per-execution, not a whole-budget flush — the budget persists across executions.
The default impl returns EngineError::Unavailable.
Sourcefn deliver_approval_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverApprovalSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn deliver_approval_signal<'life0, 'async_trait>(
&'life0 self,
_args: DeliverApprovalSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Operator-driven approval-signal delivery.
Pre-shaped variant of Self::deliver_signal where the caller
does not carry the waitpoint token. The backend reads the
token from ff_waitpoint_pending, HMAC-verifies server-side,
and dispatches. Operator API never sees the token bytes.
Cairn #454 Q3 — signal_name is a flat string (conventional
values "approved" / "rejected"); audit metadata lives in
cairn’s audit log, not on the FF surface.
The default impl returns EngineError::Unavailable.
Sourcefn issue_grant_and_claim<'life0, 'async_trait>(
&'life0 self,
_args: IssueGrantAndClaimArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn issue_grant_and_claim<'life0, 'async_trait>(
&'life0 self,
_args: IssueGrantAndClaimArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimGrantOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Backend-atomic issue_claim_grant + claim_execution.
Cairn #454 Q4 — composing the two primitives caller-side risks
leaking a grant if claim_execution fails after
issue_claim_grant succeeded. This method’s contract is that
the composition is backend-atomic: Valkey fuses them in one
FCALL; PG/SQLite fuse them in one tx.
The default impl must not be a chained call — it returns
EngineError::Unavailable so consumers cannot accidentally
use a non-atomic fallback.
Sourcefn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a budget definition (row 6).
Sourcefn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Reset a budget’s usage counters (row 10).
Sourcefn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn create_quota_policy<'life0, 'async_trait>(
&'life0 self,
_args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Create a quota policy (row 7).
Sourcefn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_budget_status<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Read-only budget status for operator visibility (row 8).
Sourcefn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn report_usage_admin<'life0, 'life1, 'async_trait>(
&'life0 self,
_budget: &'life1 BudgetId,
_args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Admin-path report_usage (row 9 + RFC-017 §5 round-1 F4).
Distinct from the existing Self::report_usage which takes
a worker handle — the admin path has no lease context.
Sourcefn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn get_execution_result<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Fetch the stored result payload for a completed execution
(row 4). Returns Ok(None) when the execution is missing, not
yet complete, or its payload was trimmed by retention policy.
Sourcefn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_pending_waitpoints<'life0, 'async_trait>(
&'life0 self,
_args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
List the pending-or-active waitpoints for an execution, cursor
paginated (row 5 / §8). Stage A preserves the existing
PendingWaitpointInfo shape; Stage D ships the §8 HMAC
sanitisation + (token_kid, token_fingerprint) schema.
Sourcefn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Backend-level reachability probe (row 1). Valkey: PING;
Postgres: SELECT 1.
Sourcefn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
_args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Scheduler-routed claim entrypoint (row 18, RFC-017 §7). Valkey
forwards to its ff_scheduler::Scheduler cursor; Postgres
forwards to PostgresScheduler’s FOR UPDATE SKIP LOCKED
path.
Backends that carry an embedded scheduler (e.g. ValkeyBackend
constructed via with_embedded_scheduler, or PostgresBackend
with its with_scanners sibling) route the claim through it.
Backends without a wired scheduler return
EngineError::Unavailable. HTTP consumers use
FlowFabricWorker::claim_via_server instead.
Sourcefn backend_label(&self) -> &'static str
fn backend_label(&self) -> &'static str
Static observability label identifying the backend family in
logs + metrics (RFC-017 §5.4 + §9 Stage B). Default impl
returns "unknown" so legacy impl EngineBackend blocks that
have not upgraded keep compiling; every in-tree backend
overrides — ValkeyBackend → "valkey", PostgresBackend →
"postgres".
Sourcefn as_any(&self) -> &(dyn Any + 'static)
fn as_any(&self) -> &(dyn Any + 'static)
Backend downcast escape hatch (v0.12 PR-7a transitional).
Scanner supervisors in ff-engine still dispatch through a
concrete ferriskey::Client; to keep the engine’s public
boundary backend-agnostic (Arc<dyn EngineBackend>) while the
scanner internals remain Valkey-shaped, the engine downcasts
via this method and reaches in for the embedded client. Every
backend that wants to be consumed by Engine::start_with_completions
overrides this to return self as &dyn Any; the default
returns a placeholder so a stray downcast_ref fails cleanly
rather than risking unsound behaviour.
v0.13 (PR-7b) will trait-ify individual scanners onto
EngineBackend and retire ff-engine’s dependence on this
downcast path. The method itself will remain on the trait
(likely deprecated) rather than be removed — removing a
public trait method is a breaking change for external
impl EngineBackend blocks.
Sourcefn capabilities(&self) -> Capabilities
fn capabilities(&self) -> Capabilities
RFC-018 Stage A: snapshot of this backend’s identity + the
flat Supports surface it can actually service. Consumers use
this at startup to gate UI features / choose between alternative
code paths before dispatching. See
rfcs/RFC-018-backend-capability-discovery.md for the full
discovery contract and the four owner-adjudicated open
questions (granularity: coarse; version: struct; sync; no
event stream).
Default: returns a value tagged family = "unknown" with every
supports.* bool false, so pre-RFC-018 out-of-tree backends
keep compiling and consumers treat “all false” as “dispatch
and catch EngineError::Unavailable” (pre-RFC-018 behaviour).
Concrete in-tree backends (ValkeyBackend, PostgresBackend)
override to populate a real value.
Sync (no .await): backend-static info should not require a
probe on every query. Dynamic probes happen once at
connect* time and cache the result.
Sourcefn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn prepare<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<PrepareOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Issue #281: run one-time backend-specific boot preparation.
Intended to run ONCE per deployment startup — NOT per request.
Idempotent and safe for consumers to call on every application
boot; backends that have nothing to do return
PrepareOutcome::NoOp without side effects.
Per-backend behaviour:
- Valkey — issues FUNCTION LOAD REPLACE for the flowfabric Lua library (with bounded retry on transient transport faults; permanent compile errors surface as EngineError::Transport without retry). Returns PrepareOutcome::Applied carrying "FUNCTION LOAD (flowfabric lib v<N>)".
- Postgres — returns PrepareOutcome::NoOp. Schema migrations are applied out-of-band per rfcs/drafts/v0.7-migration-master.md §Q12; the backend runs a schema-version check at connect time and refuses to start on mismatch, so no boot-side prepare work remains.
- Default impl — returns PrepareOutcome::NoOp so out-of-tree backends without preparation work compile without boilerplate.
§Relationship to the in-tree boot path
ValkeyBackend::initialize_deployment (called from
Server::start_with_metrics) already invokes
ensure_library inline as
its step 4; that path is unchanged. prepare() exists as a
trait-surface entry point so consumers that construct an
Arc<dyn EngineBackend> outside of Server (e.g.
cairn-fabric’s boot path at cairn-fabric/src/boot.rs) can
run the same preparation without reaching into
backend-specific modules. The overlap is intentional: calling
both prepare() and initialize_deployment is safe because
FUNCTION LOAD REPLACE is idempotent under the version
check.
§Layer forwarding
Layer impls (HookedBackend, ff-sdk layers) do NOT forward
prepare today — consistent with backend_label / ping /
shutdown_prepare. Consumers that wrap a backend in layers
MUST call prepare() on the raw backend before wrapping, or
accept the default PrepareOutcome::NoOp.
Sourcefn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
_grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Drain-before-shutdown hook (RFC-017 §5.4). The server calls
this before draining its own background tasks so backend-
scoped primitives (Valkey stream semaphore, Postgres sqlx
pool, …) can close their gates and await in-flight work up to
grace.
Default impl returns Ok(()) — a no-op backend has nothing
backend-scoped to drain. Concrete backends whose data plane
owns resources (connection pools, semaphores, listeners)
override with a real body.
Sourcefn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn cancel_flow_header<'life0, 'async_trait>(
&'life0 self,
_args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
RFC-017 Stage E2: the “header” portion of cancel_flow — run the
atomic flow-state flip (Valkey: ff_cancel_flow FCALL; Postgres:
cancel_flow_once tx), decode policy + membership, and surface
the flow_already_terminal idempotency branch as a first-class
[CancelFlowHeader::AlreadyTerminal] so the Server can build
the wire CancelFlowResult without reaching for a raw
Client. Separate from the existing
EngineBackend::cancel_flow entry point (which takes the
enum-typed (policy, wait) split and returns the wait-collapsed
CancelFlowResult) because the Server owns its own
wait-dispatch + member-cancel machinery via
EngineBackend::cancel_execution + backlog ack.
Default impl returns EngineError::Unavailable so un-migrated
backends surface the miss loudly.
Sourcefn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_id: &'life1 FlowId,
_execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
RFC-017 Stage E2: best-effort acknowledgement that one member of
a cancel_all flow has completed its per-member cancel. Drains
the member from the flow’s pending_cancels set and, if empty,
removes the flow from the partition-level cancel_backlog
(Valkey: ff_ack_cancel_member FCALL; Postgres: table write —
default Unavailable until Wave 9).
Failures are swallowed by the caller — the cancel-backlog reconciler is the authoritative drain — but a typed error here lets the caller log a backend-scoped context string.
Sourcefn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_execution_info<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
RFC-017 Stage E2: full-shape execution read used by the
GET /v1/executions/{id} HTTP route. Returns the legacy
ExecutionInfo wire shape (not the decoupled
ExecutionSnapshot) so the existing HTTP response bytes stay
identical across the migration.
Ok(None) ⇒ no such execution. Default Unavailable because
the Valkey HGETALL-and-parse is backend-specific.
Sourcefn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_execution_state<'life0, 'life1, 'async_trait>(
&'life0 self,
_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
RFC-017 Stage E2: narrow public_state read used by the
GET /v1/executions/{id}/state HTTP route. Returns Ok(None)
when the execution is missing. Default Unavailable.
Sourcefn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_lease_history<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<LeaseHistorySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to lease lifecycle events (acquired / renewed / expired / reclaimed / revoked) for the partition this backend is configured with.
Cross-partition fan-out is consumer-side merge: subscribe
per-partition backend instance and interleave on the read
side. Yields
Err(EngineError::StreamDisconnected { cursor }) on backend
disconnect; resume by calling this method again with the
returned cursor.
filter (#282): when filter.instance_tag is Some((k, v)),
only events whose execution carries tag k = v are yielded
(matching the crate::backend::ScannerFilter surface from
#122). Pass &ScannerFilter::default() for unfiltered
behaviour. Filtering happens inside the backend stream; the
crate::stream_events::LeaseHistorySubscription return type
is unchanged.
Sourcefn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_completion<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionSubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to completion events (terminal state transitions).
- Postgres: wraps the ff_completion_event outbox + LISTEN/NOTIFY machinery. Durable via event-id cursor.
- Valkey: wraps the RESP3 ff:dag:completions pubsub subscriber. Pubsub is at-most-once over the live subscription window; the cursor is always the empty sentinel. If you need at-least-once replay with durable cursor resume, use the Postgres backend (see docs/POSTGRES_PARITY_MATRIX.md row subscribe_completion).
filter (#282): see Self::subscribe_lease_history. Valkey
reuses the subscribe_completions_filtered per-event HGET
gate; Postgres filters inline against the outbox’s denormalised
instance_tag column.
Sourcefn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn subscribe_signal_delivery<'life0, 'life1, 'async_trait>(
&'life0 self,
_cursor: StreamCursor,
_filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<SignalDeliverySubscription, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Subscribe to signal-delivery events (satisfied / buffered / deduped).
filter (#282): see Self::subscribe_lease_history.
Subscribe to instance-tag events (tag attached / cleared).
Producer wiring is deferred per #311 audit (“no concrete
demand”); the trait method exists for API uniformity across
the four families. Backends currently return
EngineError::Unavailable.
Sourcefn mark_lease_expired_if_due<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn mark_lease_expired_if_due<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Lease-expiry scanner hook — mark an expired lease as reclaimable so another worker can redeem the execution. Atomic per call.
Valkey: FCALL ff_mark_lease_expired_if_due.
Postgres: single-row tx on ff_attempt + ff_exec_core (see
ff_backend_postgres::reconcilers::lease_expiry).
Sourcefn promote_delayed<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane: &'life1 LaneId,
_execution_id: &'life2 ExecutionId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn promote_delayed<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane: &'life1 LaneId,
_execution_id: &'life2 ExecutionId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Delayed-promoter scanner hook — promote a delayed execution to
eligible_now once its delay_until has passed.
Valkey: FCALL ff_promote_delayed.
Postgres: Wave 9 schema scope (no current impl).
Sourcefn close_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_waitpoint_id: &'life2 str,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn close_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_waitpoint_id: &'life2 str,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Pending-waitpoint-expiry scanner hook — close a pending waitpoint whose deadline has passed (wake the suspended execution with a timeout signal).
Valkey: FCALL ff_close_waitpoint.
Postgres: Wave 9 schema scope (no current impl).
Sourcefn expire_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_phase: ExpirePhase,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn expire_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_phase: ExpirePhase,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Shared hook for the attempt-timeout and execution-deadline
scanners — terminate or retry an execution whose wall-clock
budget has elapsed. phase discriminates which of the two
scanner paths is calling so the backend can preserve diagnostic
breadcrumbs without forking the surface.
Valkey: FCALL ff_expire_execution (with phase passed through
as an ARGV discriminator).
Postgres: single-row tx mirror of the Lua semantic for
AttemptTimeout; ExecutionDeadline is Wave 9 schema scope.
Sourcefn expire_suspension<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn expire_suspension<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Suspension-timeout scanner hook — expire a suspended execution whose suspension deadline has passed (wake with timeout).
Valkey: FCALL ff_expire_suspension.
Postgres: single-row tx on ff_suspend + ff_exec_core.
Sourcefn unblock_execution<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane_id: &'life1 LaneId,
_execution_id: &'life2 ExecutionId,
_expected_blocking_reason: &'life3 str,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
fn unblock_execution<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane_id: &'life1 LaneId,
_execution_id: &'life2 ExecutionId,
_expected_blocking_reason: &'life3 str,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Unblock-scanner hook — move an execution from a blocked ZSET back
to eligible_now once its blocking condition has cleared (budget
under limit, quota window drained, or a capable worker has come
online). expected_blocking_reason discriminates which of the
blocked:{budget,quota,route} sets the execution is leaving and
also fences against a stale unblock (Lua rejects if the core’s
blocking_reason no longer matches).
§Backend status
- Valkey: wraps ff_unblock_execution (RFC-010 §6). Atomic single-slot FCALL on {p:N}.
- Postgres: Unavailable (structural). PG does not persist a per-reason blocked:{budget,quota,route} index — scheduler eligibility is re-evaluated live via SQL predicates on ff_exec_core + budget / quota tables (see ff_backend_postgres::scheduler). Nothing to reconcile, so the engine’s PG scanner loop does not run this path; callers on a PG deployment receive the typed Unavailable per PR-7b/final contract.
- SQLite: Unavailable for the same reason as Postgres.
Sourcefn drain_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_partition: Partition,
_flow_id: &'life1 FlowId,
_downstream_eid: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn drain_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_partition: Partition,
_flow_id: &'life1 FlowId,
_downstream_eid: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
RFC-016 Stage C — sibling-cancel group drain.
After edge_cancel_dispatcher has issued
EngineBackend::cancel_execution against every listed
sibling of a (flow_id, downstream_eid) group, this trait
method atomically removes the tuple from the partition-local
pending-cancel-groups index and clears the per-group flag +
members state.
Valkey: FCALL ff_drain_sibling_cancel_group (SREM +
HDEL in one Lua unit).
Postgres: Unavailable — PG’s Wave-6b
reconcilers::edge_cancel_dispatcher::dispatcher_tick
owns the equivalent row drain inside its own per-group
transaction; the Valkey-shaped per-tuple call is not used on
the PG scanner path.
SQLite: Unavailable (mirrors PG; RFC-023 scope).
Sourcefn reconcile_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_partition: Partition,
_flow_id: &'life1 FlowId,
_downstream_eid: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<SiblingCancelReconcileAction, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn reconcile_sibling_cancel_group<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_flow_partition: Partition,
_flow_id: &'life1 FlowId,
_downstream_eid: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<SiblingCancelReconcileAction, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
RFC-016 Stage D — sibling-cancel group reconcile (Invariant Q6 safety net).
Re-examines a (flow_id, downstream_eid) tuple in the
partition-local pending-cancel-groups index and returns one
of three atomic dispositions — see
SiblingCancelReconcileAction — so the crash-recovery
reconciler can drain stale or completed tuples without
fighting the Stage-C dispatcher.
Valkey: FCALL ff_reconcile_sibling_cancel_group.
Postgres: Unavailable — PG’s
reconcilers::edge_cancel_reconciler::reconciler_tick
owns the row-level reconcile inside its own batched tx.
SQLite: Unavailable (mirrors PG).
Sourcefn reconcile_execution_index<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lanes: &'life1 [LaneId],
_filter: &'life2 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn reconcile_execution_index<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lanes: &'life1 [LaneId],
_filter: &'life2 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Index-reconciler pass — walks ff:idx:{p:N}:all_executions
and verifies each execution appears in the correct scheduling
sorted set (eligible / delayed / blocked:* / active /
suspended / terminal) for its current (lifecycle_phase, eligibility_state, ownership_state) triple. Phase 1 is
log-only; auto-fix is deferred to a later phase (RFC-010 §6.14).
Sourcefn reconcile_budget_counters<'life0, 'async_trait>(
&'life0 self,
_partition: Partition,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reconcile_budget_counters<'life0, 'async_trait>(
&'life0 self,
_partition: Partition,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Budget-reconciler pass — walks ff:budget:{b:M}:policies_idx,
reads each budget’s definition / usage / limits, and reconciles
the breached_at marker against hard limits. Resetting budgets
(non-zero reset_interval_ms) are skipped — they are handled
by the budget_reset scanner (cluster 2). Drops index entries
for budgets whose definition hash has been deleted (retention
purge / manual). RFC-008 §Budget Reconciliation, RFC-010 §6.5.
Sourcefn reconcile_quota_counters<'life0, 'async_trait>(
&'life0 self,
_partition: Partition,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reconcile_quota_counters<'life0, 'async_trait>(
&'life0 self,
_partition: Partition,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<ReconcileCounts, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Quota-reconciler pass — walks ff:quota:{q:M}:policies_idx,
trims expired entries from rate-limit sliding windows, and
recomputes each policy’s concurrency counter by walking its
admitted_set and pruning entries whose admission guard key
has TTLed out. RFC-008 §Quota Reconciliation, RFC-010 §6.6.
Sourcefn project_flow_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_flow_id: &'life1 FlowId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<bool, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn project_flow_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
_partition: Partition,
_flow_id: &'life1 FlowId,
_now_ms: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<bool, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Flow-projector scanner hook — sample flow members, derive an
aggregate public_flow_state + per-state counts, and write them
to the flow summary projection. Returns Ok(true) when the
summary was updated, Ok(false) when the flow had no members
or the index entry was defensively pruned (core missing).
The derived public_flow_state written here is distinct from
ff_flow_core.public_flow_state — the former is a rollup
dashboard field, the latter is the authoritative mutation-guard
state owned by create_flow / cancel_flow. See
ff_engine::scanner::flow_projector module doc for the
two-sources contract.
§Backend status
- Valkey: lifts the pre-PR-7b Rust-composed SCARD + SRANDMEMBER + per-member HGET + HSET summary pattern. No new Lua function; the aggregation is inherently multi-round-trip (cross-partition member reads) and atomicity is neither required nor achievable against 256 partitions.
- Postgres: SELECT aggregates from ff_exec_core grouped by public_state for the flow’s members, INSERT … ON CONFLICT DO UPDATE into ff_flow_summary (migration 0019). One query per flow; partition-local aggregation.
- SQLite: Unavailable per RFC-023 Phase 3.5 (flow summary projection is a shared-deployment dashboard feature; local single-tenant SQLite deployments don’t need it).
Sourcefn trim_retention<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane_id: &'life1 LaneId,
_retention_ms: u64,
_now_ms: TimestampMs,
_batch_size: u32,
_filter: &'life2 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<u32, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn trim_retention<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_partition: Partition,
_lane_id: &'life1 LaneId,
_retention_ms: u64,
_now_ms: TimestampMs,
_batch_size: u32,
_filter: &'life2 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<u32, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Retention-trimmer scanner hook — delete terminal executions and
all their subordinate keys/rows once they are older than the
configured retention window. Returns the number of executions
actually purged in this call (so the scanner can loop when it
hits batch_size).
Retention trimming is inherently a scan-and-delete loop over time; the trait surface exists to remove engine-side Valkey coupling, not to atomise the operation into a single round-trip. Implementations may issue multiple round-trips (e.g. per-execution cascade across sibling tables); the trait contract is “make progress, bounded by batch_size”.
§Backend status
- Valkey: lifts the pre-PR-7b ZRANGEBYSCORE + per-exec cascade-delete loop verbatim. Cluster-safe (all keys for one execution live on the same {p:N} slot).
- Postgres: DELETE FROM ff_exec_core for terminal rows past the cutoff plus explicit cascade DELETEs on every execution-scoped sibling table (no FK CASCADE in the schema). Single transaction per batch.
- SQLite: Unavailable per RFC-023 Phase 3.5. Single-tenant local deployments typically manage their own DB lifecycle and do not need a retention scanner.
Sourcefn read_exec_core_fields<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_fields: &'life2 [&'life3 str],
) -> Pin<Box<dyn Future<Output = Result<HashMap<String, Option<String>>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
fn read_exec_core_fields<'life0, 'life1, 'life2, 'life3, 'async_trait>(
&'life0 self,
_partition: Partition,
_execution_id: &'life1 ExecutionId,
_fields: &'life2 [&'life3 str],
) -> Pin<Box<dyn Future<Output = Result<HashMap<String, Option<String>>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
'life3: 'async_trait,
Point-read N fields from the exec_core hash for a given
execution. Returns a map of field-name → Option<String>.
HGET/HMGET reads on ExecKeyContext::core()
route through this trait method (cairn #436 / PR-7b Wave 0a).
Field values are coerced to String at the trait boundary for
wire compatibility with the Valkey HGET shape. Consumers parse
specific fields (lane_id, current_attempt_index, etc.) from
the returned strings as needed.
- Valkey: HMGET exec_core_key f1 f2 ..., zipping names to values.
- Postgres: SELECT against ff_exec_core with dynamic column extraction; fields in raw_fields JSONB are extracted via ->>.
- SQLite: equivalent with json_extract(raw_fields, '$.field').
Default body returns Unavailable so non-v0.13 backends remain
compile-compatible.
Sourcefn server_time_ms<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<u64, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn server_time_ms<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<u64, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns the backend’s current wall-clock epoch milliseconds.
Used by 15 scanners to compute “due” thresholds before issuing
per-partition due-scans. Previously a Valkey-only helper in
ff-engine’s scanner::lease_expiry issuing TIME; this
trait method is the backend-agnostic replacement (cairn #436 /
PR-7b Wave 0a).
Every in-tree backend overrides. The default falls back to
SystemTime::now() so out-of-tree EngineBackend impls (e.g.
cairn mocks, test doubles) stay source-compatible across v0.12
→ v0.13.
- Valkey: TIME command.
- Postgres: SELECT (EXTRACT(EPOCH FROM clock_timestamp()) * 1000)::bigint (not now() — now() is the transaction start timestamp, which is stale under any long-running tx; scanners need the true wall-clock read).
- SQLite: SELECT CAST((julianday('now') - 2440587.5) * 86400000 AS INTEGER).