Skip to main content

ff_core/
completion_backend.rs

1//! The [`CompletionBackend`] trait — backend-agnostic completion
2//! subscription surface (issue #90).
3//!
4//! ff-engine's DAG promotion listener used to `SUBSCRIBE
5//! ff:dag:completions` directly on a dedicated RESP3 `ferriskey::Client`
6//! — a Valkey-specific wire detail baked into an otherwise
7//! backend-agnostic crate. RFC-012 trait-ifies the write surface via
8//! [`EngineBackend`]; this trait closes the symmetric gap on the
9//! read-side notification path. A Postgres backend would implement
10//! this over `LISTEN/NOTIFY`; the Valkey backend keeps the pubsub
11//! wiring but hides the channel string behind the trait.
12//!
//! # Object safety
//!
//! `CompletionBackend` is object-safe: every method is an
//! `async fn` behind `#[async_trait]` and takes `&self`. Consumers
//! can hold `Arc<dyn CompletionBackend>` alongside
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
//! A private compile-time assertion (`_assert_dyn_compatible`) guards
//! future method additions against accidental dyn-incompatibility.
21//!
22//! # Fanout policy
23//!
24//! Each `subscribe_completions()` call returns an independent
25//! [`CompletionStream`]. Backends are free to implement fanout
26//! however they like — the Valkey backend today opens one
27//! `SUBSCRIBE` per call on a dedicated RESP3 connection. Callers
28//! that want to share a single subscription across many consumers
29//! should wrap the stream in a broadcast themselves; the trait does
30//! not assume shared-subscription semantics.
31//!
32//! # Reconnect + transient errors
33//!
34//! The stream yields [`CompletionPayload`] (not `Result<_, _>`)
35//! indefinitely until the consumer drops it. Backends handle
36//! reconnect, resubscribe, and transient-error logging internally;
37//! completions produced during a reconnect window may be missed
38//! and are picked up by ff-engine's `dependency_reconciler` safety
39//! net. Issue #90 does NOT promise at-least-once delivery through
40//! the stream — that contract stays with the reconciler scanner.
41//!
42//! # Scope: no `publish_completion`
43//!
44//! The publisher side of the completion channel is backend-internal.
45//! On Valkey, `ff_complete_execution` / `ff_fail_execution` /
46//! `ff_cancel_execution` PUBLISH from inside Lua (FCALL-atomic); on
47//! Postgres, the stored procedure would NOTIFY. Rust never
48//! publishes. There is intentionally no `publish_completion` method
49//! on this trait — if you find yourself wanting one, you are on the
50//! wrong side of the atomicity boundary.
51
52use std::pin::Pin;
53
54use async_trait::async_trait;
55use futures_core::Stream;
56
57use crate::backend::{CompletionPayload, ScannerFilter};
58use crate::engine_error::EngineError;
59
60/// A backend-agnostic stream of completion events.
61///
62/// Boxed because the concrete stream type is backend-specific
63/// (tokio `mpsc::Receiver` adapter on Valkey; `LISTEN` notification
64/// adapter on Postgres). `Unpin` keeps `.next().await` ergonomic in
65/// loop bodies without manual pinning. `Send` so the stream is
66/// usable from any tokio task.
67pub type CompletionStream = Pin<Box<dyn Stream<Item = CompletionPayload> + Send + Unpin>>;
68
69/// Backend surface for subscribing to completion events.
70///
71/// The channel string (Valkey: `ff:dag:completions`) is a backend
72/// implementation detail and deliberately does NOT appear on the
73/// trait. Callers route through `subscribe_completions()` and
74/// consume [`CompletionPayload`]s; they never see the wire channel.
75#[async_trait]
76pub trait CompletionBackend: Send + Sync + 'static {
77    /// Subscribe to the completion event stream.
78    ///
79    /// Each call opens its own subscription (per-call bounded mpsc
80    /// fanout on Valkey). The returned stream is independent of all
81    /// other outstanding streams; dropping it releases the
82    /// backend-side subscription.
83    ///
84    /// Returns `EngineError` only for synchronous setup failures
85    /// (e.g. connection pool exhausted at subscribe-time). Transient
86    /// errors after the stream is returned are handled silently by
87    /// the backend's reconnect loop — callers do not see them.
88    async fn subscribe_completions(&self) -> Result<CompletionStream, EngineError>;
89
90    /// Subscribe to the completion event stream with a per-event
91    /// [`ScannerFilter`] applied at the backend boundary (issue #122).
92    ///
93    /// Identical contract to [`Self::subscribe_completions`] except
94    /// that events whose execution does not match `filter` are
95    /// dropped by the backend before reaching the stream.
96    ///
97    /// # Default implementation
98    ///
99    /// The default body delegates to `subscribe_completions` when
100    /// `filter.is_noop()` (the predicate would accept every event —
101    /// no cost to the per-push filter path). When the filter is
102    /// non-trivial the default returns
103    /// [`EngineError::Unavailable`] — a default *can't* implement
104    /// the filter correctly without backend-specific HGET routing,
105    /// and a silently-unfiltered stream would break tenant isolation.
106    /// Backends that implement the filter (today: Valkey) override
107    /// this method. External backends that need isolation MUST
108    /// override.
109    ///
110    /// # Cost (Valkey backend)
111    ///
112    /// Per push frame: one HGET on `exec_core` when `filter.namespace`
113    /// is set, and/or one HGET on `ff:exec:{p}:<eid>:tags` when
114    /// `filter.instance_tag` is set (2 HGETs total when both are
115    /// set). The backend short-circuits on the cheaper namespace
116    /// check first.
117    ///
118    /// # Gotcha: completions are only published for executions that
119    /// belong to a flow
120    ///
121    /// The Lua terminal emits `PUBLISH ff:dag:completions <payload>`
122    /// only when `core.flow_id` is set on the execution (see
123    /// `crates/ff-script/src/flowfabric.lua` — the PUBLISH is gated
124    /// on `is_set(core.flow_id)`). Solo / standalone executions
125    /// submitted without a flow never hit the channel, so a
126    /// completion subscriber will observe **nothing** for them —
127    /// the terminal state lands in `exec_core` as usual and the
128    /// `dependency_reconciler` interval scan picks it up, but the
129    /// push stream stays silent. Smoke tests that submit a single
130    /// execution and wait on a completion subscription will hang
131    /// indefinitely; either submit under a flow or poll
132    /// `describe_execution` instead.
133    async fn subscribe_completions_filtered(
134        &self,
135        filter: &ScannerFilter,
136    ) -> Result<CompletionStream, EngineError> {
137        if filter.is_noop() {
138            self.subscribe_completions().await
139        } else {
140            Err(EngineError::Unavailable {
141                op: "subscribe_completions_filtered (filter non-trivial; backend did not override)",
142            })
143        }
144    }
145}
146
147/// Object-safety assertion: `dyn CompletionBackend` compiles iff
148/// every method is dyn-compatible. Compile-time guard so a future
149/// trait change that accidentally breaks dyn-safety fails the build
150/// at this site rather than at every downstream
151/// `Arc<dyn CompletionBackend>` use. Mirrors the sibling assertion
152/// in `engine_backend.rs`.
153#[allow(dead_code)]
154fn _assert_dyn_compatible(_: &dyn CompletionBackend) {}