graphrefly_operators/producer.rs
1//! Producer-shape operator substrate (Slice D-ops, Commit 2).
2//!
3//! Producer ops (zip / concat / race / takeUntil) are nodes with no
4//! declared deps that fire their fn ONCE on first activation. The fn
5//! body subscribes to upstream sources via [`ProducerCtx::subscribe_to`]
6//! and registers per-op state (queues, phase flags, winner index). When
7//! upstream emits, the operator's sink closures re-enter Core via
8//! `Core::emit` / `Core::complete` / `Core::error` on the producer node.
9//!
10//! On last-subscriber unsubscribe, Core invokes
11//! [`BindingBoundary::producer_deactivate(node_id)`](graphrefly_core::BindingBoundary::producer_deactivate);
12//! the binding's impl drops the per-node entry from its
13//! `producer_states` map, which cascades:
14//!
15//! ```text
16//! producer_states.remove(node_id) →
17//! Vec<Subscription> drops →
18//! each Subscription::Drop fires →
19//! upstream sinks unsubscribe.
20//! ```
21//!
22//! # Reference-cycle discipline (Slice Y, 2026-05-08)
23//!
24//! Build closures registered via
25//! [`ProducerBinding::register_producer_build`] are stored long-term in
26//! the binding's `producer_builds` registry. To avoid the strong-Arc
27//! cycle `BenchBinding → registry → producer_builds[fn_id] → closure →
28//! strong-Arc<dyn ProducerBinding> → BenchBinding`, factory bodies
29//! (`zip` / `concat` / `race` / `take_until` in `ops_impl.rs` plus
30//! `switch_map` / `exhaust_map` / `merge_map` / `concat_map` in
31//! `higher_order.rs`) capture `WeakCore` and
32//! `Weak<dyn ProducerBinding>` (and `Weak<dyn HigherOrderBinding>`
33//! for the higher-order factories). The build closure upgrades both
34//! on each invocation; if the host `Core` was already dropped, upgrade
35//! returns `None` and the build closure no-ops cleanly.
36//!
37//! Sinks spawned by the build closure capture STRONG refs cloned from
38//! the upgraded weaks. Their lifetime is tied to the producer's active
39//! subscription — `producer_deactivate` on last-subscriber unsubscribe
40//! clears `producer_storage[node_id]`, dropping the upstream
41//! `Subscription`s, which drops the sinks, which drops the strong
42//! captures. So the strong-ref window is bounded by producer-active
43//! state, not by the long-lived `producer_builds` registry.
44
45use std::any::Any;
46use std::sync::Arc;
47
48use ahash::AHashMap as HashMap;
49use parking_lot::Mutex;
50
51use graphrefly_core::{BindingBoundary, Core, FnId, NodeId, Sink, Subscription};
52
53/// Build closure type — the producer's fn body, called once on first
54/// activation. The closure receives a [`ProducerCtx`] for setting up
55/// upstream subscriptions; emissions on the producer come from sink
56/// callbacks the closure registers.
57pub type ProducerBuildFn = Box<dyn Fn(ProducerCtx<'_>) + Send + Sync>;
58
59/// Per-producer-node state owned by the [`ProducerBinding`] impl.
60///
61/// Holds upstream `Subscription`s (auto-dropped on producer
62/// deactivation) plus an optional `Box<dyn Any>` slot for op-specific
63/// state shared across the build closure and its sink closures.
64/// (Most ops capture state via `Arc<Mutex<...>>` directly in closure
65/// captures; the `op_state` slot is reserved for ops that prefer
66/// trait-object storage.)
67#[derive(Default)]
68pub struct ProducerNodeState {
69 /// Subscriptions to upstream sources, taken by
70 /// [`ProducerCtx::subscribe_to`]. Dropped on producer deactivation.
71 pub subs: Vec<Subscription>,
72 /// Optional op-specific scratch (rarely used; most ops capture
73 /// state via closure).
74 pub op_state: Option<Box<dyn Any + Send + Sync>>,
75}
76
77/// Storage shared between the [`ProducerBinding`] impl and the
78/// [`ProducerCtx`] passed to build closures. Keyed by producer NodeId.
79///
80/// Access via `Arc<Mutex<_>>` so the binding's `producer_deactivate`
81/// hook can clear an entry while build/sink closures hold their own
82/// per-op state via separate Arc captures.
83pub type ProducerStorage = Arc<Mutex<HashMap<NodeId, ProducerNodeState>>>;
84
85/// Closure-registration interface for producer-shape operators —
86/// extends [`BindingBoundary`] with one method that bindings shipping
87/// producers must implement.
88///
89/// Bindings that don't ship producers (e.g., minimal test bindings)
90/// don't need to implement this trait. The operator factories below
91/// (`zip`, `concat`, `race`, `take_until`) require it.
92pub trait ProducerBinding: BindingBoundary {
93 /// Register a producer build closure. The returned [`FnId`] is
94 /// passed to [`Core::register_producer`]; on first activation,
95 /// Core invokes [`BindingBoundary::invoke_fn`] which the binding
96 /// dispatches to the registered build closure.
97 fn register_producer_build(&self, build: ProducerBuildFn) -> FnId;
98
99 /// Access the binding's producer-state storage. Used by
100 /// [`ProducerCtx::subscribe_to`] to push subscriptions into the
101 /// per-node entry, and by the binding's `producer_deactivate`
102 /// impl to drop the entry on last unsubscribe.
103 fn producer_storage(&self) -> &ProducerStorage;
104}
105
106/// Context handed to a producer's build closure on activation.
107///
108/// Provides:
109/// - [`Self::node_id`] / [`Self::core`] — identity + Core access for
110/// sink callbacks that re-enter Core.
111/// - [`Self::subscribe_to`] — subscribe to an upstream Core node;
112/// the resulting `Subscription` is auto-tracked under
113/// `node_id` in the binding's producer storage and dropped on
114/// producer deactivation.
115pub struct ProducerCtx<'a> {
116 node_id: NodeId,
117 core: &'a Core,
118 storage: &'a ProducerStorage,
119}
120
121impl<'a> ProducerCtx<'a> {
122 /// Construct a new context for the binding's `invoke_fn` dispatch
123 /// to call build closures. Internal — bindings call this; user
124 /// code receives the constructed ctx via the build closure's arg.
125 pub fn new(node_id: NodeId, core: &'a Core, storage: &'a ProducerStorage) -> Self {
126 Self {
127 node_id,
128 core,
129 storage,
130 }
131 }
132
133 /// The producer node's id.
134 #[must_use]
135 pub fn node_id(&self) -> NodeId {
136 self.node_id
137 }
138
139 /// The Core dispatcher. Sink closures use this to re-enter Core —
140 /// `core.emit(self.node_id(), h)` to emit a value, etc.
141 #[must_use]
142 pub fn core(&self) -> &Core {
143 self.core
144 }
145
146 /// Subscribe `sink` to upstream `source`. The `Subscription` is
147 /// auto-tracked under the producer's `node_id`; on producer
148 /// deactivation, the binding drops the storage entry, which drops
149 /// the Subscription, which unsubscribes the sink.
150 ///
151 /// **Phase H+ option (d) /qa N1(a) (2026-05-09):** the `sink` is
152 /// wrapped in `ProducerSinkGuard` so for the duration of every
153 /// sink invocation, the per-thread `IN_PRODUCER_BUILD` refcount
154 /// is bumped, suppressing the H+ ascending-order check. This
155 /// preserves the existing producer-pattern operator architecture
156 /// against the widened H+ gate. The producer-pattern operators
157 /// (zip, concat, race, take_until, switch_map, exhaust_map,
158 /// concat_map, merge_map) all do cross-partition `Core::subscribe`
159 /// plus `Core::emit` from inside their inner-source sink
160 /// callbacks; the wrapping carve-out lets those continue to work
161 /// while non-producer sink-callback re-entry IS checked by the
162 /// widened gate. Refactoring the operators to defer their
163 /// sink-time inner subscribes and re-emits to wave-end is the
164 /// broader Phase H+ STRICT variant scope (option `b`
165 /// defer-to-post-flush, estimated 1500+ LOC) per
166 /// `docs/porting-deferred.md`.
167 pub fn subscribe_to(&self, source: NodeId, sink: Sink) {
168 let wrapped: Sink = std::sync::Arc::new(move |msgs| {
169 // RAII guard so producer_build_exit() runs even if the
170 // sink panics (Drop in Rust runs during unwind).
171 struct ProducerSinkGuard;
172 impl Drop for ProducerSinkGuard {
173 fn drop(&mut self) {
174 graphrefly_core::producer_build_exit();
175 }
176 }
177 graphrefly_core::producer_build_enter();
178 let _g = ProducerSinkGuard;
179 sink(msgs);
180 });
181 let sub = self.core.subscribe(source, wrapped);
182 let mut states = self.storage.lock();
183 states.entry(self.node_id).or_default().subs.push(sub);
184 }
185}
186
187/// Default helper — drop the producer's storage entry on
188/// deactivation. Bindings can call this from their
189/// [`BindingBoundary::producer_deactivate`] impl to get the standard
190/// auto-cleanup behavior.
191pub fn default_producer_deactivate(storage: &ProducerStorage, node_id: NodeId) {
192 let mut states = storage.lock();
193 states.remove(&node_id);
194}
195
196// =====================================================================
197// Producer-shape operators (D-ops, Slice D Commit 2)
198// =====================================================================
199//
200// All four producer ops follow the same shape:
201//
202// 1. Operator factory captures `Core::clone()` + sources + per-op state
203// (Arc<Mutex<...>>) into a build closure.
204// 2. `register_producer_build` returns a FnId.
205// 3. `Core::register_producer(fn_id)` creates the producer node.
206// 4. On first subscribe, Core fires invoke_fn → binding dispatches to
207// the build closure → ProducerCtx is constructed.
208// 5. Build closure subscribes to each upstream source, providing sink
209// closures that capture per-op state and the producer's NodeId.
210// 6. Sink closures process upstream emissions and emit on the producer
211// node via `core.emit` / `core.complete` / `core.error`.
212// 7. On last subscriber unsubscribe, Core fires producer_deactivate →
213// binding drops storage entry → Subscription Vec drops → sinks
214// unsub from upstream.
215//
216// The concrete operators (`zip` / `concat` / `race` / `take_until`)
217// live in [`super::ops_impl`] (sibling module) and are re-exported
218// from the crate root.