ff_core/completion_backend.rs
1//! The [`CompletionBackend`] trait — backend-agnostic completion
2//! subscription surface (issue #90).
3//!
4//! ff-engine's DAG promotion listener used to `SUBSCRIBE
5//! ff:dag:completions` directly on a dedicated RESP3 `ferriskey::Client`
6//! — a Valkey-specific wire detail baked into an otherwise
7//! backend-agnostic crate. RFC-012 trait-ifies the write surface via
8//! [`EngineBackend`]; this trait closes the symmetric gap on the
9//! read-side notification path. A Postgres backend would implement
10//! this over `LISTEN/NOTIFY`; the Valkey backend keeps the pubsub
11//! wiring but hides the channel string behind the trait.
12//!
13//! # Object safety
14//!
//! `CompletionBackend` is object-safe: every method is an
//! `async fn` behind `#[async_trait]` and takes `&self`. Consumers
17//! can hold `Arc<dyn CompletionBackend>` alongside
18//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
19//! A compile-time assertion ([`_assert_dyn_compatible`]) guards
20//! future method additions against accidental dyn-incompatibility.
21//!
22//! # Fanout policy
23//!
24//! Each `subscribe_completions()` call returns an independent
25//! [`CompletionStream`]. Backends are free to implement fanout
26//! however they like — the Valkey backend today opens one
27//! `SUBSCRIBE` per call on a dedicated RESP3 connection. Callers
28//! that want to share a single subscription across many consumers
29//! should wrap the stream in a broadcast themselves; the trait does
30//! not assume shared-subscription semantics.
31//!
32//! # Reconnect + transient errors
33//!
34//! The stream yields [`CompletionPayload`] (not `Result<_, _>`)
35//! indefinitely until the consumer drops it. Backends handle
36//! reconnect, resubscribe, and transient-error logging internally;
37//! completions produced during a reconnect window may be missed
38//! and are picked up by ff-engine's `dependency_reconciler` safety
39//! net. Issue #90 does NOT promise at-least-once delivery through
40//! the stream — that contract stays with the reconciler scanner.
41//!
42//! # Scope: no `publish_completion`
43//!
44//! The publisher side of the completion channel is backend-internal.
45//! On Valkey, `ff_complete_execution` / `ff_fail_execution` /
46//! `ff_cancel_execution` PUBLISH from inside Lua (FCALL-atomic); on
47//! Postgres, the stored procedure would NOTIFY. Rust never
48//! publishes. There is intentionally no `publish_completion` method
49//! on this trait — if you find yourself wanting one, you are on the
50//! wrong side of the atomicity boundary.
51
52use std::pin::Pin;
53
54use async_trait::async_trait;
55use futures_core::Stream;
56
57use crate::backend::{CompletionPayload, ScannerFilter};
58use crate::engine_error::EngineError;
59
60/// A backend-agnostic stream of completion events.
61///
62/// Boxed because the concrete stream type is backend-specific
63/// (tokio `mpsc::Receiver` adapter on Valkey; `LISTEN` notification
64/// adapter on Postgres). `Unpin` keeps `.next().await` ergonomic in
65/// loop bodies without manual pinning. `Send` so the stream is
66/// usable from any tokio task.
67pub type CompletionStream = Pin<Box<dyn Stream<Item = CompletionPayload> + Send + Unpin>>;
68
69/// Backend surface for subscribing to completion events.
70///
71/// The channel string (Valkey: `ff:dag:completions`) is a backend
72/// implementation detail and deliberately does NOT appear on the
73/// trait. Callers route through `subscribe_completions()` and
74/// consume [`CompletionPayload`]s; they never see the wire channel.
75#[async_trait]
76pub trait CompletionBackend: Send + Sync + 'static {
77 /// Subscribe to the completion event stream.
78 ///
79 /// Each call opens its own subscription (per-call bounded mpsc
80 /// fanout on Valkey). The returned stream is independent of all
81 /// other outstanding streams; dropping it releases the
82 /// backend-side subscription.
83 ///
84 /// Returns `EngineError` only for synchronous setup failures
85 /// (e.g. connection pool exhausted at subscribe-time). Transient
86 /// errors after the stream is returned are handled silently by
87 /// the backend's reconnect loop — callers do not see them.
88 async fn subscribe_completions(&self) -> Result<CompletionStream, EngineError>;
89
90 /// Subscribe to the completion event stream with a per-event
91 /// [`ScannerFilter`] applied at the backend boundary (issue #122).
92 ///
93 /// Identical contract to [`Self::subscribe_completions`] except
94 /// that events whose execution does not match `filter` are
95 /// dropped by the backend before reaching the stream.
96 ///
97 /// # Default implementation
98 ///
99 /// The default body delegates to `subscribe_completions` when
100 /// `filter.is_noop()` (the predicate would accept every event —
101 /// no cost to the per-push filter path). When the filter is
102 /// non-trivial the default returns
103 /// [`EngineError::Unavailable`] — a default *can't* implement
104 /// the filter correctly without backend-specific HGET routing,
105 /// and a silently-unfiltered stream would break tenant isolation.
106 /// Backends that implement the filter (today: Valkey) override
107 /// this method. External backends that need isolation MUST
108 /// override.
109 ///
110 /// # Cost (Valkey backend)
111 ///
112 /// Per push frame: one HGET on `exec_core` when `filter.namespace`
113 /// is set, and/or one HGET on `ff:exec:{p}:<eid>:tags` when
114 /// `filter.instance_tag` is set (2 HGETs total when both are
115 /// set). The backend short-circuits on the cheaper namespace
116 /// check first.
117 ///
118 /// # Gotcha: completions are only published for executions that
119 /// belong to a flow
120 ///
121 /// The Lua terminal emits `PUBLISH ff:dag:completions <payload>`
122 /// only when `core.flow_id` is set on the execution (see
123 /// `crates/ff-script/src/flowfabric.lua` — the PUBLISH is gated
124 /// on `is_set(core.flow_id)`). Solo / standalone executions
125 /// submitted without a flow never hit the channel, so a
126 /// completion subscriber will observe **nothing** for them —
127 /// the terminal state lands in `exec_core` as usual and the
128 /// `dependency_reconciler` interval scan picks it up, but the
129 /// push stream stays silent. Smoke tests that submit a single
130 /// execution and wait on a completion subscription will hang
131 /// indefinitely; either submit under a flow or poll
132 /// `describe_execution` instead.
133 async fn subscribe_completions_filtered(
134 &self,
135 filter: &ScannerFilter,
136 ) -> Result<CompletionStream, EngineError> {
137 if filter.is_noop() {
138 self.subscribe_completions().await
139 } else {
140 Err(EngineError::Unavailable {
141 op: "subscribe_completions_filtered (filter non-trivial; backend did not override)",
142 })
143 }
144 }
145}
146
147/// Object-safety assertion: `dyn CompletionBackend` compiles iff
148/// every method is dyn-compatible. Compile-time guard so a future
149/// trait change that accidentally breaks dyn-safety fails the build
150/// at this site rather than at every downstream
151/// `Arc<dyn CompletionBackend>` use. Mirrors the sibling assertion
152/// in `engine_backend.rs`.
153#[allow(dead_code)]
154fn _assert_dyn_compatible(_: &dyn CompletionBackend) {}