pub struct PostgresBackend { /* private fields */ }
Postgres-backed EngineBackend.
Wave 0 shape: holds a sqlx::PgPool, the deployment’s
PartitionConfig (Q5 — partition column survives on Postgres
with hash partitioning across the same 256 slots Valkey uses),
and an optional ff_observability::Metrics handle mirroring
[ff_backend_valkey::ValkeyBackend]. Future waves add the
StreamNotifier handle once Wave 4 wires up LISTEN/NOTIFY.
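To make the partition column concrete, here is a minimal sketch of mapping an identifier onto one of 256 slots. The `slot_for` helper and the choice of FNV-1a are assumptions made purely for illustration; the documentation above states only that hash partitioning spans 256 slots, not which hash PartitionConfig uses.

```rust
/// Number of partition slots, per the description above.
const NUM_SLOTS: u64 = 256;

/// Hypothetical helper: FNV-1a over the key bytes, reduced modulo the slot count.
/// The real backend may use a different hash; only the 256-slot range comes from the docs.
fn slot_for(key: &str) -> u16 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV-1a 64-bit offset basis
    for byte in key.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV-1a 64-bit prime
    }
    (hash % NUM_SLOTS) as u16
}
```

Because the slot is a pure function of the key, every writer and reader of the same key lands on the same slot, which is the property a shared slot count across backends would rely on.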
Implementations
impl PostgresBackend
pub async fn connect(config: BackendConfig) -> Result<Arc<dyn EngineBackend>, EngineError>
Dial Postgres with BackendConfig and return the backend as
Arc<dyn EngineBackend>. Modeled on
[ff_backend_valkey::ValkeyBackend::connect] so ff-server /
SDK call sites can swap backends without changing the
constructor shape.
Wave 0: builds the pool and constructs the backend. Does
NOT run migrations (Q12 — operator out-of-band). Does NOT run
the schema-version check (Wave 3 adds the version const and
wires check_schema_version in). Does NOT start the LISTEN
task (Wave 4).
Returns EngineError::Unavailable when the config’s
connection arm is not Postgres.
pub fn from_pool(pool: PgPool, partition_config: PartitionConfig) -> Arc<Self>
Test / advanced constructor: build a PostgresBackend from an
already-constructed PgPool + explicit partition config. No
network I/O. Useful for integration tests against a shared
pool and for a future migration CLI that wants to reuse a pool
across migrate-run + smoke-check.
pub async fn connect_with_metrics(
config: BackendConfig,
partition_config: PartitionConfig,
metrics: Arc<Metrics>,
) -> Result<Arc<Self>, EngineError>
RFC-017 Wave 8 Stage E1: dial Postgres with an explicit
PartitionConfig + shared ff_observability::Metrics.
Mirrors [ff_backend_valkey::ValkeyBackend::connect_with_metrics]
so ff-server::Server::start_with_metrics can wire the Postgres
branch without reaching into the pool builder directly.
Returns a concrete Arc<Self> rather than Arc<dyn EngineBackend>
so the caller can cast to the trait object after any additional
field installs (parallel to the Valkey path which calls
with_scheduler / with_stream_semaphore_permits before the
cast). Stage E1 does NOT run apply_migrations — schema
provisioning is an operator concern (matches the Wave 0 contract
on Self::connect).
pub fn with_scanners(self: &mut Arc<Self>, cfg: PostgresScannerConfig) -> bool
RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers as
background tick loops. Returns true if the scanner handle
was installed; false if the Arc<Self> has outstanding
clones (mirrors the Valkey with_* pattern). Callers must
invoke this before publishing the Arc<dyn EngineBackend> so
the underlying Arc::get_mut succeeds.
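The install-before-publish rule follows directly from how `Arc::get_mut` behaves. The sketch below uses a hypothetical stand-in `Backend` type with a single `scanner_tick_ms` field (not the real scanner handle) to show why the call returns false once a clone exists.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the backend; the field is a placeholder for a scanner handle.
struct Backend {
    scanner_tick_ms: Option<u64>,
}

impl Backend {
    /// Installs the setting only while this `Arc` is the sole owner of the allocation.
    fn with_scanners(self: &mut Arc<Self>, tick_ms: u64) -> bool {
        match Arc::get_mut(self) {
            // No other strong or weak references exist, so mutation is safe.
            Some(inner) => {
                inner.scanner_tick_ms = Some(tick_ms);
                true
            }
            // Someone else holds a clone; leave the backend untouched.
            None => false,
        }
    }
}
```

Under these assumptions, a caller that clones the `Arc` (for example by casting and handing out a trait object) before calling `with_scanners` gets `false` back and no scanners installed, so the return value is worth checking.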
pub fn pool(&self) -> &PgPool
Accessor for the underlying PgPool. Stage E1 uses this so
ff-server::Server::start_with_metrics can run
apply_migrations on the same pool before handing the backend
out as Arc<dyn EngineBackend>.
pub async fn create_execution(
&self,
args: CreateExecutionArgs,
) -> Result<ExecutionId, EngineError>
Create one execution row (+ seed the lane registry if new).
RFC-017 Stage A: this inherent method is retained as a
thin wrapper around the module-level impl so existing in-tree
callers (ff-server request handlers, integration tests) keep
compiling. The trait-lifted entry point is
EngineBackend::create_execution below, which calls the
same impl. Return shape differs — inherent returns
ExecutionId, trait returns
CreateExecutionResult per RFC-017 §5 — so we cannot simply
replace the inherent method. A follow-up PR may deprecate
this inherent alongside the broader ingress shape alignment.
pub async fn create_flow(&self, args: &CreateFlowArgs) -> Result<CreateFlowResult, EngineError>
pub async fn add_execution_to_flow(&self, args: &AddExecutionToFlowArgs) -> Result<AddExecutionToFlowResult, EngineError>
pub async fn stage_dependency_edge(&self, args: &StageDependencyEdgeArgs) -> Result<StageDependencyEdgeResult, EngineError>
pub async fn apply_dependency_to_child(&self, args: &ApplyDependencyToChildArgs) -> Result<ApplyDependencyToChildResult, EngineError>
Trait Implementations
impl CompletionBackend for PostgresBackend
fn subscribe_completions<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn subscribe_completions_filtered<'life0, 'life1, 'async_trait>(
&'life0 self,
filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
ScannerFilter applied at the backend boundary (issue #122).
impl EngineBackend for PostgresBackend
fn create_execution<'life0, 'async_trait>(
&'life0 self,
args: CreateExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
RFC-017 Wave 8 Stage E1: lift the inherent
PostgresBackend::create_execution onto the trait so
ff-server’s migrated HTTP handler can dispatch to Postgres.
Post-insert the row is idempotent; the Postgres impl does not
distinguish Created from Duplicate at the helper level
(both paths commit and return the execution id), so we always
surface Created { public_state: Waiting } here. A follow-up
may lift the distinction if a consumer relies on it.
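The relationship between the inherent method and the trait-lifted one can be sketched as two thin wrappers over a shared helper. Everything below is a simplified, synchronous stand-in: the field layout of `CreateExecutionResult`, the `CreateExecutionArgs` field, and the `create_execution_impl` helper are hypothetical, and the real methods are async.

```rust
#[derive(Debug, Clone, PartialEq)]
struct ExecutionId(String);

#[derive(Debug, PartialEq)]
enum PublicState {
    Waiting,
}

// Field layout is hypothetical; only the variant names and `public_state` come from the docs.
#[derive(Debug, PartialEq)]
#[allow(dead_code)]
enum CreateExecutionResult {
    Created { execution_id: ExecutionId, public_state: PublicState },
    Duplicate { execution_id: ExecutionId },
}

struct CreateExecutionArgs {
    execution_id: String,
}

struct PostgresBackend;

/// Stand-in for the module-level impl that both entry points share.
fn create_execution_impl(args: CreateExecutionArgs) -> ExecutionId {
    ExecutionId(args.execution_id)
}

impl PostgresBackend {
    /// Inherent wrapper: returns the bare id.
    fn create_execution(&self, args: CreateExecutionArgs) -> ExecutionId {
        create_execution_impl(args)
    }
}

trait EngineBackend {
    fn create_execution(&self, args: CreateExecutionArgs) -> CreateExecutionResult;
}

impl EngineBackend for PostgresBackend {
    /// Trait entry point: same impl, always reported as `Created` in `Waiting`.
    fn create_execution(&self, args: CreateExecutionArgs) -> CreateExecutionResult {
        CreateExecutionResult::Created {
            execution_id: create_execution_impl(args),
            public_state: PublicState::Waiting,
        }
    }
}
```

In this sketch, method-call syntax on the concrete type picks the inherent method, while `EngineBackend::create_execution(&backend, args)` or a call through a trait object reaches the trait version. That is why the two return shapes can coexist.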
fn claim_for_worker<'life0, 'async_trait>(
&'life0 self,
args: ClaimForWorkerArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimForWorkerOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
RFC-017 Wave 8 Stage E3 (§4 row 9, §7): forward the claim to the
Postgres-native admission pipeline. Returns NoWork when no
eligible execution is admissible this scan cycle. Budget
breaches surface as NoWork (leaving the row eligible for a
retry by another worker); validation-class rejections
(malformed partition, unknown kid) surface as typed
EngineError variants mapped to the Server’s 400/503 arms.
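The outcome mapping described above can be sketched as a single match. All types here are hypothetical placeholders: the docs name `NoWork` and mention typed `EngineError` variants, but `AdmissionRejection`, the `Granted` variant, and `EngineError::Validation` are invented for illustration.

```rust
/// Hypothetical reasons the admission pipeline might decline a claim.
#[derive(Debug, PartialEq)]
enum AdmissionRejection {
    BudgetBreached,
    MalformedPartition,
    UnknownKid,
}

/// `NoWork` is named in the docs; `Granted` and its field are placeholders.
#[derive(Debug, PartialEq)]
enum ClaimForWorkerOutcome {
    NoWork,
    Granted { execution_id: String },
}

/// Placeholder error type; the real `EngineError` variants are not listed here.
#[derive(Debug, PartialEq)]
enum EngineError {
    Validation(&'static str),
}

/// Maps an admission result onto the caller-visible outcome.
fn map_admission(
    admitted: Result<Option<String>, AdmissionRejection>,
) -> Result<ClaimForWorkerOutcome, EngineError> {
    match admitted {
        Ok(Some(execution_id)) => Ok(ClaimForWorkerOutcome::Granted { execution_id }),
        // Nothing eligible this scan cycle.
        Ok(None) => Ok(ClaimForWorkerOutcome::NoWork),
        // Budget breaches are not errors: the row stays eligible for another worker.
        Err(AdmissionRejection::BudgetBreached) => Ok(ClaimForWorkerOutcome::NoWork),
        // Validation-class rejections become typed errors.
        Err(AdmissionRejection::MalformedPartition) => {
            Err(EngineError::Validation("malformed partition"))
        }
        Err(AdmissionRejection::UnknownKid) => Err(EngineError::Validation("unknown kid")),
    }
}
```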
fn shutdown_prepare<'life0, 'async_trait>(
&'life0 self,
grace: Duration,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
RFC-017 Wave 8 Stage E3: drain the scanner supervisor’s
reconciler tasks up to grace, then close the sqlx pool.
Matches the Valkey backend’s shutdown_prepare contract —
bounded best-effort drain, never returns an error.
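A bounded best-effort drain can be sketched with plain threads and a channel. This is not the backend's implementation (which is async and supervises reconciler tasks); `drain_with_grace` and `spawn_worker` are hypothetical names, and the point is only the shape: wait until a deadline, count what finished, and never report an error.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Waits up to `grace` for `workers` completion signals and returns how many arrived.
/// Timing out is not an error: the caller proceeds with shutdown either way.
fn drain_with_grace(done_rx: &mpsc::Receiver<()>, workers: usize, grace: Duration) -> usize {
    let deadline = Instant::now() + grace;
    let mut finished = 0;
    while finished < workers {
        // Shrink the wait on every iteration so the total never exceeds `grace`.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match done_rx.recv_timeout(remaining) {
            Ok(()) => finished += 1,
            // Deadline reached, or every sender is gone: stop waiting.
            Err(_) => break,
        }
    }
    finished
}

/// Hypothetical background task that signals completion after `work` has elapsed.
fn spawn_worker(done_tx: mpsc::Sender<()>, work: Duration) {
    thread::spawn(move || {
        thread::sleep(work);
        // The receiver may already have given up; ignore the send error.
        let _ = done_tx.send(());
    });
}
```

With one fast and one slow worker and a short grace period, the function returns after roughly the grace period having counted only the fast worker, which mirrors the "drain up to grace, then close" contract.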
fn claim<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
lane: &'life1 LaneId,
capabilities: &'life2 CapabilitySet,
policy: ClaimPolicy,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Returns Ok(None) when no work is currently
available; Err only on transport or input-validation faults.
fn renew<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<LeaseRenewal, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
State::StaleLease / State::LeaseExpired
when the lease has been stolen or timed out.
fn progress<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
percent: Option<u8>,
message: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn append_frame<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
frame: Frame,
) -> Pin<Box<dyn Future<Output = Result<AppendFrameOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn complete<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
payload: Option<Vec<u8>>,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
handle (round-4 M-D2) so callers
can retry under EngineError::Transport without losing the
cookie. Payload is Option<Vec<u8>> per the note above.
fn fail<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: FailureReason,
classification: FailureClass,
) -> Pin<Box<dyn Future<Output = Result<FailOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
FailOutcome
so the caller learns whether a retry was scheduled.
fn cancel<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
reason: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn suspend<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
args: SuspendArgs,
) -> Pin<Box<dyn Future<Output = Result<SuspendOutcome, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn create_waitpoint<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_handle: &'life1 Handle,
_waitpoint_key: &'life2 str,
_expires_in: Duration,
) -> Pin<Box<dyn Future<Output = Result<PendingWaitpoint, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn observe_signals<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<Vec<ResumeSignal>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn claim_from_reclaim<'life0, 'async_trait>(
&'life0 self,
token: ReclaimToken,
) -> Pin<Box<dyn Future<Output = Result<Option<Handle>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Returns Ok(None) when the grant’s target execution is no longer
resumable (already reclaimed, terminal, etc.).
fn delay<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
delay_until: TimestampMs,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
delay_until, releasing the lease.
fn wait_children<'life0, 'life1, 'async_trait>(
&'life0 self,
handle: &'life1 Handle,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn describe_execution<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Ok(None) ⇒ no such execution.
fn describe_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Ok(None) ⇒ no such flow.
fn list_edges<'life0, 'life1, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
direction: EdgeDirection,
) -> Pin<Box<dyn Future<Output = Result<Vec<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
edge:<edge_id> hash.
fn describe_edge<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
edge_id: &'life2 EdgeId,
) -> Pin<Box<dyn Future<Output = Result<Option<EdgeSnapshot>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn resolve_execution_flow_id<'life0, 'life1, 'async_trait>(
&'life0 self,
eid: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<FlowId>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn list_flows<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<FlowId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListFlowsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_lanes<'life0, 'async_trait>(
&'life0 self,
cursor: Option<LaneId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListLanesPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_suspended<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListSuspendedPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
reason_code populated (issue
#183).
fn list_executions<'life0, 'async_trait>(
&'life0 self,
partition: PartitionKey,
cursor: Option<ExecutionId>,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<ListExecutionsPage, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn deliver_signal<'life0, 'async_trait>(
&'life0 self,
args: DeliverSignalArgs,
) -> Pin<Box<dyn Future<Output = Result<DeliverSignalResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn claim_resumed_execution<'life0, 'async_trait>(
&'life0 self,
args: ClaimResumedExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ClaimResumedExecutionResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Self::deliver_signal) and now needs a worker to pick up the
same attempt index.
fn create_flow<'life0, 'async_trait>(
&'life0 self,
args: CreateFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn add_execution_to_flow<'life0, 'async_trait>(
&'life0 self,
args: AddExecutionToFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<AddExecutionToFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn stage_dependency_edge<'life0, 'async_trait>(
&'life0 self,
args: StageDependencyEdgeArgs,
) -> Pin<Box<dyn Future<Output = Result<StageDependencyEdgeResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
graph_revision; a stale rev returns Contention(StaleGraphRevision).
fn apply_dependency_to_child<'life0, 'async_trait>(
&'life0 self,
args: ApplyDependencyToChildArgs,
) -> Pin<Box<dyn Future<Output = Result<ApplyDependencyToChildResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn backend_label(&self) -> &'static str
"unknown" so legacy impl EngineBackend blocks that
have not upgraded keep compiling; every in-tree backend
overrides — ValkeyBackend → "valkey", PostgresBackend →
"postgres".
fn ping<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
PING;
Postgres: SELECT 1.
fn cancel_flow<'life0, 'life1, 'async_trait>(
&'life0 self,
id: &'life1 FlowId,
policy: CancelFlowPolicy,
wait: CancelFlowWait,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn set_edge_group_policy<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
flow_id: &'life1 FlowId,
downstream_execution_id: &'life2 ExecutionId,
policy: EdgeDependencyPolicy,
) -> Pin<Box<dyn Future<Output = Result<SetEdgeGroupPolicyResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
add_dependency(... -> downstream_execution_id) — the backend
rejects with EngineError::Conflict if edges have already
been staged for this group.
fn report_usage<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
_handle: &'life1 Handle,
budget: &'life2 BudgetId,
dimensions: UsageDimensions,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
ReportUsageResult variant; backends enforce
idempotency via the caller-supplied
[UsageDimensions::dedup_key] (RFC-012 §R7.2.3 — replaces
the pre-Round-7 AdmissionDecision return).
fn rotate_waitpoint_hmac_secret_all<'life0, 'async_trait>(
&'life0 self,
args: RotateWaitpointHmacSecretAllArgs,
) -> Pin<Box<dyn Future<Output = Result<RotateWaitpointHmacSecretAllResult, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn read_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
from: StreamCursor,
to: StreamCursor,
count_limit: u64,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn tail_stream<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
after: StreamCursor,
block_ms: u64,
count_limit: u64,
visibility: TailVisibility,
) -> Pin<Box<dyn Future<Output = Result<StreamFrames, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn read_summary<'life0, 'life1, 'async_trait>(
&'life0 self,
execution_id: &'life1 ExecutionId,
attempt_index: AttemptIndex,
) -> Pin<Box<dyn Future<Output = Result<Option<SummaryDocument>, EngineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn cancel_execution<'life0, 'async_trait>(
&'life0 self,
_args: CancelExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelExecutionResult, EngineError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn change_priority<'life0, 'async_trait>(
&'life0 self,
_args: ChangePriorityArgs,
) -> Pin<Box<dyn Future<Output = Result<ChangePriorityResult, EngineError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn replay_execution<'life0, 'async_trait>(
&'life0 self,
_args: ReplayExecutionArgs,
) -> Pin<Box<dyn Future<Output = Result<ReplayExecutionResult, EngineError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn revoke_lease<'life0, 'async_trait>(
&'life0 self,
_args: RevokeLeaseArgs,
) -> Pin<Box<dyn Future<Output = Result<RevokeLeaseResult, EngineError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn create_budget<'life0, 'async_trait>(
&'life0 self,
_args: CreateBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateBudgetResult, EngineError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
fn reset_budget<'life0, 'async_trait>(
&'life0 self,
_args: ResetBudgetArgs,
) -> Pin<Box<dyn Future<Output = Result<ResetBudgetResult, EngineError>> + Send + 'async_trait>>where
'life0: 'async_trait,
Self: 'async_trait,
Source§fn create_quota_policy<'life0, 'async_trait>(
    &'life0 self,
    _args: CreateQuotaPolicyArgs,
) -> Pin<Box<dyn Future<Output = Result<CreateQuotaPolicyResult, EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    Self: 'async_trait,
Source§fn get_budget_status<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _id: &'life1 BudgetId,
) -> Pin<Box<dyn Future<Output = Result<BudgetStatus, EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
Source§fn report_usage_admin<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _budget: &'life1 BudgetId,
    _args: ReportUsageAdminArgs,
) -> Pin<Box<dyn Future<Output = Result<ReportUsageResult, EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
report_usage (row 9 + RFC-017 §5 round-1 F4).
Distinct from the existing Self::report_usage which takes
a worker handle; the admin path has no lease context.
Source§fn get_execution_result<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>, EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
Ok(None) when the execution is missing, not
yet complete, or its payload was trimmed by retention policy.
Source§fn list_pending_waitpoints<'life0, 'async_trait>(
    &'life0 self,
    _args: ListPendingWaitpointsArgs,
) -> Pin<Box<dyn Future<Output = Result<ListPendingWaitpointsResult, EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    Self: 'async_trait,
PendingWaitpointInfo shape; Stage D ships the §8 HMAC
sanitisation + (token_kid, token_fingerprint) schema.
Source§fn cancel_flow_header<'life0, 'async_trait>(
    &'life0 self,
    _args: CancelFlowArgs,
) -> Pin<Box<dyn Future<Output = Result<CancelFlowHeader, EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    Self: 'async_trait,
cancel_flow — run the
atomic flow-state flip (Valkey: ff_cancel_flow FCALL; Postgres:
cancel_flow_once tx), decode policy + membership, and surface
the flow_already_terminal idempotency branch as a first-class
[CancelFlowHeader::AlreadyTerminal] so the Server can build
the wire CancelFlowResult without reaching for a raw
Client. Separate from the existing
EngineBackend::cancel_flow entry point (which takes the
enum-typed (policy, wait) split and returns the wait-collapsed
CancelFlowResult) because the Server owns its own
wait-dispatch + member-cancel machinery via
EngineBackend::cancel_execution + backlog ack.
Source§fn ack_cancel_member<'life0, 'life1, 'life2, 'async_trait>(
    &'life0 self,
    _flow_id: &'life1 FlowId,
    _execution_id: &'life2 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<(), EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    'life2: 'async_trait,
    Self: 'async_trait,
cancel_all flow has completed its per-member cancel. Drains
the member from the flow’s pending_cancels set and, if empty,
removes the flow from the partition-level cancel_backlog
(Valkey: ff_ack_cancel_member FCALL; Postgres: table write —
default Unavailable until Wave 9).
Source§fn read_execution_info<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<ExecutionInfo>, EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
GET /v1/executions/{id} HTTP route. Returns the legacy
ExecutionInfo wire shape (not the decoupled
ExecutionSnapshot) so the existing HTTP response bytes stay
identical across the migration.
Source§fn read_execution_state<'life0, 'life1, 'async_trait>(
    &'life0 self,
    _id: &'life1 ExecutionId,
) -> Pin<Box<dyn Future<Output = Result<Option<PublicState>, EngineError>> + Send + 'async_trait>>
where
    'life0: 'async_trait,
    'life1: 'async_trait,
    Self: 'async_trait,
public_state read used by the
GET /v1/executions/{id}/state HTTP route. Returns Ok(None)
when the execution is missing. Default Unavailable.
Auto Trait Implementations§
impl Freeze for PostgresBackend
impl !RefUnwindSafe for PostgresBackend
impl Send for PostgresBackend
impl Sync for PostgresBackend
impl Unpin for PostgresBackend
impl UnsafeUnpin for PostgresBackend
impl !UnwindSafe for PostgresBackend
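The Send and Sync impls listed above are what allow a backend handle to be shared across threads or async tasks behind an Arc. The following is a minimal sketch of a compile-time check for that property. FakeBackend, assert_send_sync, and shared_slot_count are hypothetical names used for illustration only; they are not part of this crate.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a backend handle; NOT the real PostgresBackend.
struct FakeBackend {
    slots: u16,
}

// Compiles only when T is Send + Sync; it does nothing at runtime.
fn assert_send_sync<T: Send + Sync>() {}

// Shares one handle with a worker thread and reads a field from it there.
fn shared_slot_count() -> u16 {
    // Compile-time proof that the stand-in may cross thread boundaries.
    assert_send_sync::<FakeBackend>();

    let backend = Arc::new(FakeBackend { slots: 256 });
    let worker = {
        // Cloning the Arc copies the pointer, not the backend itself.
        let backend = Arc::clone(&backend);
        thread::spawn(move || backend.slots)
    };
    worker.join().expect("worker thread panicked")
}
```

Swapping FakeBackend for a type that holds an Rc or a RefCell would make assert_send_sync fail to compile, which is the point of the check.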
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
    T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
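The Left/Right selection described above can be illustrated with a std-only sketch. The Either enum and the IntoEitherSketch trait below are local, hypothetical mirrors written for this example; the real trait comes from a dependency, and the closure bound on into_either_with is an assumption here.

```rust
// Local mirror of a two-variant sum type, defined only for this sketch.
#[derive(Debug, PartialEq)]
enum Either<L, R> {
    Left(L),
    Right(R),
}

// Hypothetical trait that mirrors the documented behaviour.
trait IntoEitherSketch: Sized {
    // Left when the flag is true, Right otherwise.
    fn into_either(self, into_left: bool) -> Either<Self, Self> {
        if into_left {
            Either::Left(self)
        } else {
            Either::Right(self)
        }
    }

    // Same selection, with the flag computed from a borrow of the value.
    fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
    where
        F: FnOnce(&Self) -> bool,
    {
        let go_left = into_left(&self);
        self.into_either(go_left)
    }
}

// Blanket impl: every sized type gets both methods.
impl<T> IntoEitherSketch for T {}
```

Both variants carry the same type, so the value is unchanged and only the tag records which branch was taken.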