// ff-core 0.5.0 — FlowFabric core types, partition math, key builders,
// error codes.
//! The [`CompletionBackend`] trait — backend-agnostic completion
//! subscription surface (issue #90).
//!
//! ff-engine's DAG promotion listener used to `SUBSCRIBE
//! ff:dag:completions` directly on a dedicated RESP3 `ferriskey::Client`
//! — a Valkey-specific wire detail baked into an otherwise
//! backend-agnostic crate. RFC-012 trait-ifies the write surface via
//! [`EngineBackend`]; this trait closes the symmetric gap on the
//! read-side notification path. A Postgres backend would implement
//! this over `LISTEN/NOTIFY`; the Valkey backend keeps the pubsub
//! wiring but hides the channel string behind the trait.
//!
//! # Object safety
//!
//! `CompletionBackend` is object-safe: both methods are `async fn`s
//! behind `#[async_trait]` and take `&self`. Consumers
//! can hold `Arc<dyn CompletionBackend>` alongside
//! `Arc<dyn EngineBackend>` for heterogeneous-backend deployments.
//! A compile-time assertion ([`_assert_dyn_compatible`]) guards
//! future method additions against accidental dyn-incompatibility.
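//!
//! A minimal std-only sketch of the guard pattern (stand-in `Backend`
//! trait for illustration, not the real `CompletionBackend`):
//!
//! ```
//! use std::sync::Arc;
//!
//! trait Backend: Send + Sync + 'static {
//!     fn name(&self) -> &'static str;
//! }
//!
//! struct Valkey;
//! impl Backend for Valkey {
//!     fn name(&self) -> &'static str { "valkey" }
//! }
//!
//! // Compile-time guard: this fn compiles only while `Backend`
//! // remains dyn-compatible, so an incompatible method addition
//! // fails the build here.
//! fn _assert_dyn_compatible(_: &dyn Backend) {}
//!
//! let b: Arc<dyn Backend> = Arc::new(Valkey);
//! assert_eq!(b.name(), "valkey");
//! ```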
//!
//! # Fanout policy
//!
//! Each `subscribe_completions()` call returns an independent
//! [`CompletionStream`]. Backends are free to implement fanout
//! however they like — the Valkey backend today opens one
//! `SUBSCRIBE` per call on a dedicated RESP3 connection. Callers
//! that want to share a single subscription across many consumers
//! should wrap the stream in a broadcast themselves; the trait does
//! not assume shared-subscription semantics.
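//!
//! A std-only sketch of caller-side fanout (hypothetical `Fanout`
//! helper; the real Valkey path uses tokio channels, but the shape
//! is the same: one upstream subscription, N downstream receivers):
//!
//! ```
//! use std::sync::mpsc;
//!
//! // The caller re-sends each payload to every registered receiver,
//! // so one backend subscription serves many consumers.
//! struct Fanout<T: Clone> {
//!     outs: Vec<mpsc::Sender<T>>,
//! }
//!
//! impl<T: Clone> Fanout<T> {
//!     fn new() -> Self {
//!         Fanout { outs: Vec::new() }
//!     }
//!     fn subscribe(&mut self) -> mpsc::Receiver<T> {
//!         let (tx, rx) = mpsc::channel();
//!         self.outs.push(tx);
//!         rx
//!     }
//!     fn publish(&self, payload: T) {
//!         for tx in &self.outs {
//!             let _ = tx.send(payload.clone());
//!         }
//!     }
//! }
//!
//! let mut fanout = Fanout::new();
//! let a = fanout.subscribe();
//! let b = fanout.subscribe();
//! fanout.publish("exec-1:succeeded".to_string());
//! assert_eq!(a.recv().unwrap(), "exec-1:succeeded");
//! assert_eq!(b.recv().unwrap(), "exec-1:succeeded");
//! ```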
//!
//! # Reconnect + transient errors
//!
//! The stream yields [`CompletionPayload`] (not `Result<_, _>`)
//! indefinitely until the consumer drops it. Backends handle
//! reconnect, resubscribe, and transient-error logging internally;
//! completions produced during a reconnect window may be missed
//! and are picked up by ff-engine's `dependency_reconciler` safety
//! net. Issue #90 does NOT promise at-least-once delivery through
//! the stream — that contract stays with the reconciler scanner.
//!
//! # Scope: no `publish_completion`
//!
//! The publisher side of the completion channel is backend-internal.
//! On Valkey, `ff_complete_execution` / `ff_fail_execution` /
//! `ff_cancel_execution` PUBLISH from inside Lua (FCALL-atomic); on
//! Postgres, the stored procedure would NOTIFY. Rust never
//! publishes. There is intentionally no `publish_completion` method
//! on this trait — if you find yourself wanting one, you are on the
//! wrong side of the atomicity boundary.

use std::pin::Pin;

use async_trait::async_trait;
use futures_core::Stream;

use crate::backend::{CompletionPayload, ScannerFilter};
use crate::engine_error::EngineError;

/// A backend-agnostic stream of completion events.
///
/// Boxed because the concrete stream type is backend-specific
/// (tokio `mpsc::Receiver` adapter on Valkey; `LISTEN` notification
/// adapter on Postgres). `Unpin` keeps `.next().await` ergonomic in
/// loop bodies without manual pinning. `Send` so the stream is
/// usable from any tokio task.
pub type CompletionStream = Pin<Box<dyn Stream<Item = CompletionPayload> + Send + Unpin>>;

/// Backend surface for subscribing to completion events.
///
/// The channel string (Valkey: `ff:dag:completions`) is a backend
/// implementation detail and deliberately does NOT appear on the
/// trait. Callers route through `subscribe_completions()` and
/// consume [`CompletionPayload`]s; they never see the wire channel.
#[async_trait]
pub trait CompletionBackend: Send + Sync + 'static {
    /// Subscribe to the completion event stream.
    ///
    /// Each call opens its own subscription (per-call bounded mpsc
    /// fanout on Valkey). The returned stream is independent of all
    /// other outstanding streams; dropping it releases the
    /// backend-side subscription.
    ///
    /// Returns `EngineError` only for synchronous setup failures
    /// (e.g. connection pool exhausted at subscribe-time). Transient
    /// errors after the stream is returned are handled silently by
    /// the backend's reconnect loop — callers do not see them.
    async fn subscribe_completions(&self) -> Result<CompletionStream, EngineError>;

    /// Subscribe to the completion event stream with a per-event
    /// [`ScannerFilter`] applied at the backend boundary (issue #122).
    ///
    /// Identical contract to [`Self::subscribe_completions`] except
    /// that events whose execution does not match `filter` are
    /// dropped by the backend before reaching the stream.
    ///
    /// # Default implementation
    ///
    /// The default body delegates to `subscribe_completions` when
    /// `filter.is_noop()` (the predicate would accept every event —
    /// no cost to the per-push filter path). When the filter is
    /// non-trivial the default returns
    /// [`EngineError::Unavailable`] — a default *can't* implement
    /// the filter correctly without backend-specific HGET routing,
    /// and a silently-unfiltered stream would break tenant isolation.
    /// Backends that implement the filter (today: Valkey) override
    /// this method. External backends that need isolation MUST
    /// override.
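    ///
    /// A trimmed, std-only sketch of that dispatch shape (illustrative
    /// names, not the real ff-core types):
    ///
    /// ```
    /// struct Filter { namespace: Option<String> }
    /// impl Filter {
    ///     fn is_noop(&self) -> bool { self.namespace.is_none() }
    /// }
    ///
    /// trait Sub {
    ///     fn subscribe(&self) -> Result<&'static str, &'static str> {
    ///         Ok("unfiltered")
    ///     }
    ///     // Default: delegate only when the filter accepts everything;
    ///     // otherwise refuse rather than silently hand back an
    ///     // unfiltered stream.
    ///     fn subscribe_filtered(&self, f: &Filter) -> Result<&'static str, &'static str> {
    ///         if f.is_noop() { self.subscribe() } else { Err("unavailable") }
    ///     }
    /// }
    ///
    /// struct GenericBackend;
    /// impl Sub for GenericBackend {}
    ///
    /// let g = GenericBackend;
    /// assert_eq!(g.subscribe_filtered(&Filter { namespace: None }), Ok("unfiltered"));
    /// assert!(g.subscribe_filtered(&Filter { namespace: Some("tenant-a".into()) }).is_err());
    /// ```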
    ///
    /// # Cost (Valkey backend)
    ///
    /// Per push frame: one HGET on `exec_core` when `filter.namespace`
    /// is set, and/or one HGET on `ff:exec:{p}:<eid>:tags` when
    /// `filter.instance_tag` is set (2 HGETs total when both are
    /// set). The backend short-circuits on the cheaper namespace
    /// check first.
    ///
    /// # Gotcha: completions are only published for flow executions
    ///
    /// The Lua terminal emits `PUBLISH ff:dag:completions <payload>`
    /// only when `core.flow_id` is set on the execution (see
    /// `crates/ff-script/src/flowfabric.lua` — the PUBLISH is gated
    /// on `is_set(core.flow_id)`). Solo / standalone executions
    /// submitted without a flow never hit the channel, so a
    /// completion subscriber will observe **nothing** for them —
    /// the terminal state lands in `exec_core` as usual and the
    /// `dependency_reconciler` interval scan picks it up, but the
    /// push stream stays silent. Smoke tests that submit a single
    /// execution and wait on a completion subscription will hang
    /// indefinitely; either submit under a flow or poll
    /// `describe_execution` instead.
    async fn subscribe_completions_filtered(
        &self,
        filter: &ScannerFilter,
    ) -> Result<CompletionStream, EngineError> {
        if filter.is_noop() {
            self.subscribe_completions().await
        } else {
            Err(EngineError::Unavailable {
                op: "subscribe_completions_filtered (filter non-trivial; backend did not override)",
            })
        }
    }
}

/// Object-safety assertion: `dyn CompletionBackend` compiles iff
/// every method is dyn-compatible. Compile-time guard so a future
/// trait change that accidentally breaks dyn-safety fails the build
/// at this site rather than at every downstream
/// `Arc<dyn CompletionBackend>` use. Mirrors the sibling assertion
/// in `engine_backend.rs`.
#[allow(dead_code)]
fn _assert_dyn_compatible(_: &dyn CompletionBackend) {}