Skip to main content

ff_core/
completion_backend.rs

//! The [`CompletionBackend`] trait — backend-agnostic completion
//! subscription surface (issue #90).
//!
//! ff-engine's DAG promotion listener used to `SUBSCRIBE
//! ff:dag:completions` directly on a dedicated RESP3 `ferriskey::Client`
//! — a Valkey-specific wire detail baked into an otherwise
//! backend-agnostic crate. RFC-012 trait-ifies the write surface via
//! `EngineBackend`; this trait closes the symmetric gap on the
//! read-side notification path. A Postgres backend would implement
//! this over `LISTEN/NOTIFY`; the Valkey backend keeps the pubsub
//! wiring but hides the channel string behind the trait.
//!
//! # Object safety
//!
//! `CompletionBackend` is object-safe: both of its methods are
//! `async fn` behind `#[async_trait]` and take `&self`. Consumers
//! can hold `Arc<dyn CompletionBackend>` alongside
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
//! A compile-time assertion (`_assert_dyn_compatible`) guards
//! future method additions against accidental dyn-incompatibility.
//!
//! # Fanout policy
//!
//! Each `subscribe_completions()` (or `subscribe_completions_filtered()`)
//! call returns an independent [`CompletionStream`]. Backends are free
//! to implement fanout however they like — the Valkey backend today
//! opens one `SUBSCRIBE` per call on a dedicated RESP3 connection.
//! Callers that want to share a single subscription across many
//! consumers should wrap the stream in a broadcast themselves; the
//! trait does not assume shared-subscription semantics.
//!
//! # Reconnect + transient errors
//!
//! The stream yields [`CompletionPayload`] (not `Result<_, _>`)
//! indefinitely until the consumer drops it. Backends handle
//! reconnect, resubscribe, and transient-error logging internally;
//! completions produced during a reconnect window may be missed
//! and are picked up by ff-engine's `dependency_reconciler` safety
//! net. Issue #90 does NOT promise at-least-once delivery through
//! the stream — that contract stays with the reconciler scanner.
//!
//! # Scope: no `publish_completion`
//!
//! The publisher side of the completion channel is backend-internal.
//! On Valkey, `ff_complete_execution` / `ff_fail_execution` /
//! `ff_cancel_execution` PUBLISH from inside Lua (FCALL-atomic); on
//! Postgres, the stored procedure would NOTIFY. Rust never
//! publishes. There is intentionally no `publish_completion` method
//! on this trait — if you find yourself wanting one, you are on the
//! wrong side of the atomicity boundary.
51
52use std::pin::Pin;
53
54use async_trait::async_trait;
55use futures_core::Stream;
56
57use crate::backend::{CompletionPayload, ScannerFilter};
58use crate::engine_error::EngineError;
59
60/// A backend-agnostic stream of completion events.
61///
62/// Boxed because the concrete stream type is backend-specific
63/// (tokio `mpsc::Receiver` adapter on Valkey; `LISTEN` notification
64/// adapter on Postgres). `Unpin` keeps `.next().await` ergonomic in
65/// loop bodies without manual pinning. `Send` so the stream is
66/// usable from any tokio task.
67pub type CompletionStream = Pin<Box<dyn Stream<Item = CompletionPayload> + Send + Unpin>>;
68
69/// Backend surface for subscribing to completion events.
70///
71/// The channel string (Valkey: `ff:dag:completions`) is a backend
72/// implementation detail and deliberately does NOT appear on the
73/// trait. Callers route through `subscribe_completions()` and
74/// consume [`CompletionPayload`]s; they never see the wire channel.
75#[async_trait]
76pub trait CompletionBackend: Send + Sync + 'static {
77    /// Subscribe to the completion event stream.
78    ///
79    /// Each call opens its own subscription (per-call bounded mpsc
80    /// fanout on Valkey). The returned stream is independent of all
81    /// other outstanding streams; dropping it releases the
82    /// backend-side subscription.
83    ///
84    /// Returns `EngineError` only for synchronous setup failures
85    /// (e.g. connection pool exhausted at subscribe-time). Transient
86    /// errors after the stream is returned are handled silently by
87    /// the backend's reconnect loop — callers do not see them.
88    async fn subscribe_completions(&self) -> Result<CompletionStream, EngineError>;
89
90    /// Subscribe to the completion event stream with a per-event
91    /// [`ScannerFilter`] applied at the backend boundary (issue #122).
92    ///
93    /// Identical contract to [`Self::subscribe_completions`] except
94    /// that events whose execution does not match `filter` are
95    /// dropped by the backend before reaching the stream.
96    ///
97    /// # Default implementation
98    ///
99    /// The default body delegates to `subscribe_completions` when
100    /// `filter.is_noop()` (the predicate would accept every event —
101    /// no cost to the per-push filter path). When the filter is
102    /// non-trivial the default returns
103    /// [`EngineError::Unavailable`] — a default *can't* implement
104    /// the filter correctly without backend-specific HGET routing,
105    /// and a silently-unfiltered stream would break tenant isolation.
106    /// Backends that implement the filter (today: Valkey) override
107    /// this method. External backends that need isolation MUST
108    /// override.
109    ///
110    /// # Cost (Valkey backend)
111    ///
112    /// Per push frame: one HGET on `exec_core` when `filter.namespace`
113    /// is set, and/or one HGET on `ff:exec:{p}:<eid>:tags` when
114    /// `filter.instance_tag` is set (2 HGETs total when both are
115    /// set). The backend short-circuits on the cheaper namespace
116    /// check first.
117    async fn subscribe_completions_filtered(
118        &self,
119        filter: &ScannerFilter,
120    ) -> Result<CompletionStream, EngineError> {
121        if filter.is_noop() {
122            self.subscribe_completions().await
123        } else {
124            Err(EngineError::Unavailable {
125                op: "subscribe_completions_filtered (filter non-trivial; backend did not override)",
126            })
127        }
128    }
129}
130
131/// Object-safety assertion: `dyn CompletionBackend` compiles iff
132/// every method is dyn-compatible. Compile-time guard so a future
133/// trait change that accidentally breaks dyn-safety fails the build
134/// at this site rather than at every downstream
135/// `Arc<dyn CompletionBackend>` use. Mirrors the sibling assertion
136/// in `engine_backend.rs`.
137#[allow(dead_code)]
138fn _assert_dyn_compatible(_: &dyn CompletionBackend) {}