Skip to main content

CompletionBackend

Trait CompletionBackend 

Source
pub trait CompletionBackend:
    Send
    + Sync
    + 'static {
    // Required method
    fn subscribe_completions<'life0, 'async_trait>(
        &'life0 self,
    ) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait;

    // Provided method
    fn subscribe_completions_filtered<'life0, 'life1, 'async_trait>(
        &'life0 self,
        filter: &'life1 ScannerFilter,
    ) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>
       where Self: 'async_trait,
             'life0: 'async_trait,
             'life1: 'async_trait { ... }
}
Expand description

Backend surface for subscribing to completion events.

The channel string (Valkey: ff:dag:completions) is a backend implementation detail and deliberately does NOT appear on the trait. Callers route through subscribe_completions() and consume CompletionPayloads; they never see the wire channel.

Required Methods§

Source

fn subscribe_completions<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Subscribe to the completion event stream.

Each call opens its own subscription (per-call bounded mpsc fanout on Valkey). The returned stream is independent of all other outstanding streams; dropping it releases the backend-side subscription.

Returns EngineError only for synchronous setup failures (e.g. connection pool exhausted at subscribe-time). Transient errors after the stream is returned are handled silently by the backend’s reconnect loop — callers do not see them.

Provided Methods§

Source

fn subscribe_completions_filtered<'life0, 'life1, 'async_trait>( &'life0 self, filter: &'life1 ScannerFilter, ) -> Pin<Box<dyn Future<Output = Result<CompletionStream, EngineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Subscribe to the completion event stream with a per-event ScannerFilter applied at the backend boundary (issue #122).

Identical contract to Self::subscribe_completions except that events whose execution does not match filter are dropped by the backend before reaching the stream.

§Default implementation

The default body delegates to subscribe_completions when filter.is_noop() (the predicate would accept every event — no cost to the per-push filter path). When the filter is non-trivial the default returns EngineError::Unavailable — a default can’t implement the filter correctly without backend-specific HGET routing, and a silently-unfiltered stream would break tenant isolation. Backends that implement the filter (today: Valkey) override this method. External backends that need isolation MUST override.

§Cost (Valkey backend)

Per push frame: one HGET on exec_core when filter.namespace is set, and/or one HGET on ff:exec:{p}:<eid>:tags when filter.instance_tag is set (2 HGETs total when both are set). The backend short-circuits on the cheaper namespace check first.

§Gotcha: completions are only published for executions that

belong to a flow

The Lua terminal emits PUBLISH ff:dag:completions <payload> only when core.flow_id is set on the execution (see crates/ff-script/src/flowfabric.lua — the PUBLISH is gated on is_set(core.flow_id)). Solo / standalone executions submitted without a flow never hit the channel, so a completion subscriber will observe nothing for them — the terminal state lands in exec_core as usual and the dependency_reconciler interval scan picks it up, but the push stream stays silent. Smoke tests that submit a single execution and wait on a completion subscription will hang indefinitely; either submit under a flow or poll describe_execution instead.

Implementors§