
// graphrefly_operators/producer.rs

//! Producer-shape operator substrate (Slice D-ops, Commit 2).
//!
//! Producer ops (zip / concat / race / take_until) are nodes with no
//! declared deps that fire their fn ONCE on first activation. The fn
//! body subscribes to upstream sources via [`ProducerCtx::subscribe_to`]
//! and registers per-op state (queues, phase flags, winner index). When
//! upstream emits, the operator's sink closures re-enter Core via
//! `Core::emit` / `Core::complete` / `Core::error` on the producer node.
//!
//! On last-subscriber unsubscribe, Core invokes
//! [`BindingBoundary::producer_deactivate(node_id)`](graphrefly_core::BindingBoundary::producer_deactivate);
//! the binding's impl drops the per-node entry from its
//! `producer_states` map, which cascades:
//!
//! ```text
//! producer_states.remove(node_id)  →
//!   Vec<Subscription> drops          →
//!     each Subscription::Drop fires  →
//!       upstream sinks unsubscribe.
//! ```
//!
//! # Reference-cycle discipline (Slice Y, 2026-05-08)
//!
//! Build closures registered via
//! [`ProducerBinding::register_producer_build`] are stored long-term in
//! the binding's `producer_builds` registry. To avoid the strong-Arc
//! cycle `BenchBinding → registry → producer_builds[fn_id] → closure →
//! strong-Arc<dyn ProducerBinding> → BenchBinding`, factory bodies
//! (`zip` / `concat` / `race` / `take_until` in `ops_impl.rs` plus
//! `switch_map` / `exhaust_map` / `merge_map` / `concat_map` in
//! `higher_order.rs`) capture `WeakCore` and
//! `Weak<dyn ProducerBinding>` (and `Weak<dyn HigherOrderBinding>`
//! for the higher-order factories). The build closure upgrades both
//! on each invocation; if the host `Core` has already been dropped, the
//! upgrade returns `None` and the build closure is a clean no-op.
//!
//! Sinks spawned by the build closure capture STRONG refs cloned from
//! the upgraded weaks. Their lifetime is tied to the producer's active
//! subscription: `producer_deactivate` on last-subscriber unsubscribe
//! clears `producer_storage[node_id]`, dropping the upstream
//! `Subscription`s; that drops the sinks, which in turn drop their
//! strong captures. So the strong-ref window is bounded by
//! producer-active state, not by the long-lived `producer_builds`
//! registry.

use std::any::Any;
use std::sync::Arc;

use ahash::AHashMap as HashMap;
use parking_lot::Mutex;

use graphrefly_core::{BindingBoundary, Core, FnId, NodeId, Sink, Subscription};

/// Outcome of [`ProducerCtx::subscribe_to`]: the producer-layer
/// translation of [`graphrefly_core::SubscribeError`] into a positive
/// outcome enum that operators (zip / concat / race / take_until /
/// merge_map / switch_map / exhaust_map / concat_map) can match on for
/// per-operator dead-source semantics.
///
/// Introduced in /qa F2 (2026-05-10) to close the silent-wedge class of
/// bugs where operators previously couldn't tell that a `subscribe_to`
/// call had been rejected per R2.2.7.b (non-resubscribable terminal
/// source). Pre-F2, the rejection was logged and silently skipped,
/// which left zip waiting for a queue that would never fill, concat
/// stuck on a source that would never advance, etc.
///
/// Mirrors the per-domain status-string-union pattern used in TS
/// (`RefineStatus`, `AgentStatus`, process status: `"running" |
/// "completed" | "errored" | "cancelled"`): each operator-layer
/// outcome lives in its own typed enum rather than sharing a global
/// `Outcome<T, E>` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeOutcome {
    /// Subscription installed successfully. The
    /// [`ProducerNodeState`] holds the [`Subscription`]; no further
    /// operator action required.
    Live,
    /// Subscription was deferred to wave-end via the
    /// [`graphrefly_core::DeferredProducerOp::Callback`] queue (Phase
    /// H+ STRICT, D115). The deferred callback installs the
    /// subscription after wave_guards release. Operators MAY treat
    /// this as `Live` for lifecycle bookkeeping: the subscription is
    /// installed at wave-end, just not yet. (Exception: if the source
    /// becomes non-resubscribable + terminal during the defer window,
    /// the deferred callback silently drops the subscription; see
    /// [`ProducerCtx::subscribe_to`].)
    Deferred,
    /// The target node is non-resubscribable AND has terminated
    /// (R2.2.7.b, D118). The sink will NOT be installed. Operators
    /// MUST handle this per their semantics:
    ///
    /// - **zip / take_until (source)**: self-Complete (zip's tuple
    ///   stream can never form; take_until's source is gone).
    /// - **concat**: advance to the next source (treat as inner
    ///   Complete signal).
    /// - **race**: mark `completed[idx] = true`; if all sources are
    ///   Dead/Complete, self-Complete.
    /// - **take_until (notifier)**: ignore (notifier signal will
    ///   never fire; take_until reduces to a passthrough of source).
    /// - **switch_map / exhaust_map / concat_map / merge_map (inner)**:
    ///   treat as immediate `on_inner_complete`: decrement active,
    ///   advance to next, check self-Complete trigger.
    Dead {
        /// The dead node that rejected the subscribe.
        node: NodeId,
    },
}

/// Build closure type — the producer's fn body, called once on first
/// activation. The closure receives a [`ProducerCtx`] for setting up
/// upstream subscriptions; emissions on the producer come from sink
/// callbacks the closure registers.
pub type ProducerBuildFn = Box<dyn Fn(ProducerCtx<'_>) + Send + Sync>;

/// Per-producer-node state owned by the [`ProducerBinding`] impl.
///
/// Holds upstream `Subscription`s (auto-dropped on producer
/// deactivation) plus an optional `Box<dyn Any>` slot for op-specific
/// state shared across the build closure and its sink closures.
/// (Most ops capture state via `Arc<Mutex<...>>` directly in closure
/// captures; the `op_state` slot is reserved for ops that prefer
/// trait-object storage.)
#[derive(Default)]
pub struct ProducerNodeState {
    /// Subscriptions to upstream sources, installed by
    /// [`ProducerCtx::subscribe_to`]. Dropped on producer deactivation.
    pub subs: Vec<Subscription>,
    /// Optional op-specific scratch (rarely used; most ops capture
    /// state via closure).
    pub op_state: Option<Box<dyn Any + Send + Sync>>,
}

/// Storage shared between the [`ProducerBinding`] impl and the
/// [`ProducerCtx`] passed to build closures, keyed by the producer's
/// `NodeId`.
///
/// Wrapped in `Arc<Mutex<_>>` so the binding's `producer_deactivate`
/// hook can clear an entry while build/sink closures hold their own
/// per-op state via separate `Arc` captures.
pub type ProducerStorage = Arc<Mutex<HashMap<NodeId, ProducerNodeState>>>;

/// Closure-registration interface for producer-shape operators —
/// extends [`BindingBoundary`] with the two methods that bindings
/// shipping producers must implement.
///
/// Bindings that don't ship producers (e.g., minimal test bindings)
/// don't need to implement this trait. The producer-shape operator
/// factories (`zip`, `concat`, `race`, `take_until` in `ops_impl.rs`,
/// plus the higher-order factories in `higher_order.rs`) require it.
pub trait ProducerBinding: BindingBoundary {
    /// Register a producer build closure. The returned [`FnId`] is
    /// passed to [`Core::register_producer`]; on first activation,
    /// Core invokes [`BindingBoundary::invoke_fn`], which the binding
    /// dispatches to the registered build closure.
    fn register_producer_build(&self, build: ProducerBuildFn) -> FnId;

    /// Access the binding's producer-state storage. Used by
    /// [`ProducerCtx::subscribe_to`] to push subscriptions into the
    /// per-node entry, and by the binding's `producer_deactivate`
    /// impl to drop the entry on last unsubscribe.
    fn producer_storage(&self) -> &ProducerStorage;
}

/// Context handed to a producer's build closure on activation.
///
/// Provides:
/// - [`Self::node_id`] / [`Self::core`] — identity + Core access for
///   sink callbacks that re-enter Core.
/// - [`Self::subscribe_to`] — subscribe to an upstream Core node;
///   the resulting `Subscription` is auto-tracked under
///   `node_id` in the binding's producer storage and dropped on
///   producer deactivation.
pub struct ProducerCtx<'a> {
    node_id: NodeId,
    core: &'a Core,
    storage: &'a ProducerStorage,
}

impl<'a> ProducerCtx<'a> {
    /// Constructs a new context. Internal: the binding's `invoke_fn`
    /// dispatch calls this to build the ctx it passes to a build
    /// closure; user code receives the constructed ctx as the build
    /// closure's argument.
    pub fn new(node_id: NodeId, core: &'a Core, storage: &'a ProducerStorage) -> Self {
        Self {
            node_id,
            core,
            storage,
        }
    }

    /// The producer node's id.
    #[must_use]
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }

    /// The Core dispatcher. Sink closures use this to re-enter Core,
    /// e.g. `core.emit(self.node_id(), h)` to emit a value.
    #[must_use]
    pub fn core(&self) -> &Core {
        self.core
    }

    /// Subscribe `sink` to upstream `source`. The `Subscription` is
    /// auto-tracked under the producer's `node_id`; on producer
    /// deactivation, the binding drops the storage entry, which drops
    /// the Subscription, which unsubscribes the sink.
    ///
    /// **Phase H+ STRICT (D115, 2026-05-10):** uses `try_subscribe`
    /// to attempt the subscription. On a partition-order violation, the
    /// subscribe is deferred to wave-end via
    /// `DeferredProducerOp::Callback`; the deferred callback runs
    /// after all partition wave_owners are released (no partitions
    /// held → safe to acquire any partition).
    ///
    /// **R2.2.7.b (D118, 2026-05-10):** if the upstream is
    /// non-resubscribable AND already terminated, `try_subscribe`
    /// returns `Err(SubscribeError::TornDown)`. /qa F2 (2026-05-10):
    /// the rejection is now surfaced to the caller via
    /// [`SubscribeOutcome::Dead`] so the operator can apply its
    /// per-op dead-source semantics. Pre-F2, the rejection was
    /// silently swallowed, leaving operators wedged (zip waiting on a
    /// queue that would never fill, concat stuck on a source that
    /// would never advance, etc.). See [`SubscribeOutcome::Dead`] for
    /// per-operator guidance.
    pub fn subscribe_to(&self, source: NodeId, sink: Sink) -> SubscribeOutcome {
        let sink_for_defer = sink.clone();
        match self.core.try_subscribe(source, sink) {
            Ok(sub) => {
                self.storage
                    .lock()
                    .entry(self.node_id)
                    .or_default()
                    .subs
                    .push(sub);
                SubscribeOutcome::Live
            }
            Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
                let core = self.core.clone();
                let core_for_callback = core.clone();
                let storage = self.storage.clone();
                let node_id = self.node_id;
                // F2 /qa (2026-05-10): the deferred Callback uses
                // `try_subscribe` (not the panicking `subscribe`) so a
                // source that became non-resubscribable + terminal
                // between the original defer-queue push and the
                // wave-end drain doesn't crash the binding boundary.
                // On TornDown at drain time, the deferred sub is
                // silently dropped: because the subscribe was deferred,
                // the caller already received `SubscribeOutcome::Deferred`
                // and never saw the `Dead` path. Per-operator
                // dead-source semantics rely on the subscribe-time
                // outcome, so if the source goes terminal during the
                // defer window, the operator's other inputs / lifecycle
                // paths must handle it. For example, a zip with one Live
                // source and one source that went Dead during the window
                // stays live unless other state Complete-cascades the
                // dead source into the operator, and nothing on this
                // path does. The condition requires concurrent
                // termination during the wave-end drain, which is a
                // narrow window.
                core.push_deferred_producer_op(graphrefly_core::DeferredProducerOp::Callback(
                    Box::new(move || {
                        match core_for_callback.try_subscribe(source, sink_for_defer) {
                            Ok(sub) => {
                                storage.lock().entry(node_id).or_default().subs.push(sub);
                            }
                            Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
                                // Source became Dead during the defer
                                // window: silently drop (see comment
                                // above).
                            }
                            Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
                                // Should never happen: the deferred
                                // Callback drains AFTER wave_guards
                                // release, so partition acquisition
                                // can't fail. Surface as a panic if
                                // it ever does.
                                panic!(
                                    "deferred producer-op Callback: partition-order \
                                     violation at wave-end drain — substrate invariant \
                                     broken (wave_guards still held during drain)"
                                );
                            }
                        }
                    }),
                ));
                SubscribeOutcome::Deferred
            }
            Err(graphrefly_core::SubscribeError::TornDown { node }) => {
                SubscribeOutcome::Dead { node }
            }
        }
    }
}

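/// Default helper — drop the producer's storage entry on
/// deactivation. Bindings can call this from their
/// [`BindingBoundary::producer_deactivate`] impl to get the standard
/// auto-cleanup behavior.
pub fn default_producer_deactivate(storage: &ProducerStorage, node_id: NodeId) {
    // Remove the entry while holding the lock, but drop it only after
    // the guard is released: dropping the entry drops its upstream
    // `Subscription`s, and an unsubscribe cascade that re-entered
    // `producer_deactivate` on this same (non-reentrant) mutex would
    // otherwise deadlock.
    let removed = storage.lock().remove(&node_id);
    drop(removed);
}
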
// =====================================================================
// Producer-shape operators (D-ops, Slice D Commit 2)
// =====================================================================
//
// All four producer ops follow the same shape:
//
// 1. The operator factory captures weak handles (`WeakCore` plus
//    `Weak<dyn ProducerBinding>`; see "Reference-cycle discipline" in
//    the module docs), the sources, and per-op state (Arc<Mutex<...>>)
//    into a build closure.
// 2. `register_producer_build` returns a FnId.
// 3. `Core::register_producer(fn_id)` creates the producer node.
// 4. On first subscribe, Core fires invoke_fn → binding constructs a
//    ProducerCtx and dispatches to the build closure, which upgrades
//    its weak handles (no-op if the Core is gone).
// 5. Build closure subscribes to each upstream source, providing sink
//    closures that capture per-op state and the producer's NodeId.
// 6. Sink closures process upstream emissions and emit on the producer
//    node via `core.emit` / `core.complete` / `core.error`.
// 7. On last subscriber unsubscribe, Core fires producer_deactivate →
//    binding drops storage entry → Subscription Vec drops → sinks
//    unsub from upstream.
//
// The concrete operators (`zip` / `concat` / `race` / `take_until`)
// live in [`super::ops_impl`] (sibling module) and are re-exported
// from the crate root.
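The deactivation cascade described in the module docs (`producer_states.remove(node_id)` → `Vec<Subscription>` drops → each `Subscription::Drop` unsubscribes) can be sketched with toy types. This is a minimal sketch, assuming a hypothetical `Subscription` whose `Drop` removes a sink id from a shared `Upstream` list; `Upstream`, `NodeState`, `Storage`, `subscribe`, and `deactivate` are illustrative stand-ins, not `graphrefly_core` types.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical upstream node: the ids of its currently subscribed sinks.
pub type Upstream = Arc<Mutex<Vec<u32>>>;

/// Hypothetical subscription handle; dropping it unsubscribes the sink.
pub struct Subscription {
    upstream: Upstream,
    sink_id: u32,
}

impl Drop for Subscription {
    fn drop(&mut self) {
        let sink_id = self.sink_id;
        self.upstream.lock().unwrap().retain(|&id| id != sink_id);
    }
}

/// Registers `sink_id` on `upstream` and returns the owning handle.
pub fn subscribe(upstream: &Upstream, sink_id: u32) -> Subscription {
    upstream.lock().unwrap().push(sink_id);
    Subscription { upstream: Arc::clone(upstream), sink_id }
}

/// Per-producer entry, analogous to `ProducerNodeState::subs`.
#[derive(Default)]
pub struct NodeState {
    pub subs: Vec<Subscription>,
}

/// Producer storage keyed by a hypothetical `u32` node id.
pub type Storage = Arc<Mutex<HashMap<u32, NodeState>>>;

/// Removes the producer's entry, then drops it after the guard is
/// released: the `Vec<Subscription>` drops and each `Drop` unsubscribes.
pub fn deactivate(storage: &Storage, node_id: u32) {
    let removed = storage.lock().unwrap().remove(&node_id);
    drop(removed);
}
```

Removing the map entry is the only explicit step; every unsubscribe afterwards is driven by `Drop`.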
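The reference-cycle discipline can be illustrated with plain `std::sync::{Arc, Weak}`. The sketch assumes a hypothetical `Binding` that keeps build closures in a long-lived registry (standing in for `producer_builds`). `register_weak` captures a `Weak` and upgrades it on each call, mirroring what the module docs describe for `WeakCore` / `Weak<dyn ProducerBinding>`; `register_strong` shows the strong-capture cycle being avoided.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical build-closure type; returns whether the host was alive.
type BuildFn = Box<dyn Fn() -> bool + Send + Sync>;

/// Hypothetical binding that stores build closures long-term.
pub struct Binding {
    builds: Mutex<Vec<BuildFn>>,
}

impl Binding {
    pub fn new() -> Arc<Self> {
        Arc::new(Binding { builds: Mutex::new(Vec::new()) })
    }

    /// Registers a closure holding only a `Weak` back-reference.
    pub fn register_weak(self: &Arc<Self>) -> usize {
        let weak: Weak<Binding> = Arc::downgrade(self);
        // Upgrade per invocation; `None` means the host is gone and the
        // build is a clean no-op.
        self.push(Box::new(move || weak.upgrade().is_some()))
    }

    /// Anti-pattern for contrast: the closure owns a strong `Arc` to the
    /// very binding that stores it, so the pair is never freed.
    pub fn register_strong(self: &Arc<Self>) -> usize {
        let strong = Arc::clone(self);
        self.push(Box::new(move || {
            let _keep_alive = &strong;
            true
        }))
    }

    fn push(&self, build: BuildFn) -> usize {
        let mut builds = self.builds.lock().unwrap();
        builds.push(build);
        builds.len() - 1
    }

    pub fn invoke(&self, id: usize) -> bool {
        (self.builds.lock().unwrap()[id])()
    }
}
```

Once the last outside `Arc` is dropped, the weak-capturing binding is freed, while the strong-capturing one stays reachable (leaked) because its own registry keeps it alive.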
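The per-operator guidance on `SubscribeOutcome::Dead` amounts to a small decision table. Below is a minimal sketch for zip, concat, and race. It uses a hypothetical `Outcome` mirror (with `u32` node ids) and a hypothetical `Action` enum in place of real operator state. The rule that concat self-completes when its *last* source is dead is an assumption extrapolated from "treat as inner Complete signal".

```rust
/// Hypothetical mirror of `SubscribeOutcome`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Live,
    Deferred,
    Dead { node: u32 },
}

/// Hypothetical next step for the operator.
#[derive(Debug, PartialEq, Eq)]
pub enum Action {
    Wait,
    SelfComplete,
    AdvanceTo(usize),
}

/// zip: a dead input means no tuple can ever form.
pub fn zip_on_subscribe(o: Outcome) -> Action {
    match o {
        Outcome::Live | Outcome::Deferred => Action::Wait,
        Outcome::Dead { .. } => Action::SelfComplete,
    }
}

/// concat: treat a dead source as if it had completed.
pub fn concat_on_subscribe(o: Outcome, current: usize, n_sources: usize) -> Action {
    match o {
        Outcome::Live | Outcome::Deferred => Action::Wait,
        Outcome::Dead { .. } if current + 1 < n_sources => Action::AdvanceTo(current + 1),
        Outcome::Dead { .. } => Action::SelfComplete,
    }
}

/// race: mark dead sources completed; finish once all are completed.
pub struct RaceState {
    completed: Vec<bool>,
}

impl RaceState {
    pub fn new(n: usize) -> Self {
        RaceState { completed: vec![false; n] }
    }

    pub fn on_subscribe(&mut self, idx: usize, o: Outcome) -> Action {
        if let Outcome::Dead { .. } = o {
            self.completed[idx] = true;
        }
        if self.completed.iter().all(|&c| c) {
            Action::SelfComplete
        } else {
            Action::Wait
        }
    }
}
```

`Deferred` is treated like `Live` throughout, as the `Deferred` variant docs permit for lifecycle bookkeeping.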
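`ProducerBuildFn` is a boxed closure that is higher-ranked over the `ProducerCtx<'_>` lifetime. This lets a binding construct a fresh context that borrows its own data on each dispatch. Below is a minimal sketch of that register/dispatch flow, assuming hypothetical `Ctx`, `BuildFn`, and `ToyBinding` types and a `u64` id in place of `FnId`. None of these are the crate's real API.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical context borrowing binding-owned data for one call only.
pub struct Ctx<'a> {
    pub node_id: u32,
    pub log: &'a Mutex<Vec<String>>,
}

/// Same shape as `ProducerBuildFn`: works for any ctx lifetime.
pub type BuildFn = Box<dyn Fn(Ctx<'_>) + Send + Sync>;

/// Hypothetical binding with a build registry.
#[derive(Default)]
pub struct ToyBinding {
    builds: Mutex<HashMap<u64, BuildFn>>,
    next_id: Mutex<u64>,
    pub log: Mutex<Vec<String>>,
}

impl ToyBinding {
    /// Stores the closure and hands back its id.
    pub fn register_producer_build(&self, build: BuildFn) -> u64 {
        let mut next = self.next_id.lock().unwrap();
        let id = *next;
        *next += 1;
        self.builds.lock().unwrap().insert(id, build);
        id
    }

    /// Analogue of `invoke_fn` dispatch: build a fresh ctx that borrows
    /// `self.log` and pass it to the registered closure.
    pub fn invoke_fn(&self, fn_id: u64, node_id: u32) {
        let builds = self.builds.lock().unwrap();
        if let Some(build) = builds.get(&fn_id) {
            build(Ctx { node_id, log: &self.log });
        }
    }
}
```

Because the ctx is passed by value and borrows only for the call, the stored closure never needs to hold references into the binding.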
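`ProducerNodeState` notes that most ops keep per-op state in `Arc<Mutex<...>>` closure captures rather than in `op_state`. The sketch below shows that pattern for a two-input zip. It assumes a hypothetical `Sink` alias (`Box<dyn Fn(i32)>`) and a shared `out` vector that stands in for `core.emit` on the producer node. Both sinks capture the same queue pair, and a tuple is emitted only when both queues are non-empty.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical sink type: receives one upstream value.
pub type Sink = Box<dyn Fn(i32) + Send + Sync>;

/// Hypothetical queue pair shared by both sinks.
type Queues = Arc<Mutex<(VecDeque<i32>, VecDeque<i32>)>>;

/// Builds the two input sinks of a toy zip. `out` plays the role of
/// emitting on the producer node.
pub fn zip2_sinks(out: Arc<Mutex<Vec<(i32, i32)>>>) -> (Sink, Sink) {
    let queues: Queues = Arc::new(Mutex::new((VecDeque::new(), VecDeque::new())));

    let make = |left: bool| -> Sink {
        let queues = Arc::clone(&queues);
        let out = Arc::clone(&out);
        Box::new(move |v: i32| {
            let mut q = queues.lock().unwrap();
            if left { q.0.push_back(v) } else { q.1.push_back(v) }
            // Emit tuples only while both sides have a value queued.
            while !q.0.is_empty() && !q.1.is_empty() {
                let pair = (q.0.pop_front().unwrap(), q.1.pop_front().unwrap());
                out.lock().unwrap().push(pair);
            }
        })
    };
    (make(true), make(false))
}
```

The shared state lives exactly as long as the sinks do, which in the real operator means until deactivation drops the upstream subscriptions.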
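The control flow of `ProducerCtx::subscribe_to` can be modelled with a toy core: try now, defer on a partition-order violation, and report `Dead` for a torn-down source. This is a minimal sketch. It assumes a hypothetical `ToyCore` whose `in_wave` flag stands in for held wave guards, and whose `end_wave` releases them before draining the deferred queue. `SubscribeError`, `Outcome`, and `Storage` are simplified mirrors, not the `graphrefly_core` types.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeError { PartitionOrderViolation, TornDown { node: u32 } }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome { Live, Deferred, Dead { node: u32 } }

/// Hypothetical core: `in_wave` simulates held wave guards and `dead`
/// lists torn-down, non-resubscribable nodes.
#[derive(Default)]
pub struct ToyCore {
    pub in_wave: Mutex<bool>,
    pub dead: Mutex<Vec<u32>>,
    pub deferred: Mutex<Vec<Box<dyn FnOnce() + Send>>>,
}

impl ToyCore {
    /// Returns the "subscription" (just the source id here) or an error.
    pub fn try_subscribe(&self, source: u32) -> Result<u32, SubscribeError> {
        if self.dead.lock().unwrap().contains(&source) {
            return Err(SubscribeError::TornDown { node: source });
        }
        if *self.in_wave.lock().unwrap() {
            return Err(SubscribeError::PartitionOrderViolation);
        }
        Ok(source)
    }

    /// Wave end: release the guards first, then drain deferred callbacks.
    pub fn end_wave(&self) {
        *self.in_wave.lock().unwrap() = false;
        let ops = std::mem::take(&mut *self.deferred.lock().unwrap());
        for op in ops {
            op();
        }
    }
}

/// Producer id → installed subscriptions.
pub type Storage = Arc<Mutex<HashMap<u32, Vec<u32>>>>;

pub fn subscribe_to(core: &Arc<ToyCore>, storage: &Storage, producer: u32, source: u32) -> Outcome {
    match core.try_subscribe(source) {
        Ok(sub) => {
            storage.lock().unwrap().entry(producer).or_default().push(sub);
            Outcome::Live
        }
        Err(SubscribeError::PartitionOrderViolation) => {
            let (core2, storage2) = (Arc::clone(core), Arc::clone(storage));
            core.deferred.lock().unwrap().push(Box::new(move || {
                // Retry at wave end. Any error is ignored in this toy; the
                // source file drops TornDown silently and panics on a
                // violation at drain time.
                if let Ok(sub) = core2.try_subscribe(source) {
                    storage2.lock().unwrap().entry(producer).or_default().push(sub);
                }
            }));
            Outcome::Deferred
        }
        Err(SubscribeError::TornDown { node }) => Outcome::Dead { node },
    }
}
```

The dead-source check runs before the wave check here, so a torn-down source reports `Dead` even mid-wave; the real `try_subscribe` may order these checks differently.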