graphrefly_operators/producer.rs
1//! Producer-shape operator substrate (Slice D-ops, Commit 2).
2//!
3//! Producer ops (zip / concat / race / takeUntil) are nodes with no
4//! declared deps that fire their fn ONCE on first activation. The fn
5//! body subscribes to upstream sources via [`ProducerCtx::subscribe_to`]
6//! and registers per-op state (queues, phase flags, winner index). When
7//! upstream emits, the operator's sink closures re-enter Core via
8//! `Core::emit` / `Core::complete` / `Core::error` on the producer node.
9//!
10//! On last-subscriber unsubscribe, Core invokes
11//! [`BindingBoundary::producer_deactivate(node_id)`](graphrefly_core::BindingBoundary::producer_deactivate);
//! the binding's impl drops the per-node entry from its
//! [`ProducerStorage`] map, which cascades:
//!
//! ```text
//! producer_storage.remove(node_id) →
17//! Vec<Subscription> drops →
18//! each Subscription::Drop fires →
19//! upstream sinks unsubscribe.
20//! ```
21//!
22//! # Reference-cycle discipline (Slice Y, 2026-05-08)
23//!
24//! Build closures registered via
25//! [`ProducerBinding::register_producer_build`] are stored long-term in
26//! the binding's `producer_builds` registry. To avoid the strong-Arc
27//! cycle `BenchBinding → registry → producer_builds[fn_id] → closure →
28//! strong-Arc<dyn ProducerBinding> → BenchBinding`, factory bodies
29//! (`zip` / `concat` / `race` / `take_until` in `ops_impl.rs` plus
30//! `switch_map` / `exhaust_map` / `merge_map` / `concat_map` in
31//! `higher_order.rs`) capture `WeakCore` and
32//! `Weak<dyn ProducerBinding>` (and `Weak<dyn HigherOrderBinding>`
33//! for the higher-order factories). The build closure upgrades both
34//! on each invocation; if the host `Core` was already dropped, upgrade
35//! returns `None` and the build closure no-ops cleanly.
36//!
37//! Sinks spawned by the build closure capture STRONG refs cloned from
38//! the upgraded weaks. Their lifetime is tied to the producer's active
39//! subscription — `producer_deactivate` on last-subscriber unsubscribe
40//! clears `producer_storage[node_id]`, dropping the upstream
41//! `Subscription`s, which drops the sinks, which drops the strong
42//! captures. So the strong-ref window is bounded by producer-active
43//! state, not by the long-lived `producer_builds` registry.
44
45use std::any::Any;
46use std::sync::Arc;
47
48use ahash::AHashMap as HashMap;
49use parking_lot::Mutex;
50
51use graphrefly_core::{BindingBoundary, Core, FnId, NodeId, Sink, Subscription};
52
53/// Outcome of [`ProducerCtx::subscribe_to`] — the producer-layer
54/// translation of [`graphrefly_core::SubscribeError`] into a positive
55/// outcome enum that operators (zip / concat / race / take_until /
56/// merge_map / switch_map / exhaust_map / concat_map) can match on for
57/// per-operator dead-source semantics.
58///
59/// Introduced /qa F2 (2026-05-10) to close the silent-wedge class of
60/// bugs where operators previously couldn't tell that a `subscribe_to`
61/// call had been rejected per R2.2.7.b (non-resubscribable terminal
62/// source) — pre-F2 the rejection was logged-and-skipped silently,
63/// which left zip waiting for a queue that would never fill, concat
64/// stuck on a source that would never advance, etc.
65///
66/// Mirrors the per-domain status-string-union pattern used in TS
67/// (`RefineStatus`, `AgentStatus`, process status: `"running" |
68/// "completed" | "errored" | "cancelled"`) — each operator-layer
69/// outcome lives in its own typed enum rather than sharing a global
70/// `Outcome<T, E>` type.
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum SubscribeOutcome {
73 /// Subscription installed successfully. The
74 /// [`ProducerNodeState`] holds the [`Subscription`]; no further
75 /// operator action required.
76 Live,
77 /// Subscription was deferred to wave-end via the
78 /// [`graphrefly_core::DeferredProducerOp::Callback`] queue (Phase
79 /// H+ STRICT, D115). The deferred callback installs the
80 /// subscription after wave_guards release. Operators MAY treat
81 /// this as `Live` for lifecycle bookkeeping — the subscription
82 /// WILL be installed; just not yet.
83 Deferred,
84 /// The target node is non-resubscribable AND has terminated
85 /// (R2.2.7.b, D118). The sink will NOT be installed. Operators
86 /// MUST handle this per their semantics:
87 ///
88 /// - **zip / take_until (source)**: self-Complete (tuple stream
89 /// can never form; take_until's source is gone).
90 /// - **concat**: advance to the next source (treat as inner
91 /// Complete signal).
92 /// - **race**: mark `completed[idx] = true`; if all sources are
93 /// Dead/Complete, self-Complete.
94 /// - **take_until (notifier)**: ignore (notifier signal will
95 /// never fire; take_until reduces to a passthrough of source).
96 /// - **switch_map / exhaust_map / concat_map / merge_map (inner)**:
97 /// treat as immediate `on_inner_complete` — decrement active,
98 /// advance to next, check self-Complete trigger.
99 Dead {
100 /// The dead node that rejected the subscribe.
101 node: NodeId,
102 },
103}
104
105/// Build closure type — the producer's fn body, called once on first
106/// activation. The closure receives a [`ProducerCtx`] for setting up
107/// upstream subscriptions; emissions on the producer come from sink
108/// callbacks the closure registers.
109pub type ProducerBuildFn = Box<dyn Fn(ProducerCtx<'_>) + Send + Sync>;
110
111/// Per-producer-node state owned by the [`ProducerBinding`] impl.
112///
113/// Holds upstream `Subscription`s (auto-dropped on producer
114/// deactivation) plus an optional `Box<dyn Any>` slot for op-specific
115/// state shared across the build closure and its sink closures.
116/// (Most ops capture state via `Arc<Mutex<...>>` directly in closure
117/// captures; the `op_state` slot is reserved for ops that prefer
118/// trait-object storage.)
119#[derive(Default)]
120pub struct ProducerNodeState {
121 /// Subscriptions to upstream sources, taken by
122 /// [`ProducerCtx::subscribe_to`]. Dropped on producer deactivation.
123 pub subs: Vec<Subscription>,
124 /// Optional op-specific scratch (rarely used; most ops capture
125 /// state via closure).
126 pub op_state: Option<Box<dyn Any + Send + Sync>>,
127}
128
129/// Storage shared between the [`ProducerBinding`] impl and the
130/// [`ProducerCtx`] passed to build closures. Keyed by producer NodeId.
131///
132/// Access via `Arc<Mutex<_>>` so the binding's `producer_deactivate`
133/// hook can clear an entry while build/sink closures hold their own
134/// per-op state via separate Arc captures.
135pub type ProducerStorage = Arc<Mutex<HashMap<NodeId, ProducerNodeState>>>;
136
137/// Closure-registration interface for producer-shape operators —
138/// extends [`BindingBoundary`] with one method that bindings shipping
139/// producers must implement.
140///
141/// Bindings that don't ship producers (e.g., minimal test bindings)
142/// don't need to implement this trait. The operator factories below
143/// (`zip`, `concat`, `race`, `take_until`) require it.
144pub trait ProducerBinding: BindingBoundary {
145 /// Register a producer build closure. The returned [`FnId`] is
146 /// passed to [`Core::register_producer`]; on first activation,
147 /// Core invokes [`BindingBoundary::invoke_fn`] which the binding
148 /// dispatches to the registered build closure.
149 fn register_producer_build(&self, build: ProducerBuildFn) -> FnId;
150
151 /// Access the binding's producer-state storage. Used by
152 /// [`ProducerCtx::subscribe_to`] to push subscriptions into the
153 /// per-node entry, and by the binding's `producer_deactivate`
154 /// impl to drop the entry on last unsubscribe.
155 fn producer_storage(&self) -> &ProducerStorage;
156}
157
158/// Context handed to a producer's build closure on activation.
159///
160/// Provides:
161/// - [`Self::node_id`] / [`Self::core`] — identity + Core access for
162/// sink callbacks that re-enter Core.
163/// - [`Self::subscribe_to`] — subscribe to an upstream Core node;
164/// the resulting `Subscription` is auto-tracked under
165/// `node_id` in the binding's producer storage and dropped on
166/// producer deactivation.
167pub struct ProducerCtx<'a> {
168 node_id: NodeId,
169 core: &'a Core,
170 storage: &'a ProducerStorage,
171}
172
173impl<'a> ProducerCtx<'a> {
174 /// Construct a new context for the binding's `invoke_fn` dispatch
175 /// to call build closures. Internal — bindings call this; user
176 /// code receives the constructed ctx via the build closure's arg.
177 pub fn new(node_id: NodeId, core: &'a Core, storage: &'a ProducerStorage) -> Self {
178 Self {
179 node_id,
180 core,
181 storage,
182 }
183 }
184
185 /// The producer node's id.
186 #[must_use]
187 pub fn node_id(&self) -> NodeId {
188 self.node_id
189 }
190
191 /// The Core dispatcher. Sink closures use this to re-enter Core —
192 /// `core.emit(self.node_id(), h)` to emit a value, etc.
193 #[must_use]
194 pub fn core(&self) -> &Core {
195 self.core
196 }
197
198 /// Subscribe `sink` to upstream `source`. The `Subscription` is
199 /// auto-tracked under the producer's `node_id`; on producer
200 /// deactivation, the binding drops the storage entry, which drops
201 /// the Subscription, which unsubscribes the sink.
202 ///
203 /// **Phase H+ STRICT (D115, 2026-05-10):** uses `try_subscribe`
204 /// to attempt the subscription. On partition order violation, the
205 /// subscribe is deferred to wave-end via
206 /// `DeferredProducerOp::Callback` — the deferred callback runs
207 /// after all partition wave_owners are released (no partitions
208 /// held → safe to acquire any partition).
209 ///
210 /// **R2.2.7.b (D118, 2026-05-10):** if the upstream is
211 /// non-resubscribable AND already terminated, `try_subscribe`
212 /// returns `Err(SubscribeError::TornDown)`. /qa F2 (2026-05-10):
213 /// the rejection is now surfaced to the caller via
214 /// [`SubscribeOutcome::Dead`] so the operator can apply its
215 /// per-op dead-source semantics — pre-F2 the rejection was
216 /// silently swallowed, leaving operators wedged (zip waiting on a
217 /// queue that would never fill, concat stuck on a source that
218 /// would never advance, etc.). See [`SubscribeOutcome::Dead`] for
219 /// per-operator guidance.
220 pub fn subscribe_to(&self, source: NodeId, sink: Sink) -> SubscribeOutcome {
221 let sink_for_defer = sink.clone();
222 match self.core.try_subscribe(source, sink) {
223 Ok(sub) => {
224 self.storage
225 .lock()
226 .entry(self.node_id)
227 .or_default()
228 .subs
229 .push(sub);
230 SubscribeOutcome::Live
231 }
232 Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
233 let core = self.core.clone();
234 let core_for_callback = core.clone();
235 let storage = self.storage.clone();
236 let node_id = self.node_id;
237 // F2 /qa (2026-05-10): deferred Callback uses
238 // `try_subscribe` (not the panicking `subscribe`) so a
239 // source that became non-resubscribable + terminal
240 // between the original defer-queue push and the
241 // wave-end drain doesn't crash the binding boundary.
242 // On TornDown at drain time, the producer-layer's
243 // SubscribeOutcome::Dead path was never reached at
244 // subscribe-time (because we deferred); we silently
245 // drop the deferred sub here. Per-operator dead-source
246 // semantics rely on the subscribe-time outcome — if
247 // the source went terminal during the defer window,
248 // the operator's other inputs / lifecycle paths must
249 // handle it (e.g., zip with one Live + one
250 // raced-to-Dead source would still emit until the
251 // dead one Complete-cascades to the operator via
252 // other state, which it doesn't here — but the
253 // condition requires concurrent termination during
254 // wave-end drain which is a narrow window).
255 core.push_deferred_producer_op(graphrefly_core::DeferredProducerOp::Callback(
256 Box::new(move || {
257 match core_for_callback.try_subscribe(source, sink_for_defer) {
258 Ok(sub) => {
259 storage.lock().entry(node_id).or_default().subs.push(sub);
260 }
261 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
262 // Source became Dead during the defer
263 // window — silently drop (see comment
264 // above).
265 }
266 Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
267 // Should never happen: the deferred
268 // Callback drains AFTER wave_guards
269 // release, so partition acquisition
270 // can't fail. Surface as a panic if
271 // it ever does.
272 panic!(
273 "deferred producer-op Callback: partition-order \
274 violation at wave-end drain — substrate invariant \
275 broken (wave_guards still held during drain)"
276 );
277 }
278 }
279 }),
280 ));
281 SubscribeOutcome::Deferred
282 }
283 Err(graphrefly_core::SubscribeError::TornDown { node }) => {
284 SubscribeOutcome::Dead { node }
285 }
286 }
287 }
288}
289
290/// Default helper — drop the producer's storage entry on
291/// deactivation. Bindings can call this from their
292/// [`BindingBoundary::producer_deactivate`] impl to get the standard
293/// auto-cleanup behavior.
294pub fn default_producer_deactivate(storage: &ProducerStorage, node_id: NodeId) {
295 let mut states = storage.lock();
296 states.remove(&node_id);
297}
298
299// =====================================================================
300// Producer-shape operators (D-ops, Slice D Commit 2)
301// =====================================================================
302//
303// All four producer ops follow the same shape:
304//
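// 1. Operator factory captures weak handles (`WeakCore` +
//    `Weak<dyn ProducerBinding>`) + sources + per-op state
//    (Arc<Mutex<...>>) into a build closure. The build closure upgrades
//    the weak handles on each invocation (see "Reference-cycle
//    discipline" in the module docs).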
307// 2. `register_producer_build` returns a FnId.
308// 3. `Core::register_producer(fn_id)` creates the producer node.
309// 4. On first subscribe, Core fires invoke_fn → binding dispatches to
310// the build closure → ProducerCtx is constructed.
311// 5. Build closure subscribes to each upstream source, providing sink
312// closures that capture per-op state and the producer's NodeId.
313// 6. Sink closures process upstream emissions and emit on the producer
314// node via `core.emit` / `core.complete` / `core.error`.
315// 7. On last subscriber unsubscribe, Core fires producer_deactivate →
316// binding drops storage entry → Subscription Vec drops → sinks
317// unsub from upstream.
318//
319// The concrete operators (`zip` / `concat` / `race` / `take_until`)
320// live in [`super::ops_impl`] (sibling module) and are re-exported
321// from the crate root.