pub trait CompletionBackend:
    Send
    + Sync
    + 'static {
    // Required method
    fn subscribe_completions<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided method
    fn subscribe_completions_filtered<'life0, 'life1, 'async_trait>(
        &'life0 self,
        filter: &'life1 ScannerFilter,
    ) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait { ... }
}
Backend surface for subscribing to completion events.
The channel string (Valkey: ff:dag:completions) is a backend
implementation detail and deliberately does NOT appear on the
trait. Callers route through subscribe_completions() and
consume CompletionPayloads; they never see the wire channel.
Required Methods§
fn subscribe_completions<'life0, 'async_trait>(
    &'life0 self,
) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Subscribe to the completion event stream.
Each call opens its own subscription (per-call bounded mpsc fanout on Valkey). The returned stream is independent of all other outstanding streams; dropping it releases the backend-side subscription.
Returns EngineError only for synchronous setup failures
(e.g. connection pool exhausted at subscribe-time). Transient
errors after the stream is returned are handled silently by
the backend’s reconnect loop — callers do not see them.
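The per-call fanout described above can be pictured with a small standard-library model. This is only an illustrative sketch, not the Valkey backend's code: `Fanout`, its `String` payloads, and the choice to skip an event for a subscriber whose buffer is full are all assumptions made for the example.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::sync::Mutex;

// Hypothetical hub: one bounded channel per subscriber.
pub struct Fanout {
    subscribers: Mutex<Vec<SyncSender<String>>>,
    capacity: usize,
}

impl Fanout {
    pub fn new(capacity: usize) -> Self {
        Fanout { subscribers: Mutex::new(Vec::new()), capacity }
    }

    // Each call opens an independent bounded channel.
    pub fn subscribe(&self) -> Receiver<String> {
        let (tx, rx) = sync_channel(self.capacity);
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    // Offers the payload to every subscriber and returns how many
    // subscribers remain registered afterwards.
    pub fn publish(&self, payload: &str) -> usize {
        let mut subs = self.subscribers.lock().unwrap();
        subs.retain(|tx| match tx.try_send(payload.to_string()) {
            Ok(()) => true,
            // Assumption: a full buffer skips this event but keeps the subscriber.
            Err(TrySendError::Full(_)) => true,
            // The receiver was dropped, so the subscription is released.
            Err(TrySendError::Disconnected(_)) => false,
        });
        subs.len()
    }
}
```

In this model, dropping a `Receiver` is what releases the subscription: the next `publish` finds the channel disconnected and prunes its sender.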
Provided Methods§
fn subscribe_completions_filtered<'life0, 'life1, 'async_trait>(
    &'life0 self,
    filter: &'life1 ScannerFilter,
) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
    'life1: 'async_trait,
Subscribe to the completion event stream with a per-event
ScannerFilter applied at the backend boundary (issue #122).
Identical contract to Self::subscribe_completions except
that events whose execution does not match filter are
dropped by the backend before reaching the stream.
§Default implementation
When filter.is_noop() holds (the predicate would accept every
event), the default body delegates to subscribe_completions, so
the per-push filter path adds no cost. When the filter is
non-trivial, the default returns EngineError::Unavailable: a
default cannot implement the filter correctly without
backend-specific HGET routing, and a silently unfiltered stream
would break tenant isolation.
Backends that implement the filter (today: Valkey) override
this method. External backends that need isolation MUST
override.
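The delegate-or-refuse rule can be sketched with standard-library types only. Everything below is a stand-in written for illustration: the shapes of `ScannerFilter`, `CompletionStream` and `EngineError`, the `NoFilterBackend` type, and the tiny `block_on` helper are assumptions, and the sketch decides eagerly at call time, whereas the real method is generated by `async_trait`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in types; field types and the unit-like error variant are assumptions.
#[derive(Default)]
pub struct ScannerFilter {
    pub namespace: Option<String>,
    pub instance_tag: Option<(String, String)>,
}

impl ScannerFilter {
    pub fn is_noop(&self) -> bool {
        self.namespace.is_none() && self.instance_tag.is_none()
    }
}

#[derive(Debug)]
pub struct CompletionStream;

#[derive(Debug, PartialEq)]
pub enum EngineError {
    Unavailable,
}

type SubscribeFuture<'a> =
    Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'a>>;

pub trait CompletionBackend: Send + Sync + 'static {
    fn subscribe_completions<'a>(&'a self) -> SubscribeFuture<'a>;

    // Default: delegate for a no-op filter, refuse otherwise.
    fn subscribe_completions_filtered<'a>(
        &'a self,
        filter: &'a ScannerFilter,
    ) -> SubscribeFuture<'a> {
        if filter.is_noop() {
            self.subscribe_completions()
        } else {
            // Refusing is safer than handing back an unfiltered stream.
            let refused: Result<CompletionStream, EngineError> = Err(EngineError::Unavailable);
            Box::pin(std::future::ready(refused))
        }
    }
}

// Hypothetical backend that does not override the filtered method.
pub struct NoFilterBackend;

impl CompletionBackend for NoFilterBackend {
    fn subscribe_completions<'a>(&'a self) -> SubscribeFuture<'a> {
        let opened: Result<CompletionStream, EngineError> = Ok(CompletionStream);
        Box::pin(std::future::ready(opened))
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Minimal busy-polling executor, just enough to drive the futures above.
pub fn block_on<'a, T>(mut fut: Pin<Box<dyn Future<Output = T> + Send + 'a>>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}
```

With these stand-ins, `NoFilterBackend` accepts a default (no-op) filter and returns `EngineError::Unavailable` for any filter that sets a field, which is the behaviour an external backend inherits until it overrides the method.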
§Cost (Valkey backend)
Per push frame: one HGET on exec_core when filter.namespace
is set, and/or one HGET on ff:exec:{p}:<eid>:tags when
filter.instance_tag is set (2 HGETs total when both are
set). The backend short-circuits on the cheaper namespace
check first.
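The lookup count can be made concrete with an in-memory model. This is a sketch under stated assumptions, not the backend's filter code: `MockStore`, the key names `core:<eid>` and `tags:<eid>`, the `namespace` field name, and the `(tag name, expected value)` shape of `instance_tag` are all placeholders.

```rust
use std::collections::HashMap;

// Stand-in filter; the field types are assumptions.
pub struct ScannerFilter {
    pub namespace: Option<String>,
    pub instance_tag: Option<(String, String)>, // (tag name, expected value)
}

// In-memory stand-in for the hash store that counts HGET-style lookups.
#[derive(Default)]
pub struct MockStore {
    hashes: HashMap<String, HashMap<String, String>>,
    pub hgets: usize,
}

impl MockStore {
    pub fn hset(&mut self, key: &str, field: &str, value: &str) {
        self.hashes
            .entry(key.to_string())
            .or_default()
            .insert(field.to_string(), value.to_string());
    }

    pub fn hget(&mut self, key: &str, field: &str) -> Option<String> {
        self.hgets += 1;
        self.hashes.get(key).and_then(|h| h.get(field)).cloned()
    }
}

// Returns true when execution `eid` passes the filter.
pub fn passes_filter(store: &mut MockStore, filter: &ScannerFilter, eid: &str) -> bool {
    // Namespace check first: a mismatch returns before the tag lookup.
    if let Some(ns) = &filter.namespace {
        let found = store.hget(&format!("core:{}", eid), "namespace");
        if found.as_deref() != Some(ns.as_str()) {
            return false;
        }
    }
    if let Some((tag, want)) = &filter.instance_tag {
        let found = store.hget(&format!("tags:{}", eid), tag);
        if found.as_deref() != Some(want.as_str()) {
            return false;
        }
    }
    true
}
```

A frame that matches both criteria costs two lookups in this model, while a namespace mismatch costs one because the tag lookup is never issued.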
§Gotcha: completions are only published for executions that belong to a flow
The Lua terminal emits PUBLISH ff:dag:completions <payload>
only when core.flow_id is set on the execution (see
crates/ff-script/src/flowfabric.lua — the PUBLISH is gated
on is_set(core.flow_id)). Solo / standalone executions
submitted without a flow never hit the channel, so a
completion subscriber will observe nothing for them —
the terminal state lands in exec_core as usual and the
dependency_reconciler interval scan picks it up, but the
push stream stays silent. Smoke tests that submit a single
execution and wait on a completion subscription will hang
indefinitely; either submit under a flow or poll
describe_execution instead.
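One way to keep a smoke test from hanging is to bound the wait on the push stream and then fall back to a poll. The sketch below models that shape with the standard library only: the `Receiver<String>` stands in for the completion stream, the `poll` closure stands in for a describe_execution call, and `Outcome` and `wait_for_terminal` are hypothetical names, none of them taken from the crate.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum Outcome {
    Pushed(String), // a completion arrived on the push stream
    Polled(String), // the stream stayed silent; the poll found a state
    Unknown,        // neither source produced anything
}

// Waits up to `timeout` for a pushed completion, then polls once.
pub fn wait_for_terminal<F>(rx: &Receiver<String>, timeout: Duration, poll: F) -> Outcome
where
    F: FnOnce() -> Option<String>,
{
    match rx.recv_timeout(timeout) {
        Ok(payload) => Outcome::Pushed(payload),
        // Solo executions never publish, so a timeout here is expected.
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => match poll() {
            Some(state) => Outcome::Polled(state),
            None => Outcome::Unknown,
        },
    }
}
```

For an execution submitted without a flow, this model always ends in the polled branch, which is why a test that waits only on the subscription never finishes.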