Skip to main content

graphrefly_operators/
producer.rs

1//! Producer-shape operator substrate (Slice D-ops, Commit 2).
2//!
3//! Producer ops (zip / concat / race / takeUntil) are nodes with no
4//! declared deps that fire their fn ONCE on first activation. The fn
5//! body subscribes to upstream sources via [`ProducerCtx::subscribe_to`]
6//! and registers per-op state (queues, phase flags, winner index). When
7//! upstream emits, the operator's sink closures re-enter Core via
8//! `Core::emit` / `Core::complete` / `Core::error` on the producer node.
9//!
//! On last-subscriber unsubscribe, Core invokes
//! [`BindingBoundary::producer_deactivate(node_id)`](graphrefly_core::BindingBoundary::producer_deactivate);
//! the binding's impl removes the per-node [`ProducerNodeState`] entry
//! from its producer storage and explicitly unsubscribes every recorded
//! upstream subscription (see [`default_producer_deactivate`]):
//!
//! ```text
//! producer_storage.remove(node_id)          →
//!   recorded (source, sub_id) pairs taken   →
//!     unsub(source, sub_id) for each pair   →
//!       upstream sinks unsubscribe.
//! ```
21//!
22//! # Reference-cycle discipline (Slice Y, 2026-05-08)
23//!
24//! Build closures registered via
25//! [`ProducerBinding::register_producer_build`] are stored long-term in
26//! the binding's `producer_builds` registry. To avoid the strong-Arc
27//! cycle `BenchBinding → registry → producer_builds[fn_id] → closure →
28//! strong-Arc<dyn ProducerBinding> → BenchBinding`, factory bodies
29//! (`zip` / `concat` / `race` / `take_until` in `ops_impl.rs` plus
30//! `switch_map` / `exhaust_map` / `merge_map` / `concat_map` in
31//! `higher_order.rs`) capture `WeakCore` and
32//! `Weak<dyn ProducerBinding>` (and `Weak<dyn HigherOrderBinding>`
33//! for the higher-order factories). The build closure upgrades both
34//! on each invocation; if the host `Core` was already dropped, upgrade
35//! returns `None` and the build closure no-ops cleanly.
36//!
37//! Sinks spawned by the build closure capture STRONG refs cloned from
38//! the upgraded weaks. Their lifetime is tied to the producer's active
39//! subscription — `producer_deactivate` on last-subscriber unsubscribe
40//! clears `producer_storage[node_id]`, dropping the upstream
41//! `Subscription`s, which drops the sinks, which drops the strong
42//! captures. So the strong-ref window is bounded by producer-active
43//! state, not by the long-lived `producer_builds` registry.
44
45use std::any::Any;
46use std::sync::Arc;
47
48use ahash::AHashMap as HashMap;
49use parking_lot::Mutex;
50
51use graphrefly_core::{
52    BindingBoundary, Core, CoreFull, FnId, HandleId, NodeId, Sink, SubscriptionId,
53};
54
55/// Outcome of [`ProducerCtx::subscribe_to`] — the producer-layer
56/// translation of [`graphrefly_core::SubscribeError`] into a positive
57/// outcome enum that operators (zip / concat / race / take_until /
58/// merge_map / switch_map / exhaust_map / concat_map) can match on for
59/// per-operator dead-source semantics.
60///
61/// Introduced /qa F2 (2026-05-10) to close the silent-wedge class of
62/// bugs where operators previously couldn't tell that a `subscribe_to`
63/// call had been rejected per R2.2.7.b (non-resubscribable terminal
64/// source) — pre-F2 the rejection was logged-and-skipped silently,
65/// which left zip waiting for a queue that would never fill, concat
66/// stuck on a source that would never advance, etc.
67///
68/// Mirrors the per-domain status-string-union pattern used in TS
69/// (`RefineStatus`, `AgentStatus`, process status: `"running" |
70/// "completed" | "errored" | "cancelled"`) — each operator-layer
71/// outcome lives in its own typed enum rather than sharing a global
72/// `Outcome<T, E>` type.
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
74pub enum SubscribeOutcome {
75    /// Subscription installed successfully. The
76    /// [`ProducerNodeState`] holds the [`Subscription`]; no further
77    /// operator action required.
78    Live,
79    /// Subscription was deferred to wave-end via the
80    /// [`graphrefly_core::DeferredProducerOp::Callback`] queue (Phase
81    /// H+ STRICT, D115). The deferred callback installs the
82    /// subscription after wave_guards release. Operators MAY treat
83    /// this as `Live` for lifecycle bookkeeping — the subscription
84    /// WILL be installed; just not yet.
85    Deferred,
86    /// The target node is non-resubscribable AND has terminated
87    /// (R2.2.7.b, D118). The sink will NOT be installed. Operators
88    /// MUST handle this per their semantics:
89    ///
90    /// - **zip / take_until (source)**: self-Complete (tuple stream
91    ///   can never form; take_until's source is gone).
92    /// - **concat**: advance to the next source (treat as inner
93    ///   Complete signal).
94    /// - **race**: mark `completed[idx] = true`; if all sources are
95    ///   Dead/Complete, self-Complete.
96    /// - **take_until (notifier)**: ignore (notifier signal will
97    ///   never fire; take_until reduces to a passthrough of source).
98    /// - **switch_map / exhaust_map / concat_map / merge_map (inner)**:
99    ///   treat as immediate `on_inner_complete` — decrement active,
100    ///   advance to next, check self-Complete trigger.
101    Dead {
102        /// The dead node that rejected the subscribe.
103        node: NodeId,
104    },
105}
106
107/// Build closure type — the producer's fn body, called once on first
108/// activation. The closure receives a [`ProducerCtx`] for setting up
109/// upstream subscriptions; emissions on the producer come from sink
110/// callbacks the closure registers.
111pub type ProducerBuildFn = Box<dyn Fn(ProducerCtx<'_>) + Send + Sync>;
112
113/// Per-producer-node state owned by the [`ProducerBinding`] impl.
114///
115/// Holds upstream `Subscription`s (auto-dropped on producer
116/// deactivation) plus an optional `Box<dyn Any>` slot for op-specific
117/// state shared across the build closure and its sink closures.
118/// (Most ops capture state via `Arc<Mutex<...>>` directly in closure
119/// captures; the `op_state` slot is reserved for ops that prefer
120/// trait-object storage.)
121#[derive(Default)]
122pub struct ProducerNodeState {
123    /// Recorded upstream `(source_node, sub_id)` pairs taken by
124    /// [`ProducerCtx::subscribe_to`]. S2b/D229: core-level RAII
125    /// `Subscription` is retired — these are explicitly unsubscribed by
126    /// the binding's [`BindingBoundary::producer_deactivate`] impl via
127    /// the owner-supplied `unsub` closure (see
128    /// [`default_producer_deactivate`]), behaviour-identical to the old
129    /// `Vec<Subscription>`-drop cascade.
130    pub subs: Vec<(NodeId, SubscriptionId)>,
131    /// Optional op-specific scratch (rarely used; most ops capture
132    /// state via closure).
133    pub op_state: Option<Box<dyn Any + Send + Sync>>,
134}
135
136/// Storage shared between the [`ProducerBinding`] impl and the
137/// [`ProducerCtx`] passed to build closures. Keyed by producer NodeId.
138///
139/// Access via `Arc<Mutex<_>>` so the binding's `producer_deactivate`
140/// hook can clear an entry while build/sink closures hold their own
141/// per-op state via separate Arc captures.
142pub type ProducerStorage = Arc<Mutex<HashMap<NodeId, ProducerNodeState>>>;
143
144/// Closure-registration interface for producer-shape operators —
145/// extends [`BindingBoundary`] with one method that bindings shipping
146/// producers must implement.
147///
148/// Bindings that don't ship producers (e.g., minimal test bindings)
149/// don't need to implement this trait. The operator factories below
150/// (`zip`, `concat`, `race`, `take_until`) require it.
151pub trait ProducerBinding: BindingBoundary {
152    /// Register a producer build closure. The returned [`FnId`] is
153    /// passed to [`Core::register_producer`]; on first activation,
154    /// Core invokes [`BindingBoundary::invoke_fn`] which the binding
155    /// dispatches to the registered build closure.
156    fn register_producer_build(&self, build: ProducerBuildFn) -> FnId;
157
158    /// Access the binding's producer-state storage. Used by
159    /// [`ProducerCtx::subscribe_to`] to push subscriptions into the
160    /// per-node entry, and by the binding's `producer_deactivate`
161    /// impl to drop the entry on last unsubscribe.
162    fn producer_storage(&self) -> &ProducerStorage;
163}
164
165/// Sink-side emit handle (S2b / D231 / D232-AMEND/A′).
166///
167/// Producer build closures spawn long-lived `Sink`s that fire on every
168/// future upstream emit — long after the build closure's `&Core`
169/// (`ctx`) is gone. Under the actor model the `Core` is owned by value
170/// and relocates between workers, so sinks can no longer capture a
171/// cloned `Core` / `WeakCore`. Instead they capture a `ProducerEmitter`
172/// (cheap `Clone`: two `Arc`s) and post `MailboxOp`s to the
173/// `Core`-owned [`graphrefly_core::CoreMailbox`]; the `BatchGuard`
174/// drain-to-quiescence loop applies them **in-wave** via the sync
175/// `Core::{emit,complete,error}` (immediate, cascade-ordering-preserving
176/// — D232-AMEND).
177///
178/// Method names mirror the old `Core::{emit,complete,error}_or_defer`
179/// so sink bodies are unchanged: only the captured handle's
180/// construction differs (`em = ctx.emitter()` instead of
181/// `core_s.clone()`).
182/// The **`Send + Sync` cross-thread** producer emit handle (D249/S2c).
183///
184/// Holds only the id-only `Arc<CoreMailbox>` post side + the binding
185/// (for the `Core`-gone handle-release branch). This is what an
186/// autonomous timer task (`temporal.rs`, `tokio::spawn`-ed) captures —
187/// it stays `Send` so the spawned future is `Send`. It deliberately
188/// has **no `defer`** (that is the `!Send` owner-side path; see
189/// [`ProducerEmitter`]).
190#[derive(Clone)]
191pub struct MailboxEmitter {
192    mailbox: Arc<graphrefly_core::CoreMailbox>,
193    /// For the `Core`-gone branch only: if the owning `Core` already
194    /// dropped (mailbox closed), an `Emit`/`Error` payload handle would
195    /// leak — release it (mirrors `timer.rs`'s post-`false` path).
196    binding: Arc<dyn BindingBoundary>,
197}
198
199impl MailboxEmitter {
200    /// Post an `Emit`. If the owning `Core` is gone, release `handle`
201    /// (it held a retain for the would-be payload) — no leak.
202    pub fn emit_or_defer(&self, node_id: NodeId, handle: HandleId) {
203        if !self.mailbox.post_emit(node_id, handle) {
204            self.binding.release_handle(handle);
205        }
206    }
207
208    /// Post a `Complete`. No payload handle; `Core`-gone is a no-op.
209    pub fn complete_or_defer(&self, node_id: NodeId) {
210        let _ = self.mailbox.post_complete(node_id);
211    }
212
213    /// Post an `Error`. If the owning `Core` is gone, release the error
214    /// payload `handle` — no leak.
215    pub fn error_or_defer(&self, node_id: NodeId, handle: HandleId) {
216        if !self.mailbox.post_error(node_id, handle) {
217            self.binding.release_handle(handle);
218        }
219    }
220
221    /// Post a **`Send`** cross-thread owner-side closure (D233/D249).
222    /// For an autonomous timer task (`temporal.rs` `window_time`) doing
223    /// task-side topology mutation that must run owner-side in FIFO
224    /// order — the closure captures only `Send` state, so it rides the
225    /// `Send + Sync` `CoreMailbox`. Returns `false` iff the `Core` is
226    /// gone (closure dropped unrun; release any captured handles).
227    #[must_use = "a `false` return means the Core is gone and the closure was dropped unrun; release any handles it captured"]
228    pub fn defer(&self, f: impl FnOnce(&dyn graphrefly_core::CoreFull) + Send + 'static) -> bool {
229        self.mailbox.post_defer(Box::new(f))
230    }
231
232    /// Whether the owning `Core` has dropped (mailbox closed) — for
233    /// prompt timer-task shutdown (see [`ProducerEmitter::is_core_gone`]).
234    #[must_use]
235    pub fn is_core_gone(&self) -> bool {
236        self.mailbox.is_closed()
237    }
238}
239
240/// The owner-side producer handle (D249/S2c). `MailboxEmitter` (the
241/// `Send` cross-thread emit side) **plus** the owner-only `!Send`
242/// `Rc<DeferQueue>` for [`Self::defer`]. Captured into owner-side
243/// `!Send` producer sinks (control/higher-order dynamic-inner); the
244/// `Rc` makes it `!Send`, consistent with the D248 single-owner `Sink`
245/// relaxation. A timer task that needs only the cross-thread emit side
246/// takes [`Self::emitter`] (a `Send` [`MailboxEmitter`]) instead.
247#[derive(Clone)]
248pub struct ProducerEmitter {
249    emitter: MailboxEmitter,
250    /// Owner-side `!Send` `Defer` queue split off `CoreMailbox`
251    /// (D249/S2c).
252    deferred: std::rc::Rc<graphrefly_core::DeferQueue>,
253}
254
255impl ProducerEmitter {
256    /// Construct directly from any `&Core` (S2b). Used by the
257    /// **binding-layer RAII** convenience (D228-A): a test harness /
258    /// napi `BenchCore` that co-owns the `Core` builds a [`SubGuard`]
259    /// over `core.subscribe(...)`'s returned `SubscriptionId` so drop
260    /// schedules the unsubscribe — the sanctioned replacement for the
261    /// retired core-level RAII `Subscription`.
262    #[must_use]
263    pub fn for_core(core: &Core) -> Self {
264        Self {
265            emitter: MailboxEmitter {
266                mailbox: core.mailbox(),
267                binding: core.binding(),
268            },
269            deferred: core.defer_queue(),
270        }
271    }
272
273    /// Construct from the object-safe [`CoreFull`] facade (D246 r5 /
274    /// D245). Used by [`ProducerCtx::emitter`] now that the ctx holds
275    /// `&dyn CoreFull` rather than a concrete `&Core`.
276    #[must_use]
277    pub fn from_corefull(core: &dyn CoreFull) -> Self {
278        Self {
279            emitter: MailboxEmitter {
280                mailbox: core.mailbox(),
281                binding: core.binding(),
282            },
283            deferred: core.defer_queue(),
284        }
285    }
286
287    /// The `Send` cross-thread emit sub-handle — for autonomous timer
288    /// tasks (`temporal.rs`, `tokio::spawn`) that only emit/complete/
289    /// error and must keep their spawned future `Send` (D249/S2c).
290    #[must_use]
291    pub fn emitter(&self) -> MailboxEmitter {
292        self.emitter.clone()
293    }
294
295    /// Post an `Emit`. If the owning `Core` is gone, release `handle`
296    /// (it held a retain for the would-be payload) — no leak.
297    pub fn emit_or_defer(&self, node_id: NodeId, handle: HandleId) {
298        self.emitter.emit_or_defer(node_id, handle);
299    }
300
301    /// Post a `Complete`. No payload handle; `Core`-gone is a no-op.
302    pub fn complete_or_defer(&self, node_id: NodeId) {
303        self.emitter.complete_or_defer(node_id);
304    }
305
306    /// Post an `Error`. If the owning `Core` is gone, release the error
307    /// payload `handle` — no leak.
308    pub fn error_or_defer(&self, node_id: NodeId, handle: HandleId) {
309        self.emitter.error_or_defer(node_id, handle);
310    }
311
312    /// Post an owner-side closure (D233) given the full object-safe
313    /// `Core` surface — for sinks that must perform value-returning
314    /// topology mutation (windowing `create_window_node`, higher-order
315    /// dynamic-inner `subscribe`). Runs **in-wave** (the drain loop
316    /// holds `&Core`); the closure consumes any returned
317    /// `NodeId`/`SubscriptionId` to drive its captured op-state.
318    ///
319    /// Returns `false` iff the owning `Core` is already gone — the
320    /// closure is dropped **unrun** (running `CoreFull` on a half-dropped
321    /// `Core` is unsound; user-locked QA decision A). QA F2 (2026-05-18):
322    /// this now surfaces the `Core`-gone signal (was a silent
323    /// `let _ = …`) so a caller whose closure captured retained
324    /// `HandleId`s can release them on `false` — mirroring the
325    /// `emit_or_defer` / `error_or_defer` release-on-`false` contract.
326    /// The not-yet-written windowing / higher-order callers MUST honour
327    /// this (release captured payload handles when it returns `false`).
328    #[must_use = "a `false` return means the Core is gone and the closure was dropped unrun; release any handles it captured"]
329    pub fn defer(&self, f: impl FnOnce(&dyn graphrefly_core::CoreFull) + 'static) -> bool {
330        self.deferred.post(Box::new(f))
331    }
332
333    /// Whether the owning `Core` has dropped (mailbox closed). Lets a
334    /// long-lived task stop promptly + release any handle it holds
335    /// (preserves the old `WeakCore::upgrade() == None` promptness).
336    /// NOT required for leak-safety (`*_or_defer` already releases on a
337    /// closed post) — only for prompt task shutdown.
338    #[must_use]
339    pub fn is_core_gone(&self) -> bool {
340        self.emitter.is_core_gone()
341    }
342}
343
344/// Binding-layer RAII subscription handle (S2b / D225 / D234). The
345/// core-level RAII `Subscription` was retired (a parameterless `Drop`
346/// can't reach a relocating owned `Core`); this wrapper IS the
347/// sanctioned binding-layer replacement for *substrate operators* that
348/// manage an inner subscription's lifetime by ownership (higher-order
349/// `switch/exhaust/merge/concat_map` inner subs). It holds a
350/// `ProducerEmitter` (an `Arc<CoreMailbox>` — `Send + Sync`, `'static`,
351/// NOT the `Core`), so its `Drop` legitimately posts a deferred
352/// `unsubscribe` via `em.defer` (owner-side, in-wave, FIFO-ordered —
353/// D234). FIFO ordering gives the correct cancel-then-resubscribe
354/// semantics: a `SubGuard` dropped before a new subscribe is posted is
355/// drained (unsub) before the new subscribe. A `Core`-gone post is
356/// dropped unrun (subscription moot at teardown — no leak).
357#[must_use = "dropping a SubGuard schedules the inner unsubscribe"]
358pub struct SubGuard {
359    node: NodeId,
360    sub: SubscriptionId,
361    em: ProducerEmitter,
362}
363
364impl SubGuard {
365    /// Track `sub` (returned by `CoreFull::try_subscribe` on `node`) so
366    /// dropping this guard unsubscribes it.
367    pub fn new(node: NodeId, sub: SubscriptionId, em: ProducerEmitter) -> Self {
368        Self { node, sub, em }
369    }
370}
371
372impl Drop for SubGuard {
373    fn drop(&mut self) {
374        let (n, s) = (self.node, self.sub);
375        // D234: post the unsubscribe owner-side, in-wave, FIFO-ordered
376        // (so a cancel-before-resubscribe drains in order). Dropped
377        // unrun if the Core is already gone — the sub is moot then.
378        let _ = self.em.defer(move |c| c.unsubscribe(n, s));
379    }
380}
381
382/// Context handed to a producer's build closure on activation.
383///
384/// Provides:
385/// - [`Self::node_id`] / [`Self::core`] — identity + Core access for
386///   sink callbacks that re-enter Core.
387/// - [`Self::subscribe_to`] — subscribe to an upstream Core node;
388///   the resulting `Subscription` is auto-tracked under
389///   `node_id` in the binding's producer storage and dropped on
390///   producer deactivation.
391pub struct ProducerCtx<'a> {
392    node_id: NodeId,
393    core: &'a dyn CoreFull,
394    storage: &'a ProducerStorage,
395}
396
397impl<'a> ProducerCtx<'a> {
398    /// Construct a new context for the binding's `invoke_fn` dispatch
399    /// to call build closures. Internal — bindings call this; user
400    /// code receives the constructed ctx via the build closure's arg.
401    ///
402    /// D246 r5 / D245: takes `&dyn CoreFull` — the one object-safe Core
403    /// facade Core hands the binding via
404    /// [`graphrefly_core::BindingBoundary::invoke_fn_with_core`]. A
405    /// concrete `&Core` unsized-coerces to `&dyn CoreFull` at the call
406    /// site, so existing `&Core`-holding call sites pass it directly
407    /// (`ProducerCtx::new(node, &core, &storage)`). `ProducerCtx` only
408    /// needs `subscribe`/`try_subscribe`/`register_*`/`emit`/`mailbox`/
409    /// `binding`/`*_or_defer` — all on `CoreFull` — so no concrete
410    /// `Core` / thread-local / stored back-reference is required.
411    pub fn new(node_id: NodeId, core: &'a dyn CoreFull, storage: &'a ProducerStorage) -> Self {
412        Self {
413            node_id,
414            core,
415            storage,
416        }
417    }
418
419    /// The producer node's id.
420    #[must_use]
421    pub fn node_id(&self) -> NodeId {
422        self.node_id
423    }
424
425    /// The Core dispatcher, as the object-safe [`CoreFull`] facade
426    /// (D246 r5 / D245). **Build-closure-side only** — valid only for
427    /// the duration of the build call (the `Core` relocates; D231).
428    /// Long-lived sinks must use [`Self::emitter`] instead. Carries
429    /// everything a build closure uses (`subscribe`/`try_subscribe`/
430    /// `register_*`/`emit`/`binding`/`*_or_defer`) without naming the
431    /// concrete cell type.
432    #[must_use]
433    pub fn core(&self) -> &dyn CoreFull {
434        self.core
435    }
436
437    /// Sink-side emit handle (D232-AMEND/A′). Cheap-`Clone`; capture it
438    /// into spawned sink closures and call
439    /// `emit_or_defer`/`complete_or_defer`/`error_or_defer` exactly as
440    /// the old cloned-`Core` did — ops post to the `Core`-owned mailbox
441    /// and are applied in-wave by the drain-to-quiescence loop.
442    #[must_use]
443    pub fn emitter(&self) -> ProducerEmitter {
444        ProducerEmitter::from_corefull(self.core)
445    }
446
447    /// The binding's per-producer state storage (S2b). Replaces
448    /// `binding.producer_storage()` for build closures / spawned sinks
449    /// that track their own upstream subscriptions or per-op state:
450    /// under D231 the build closure no longer holds a
451    /// `Arc<dyn ProducerBinding>` (only `ctx`'s borrowed `&Core` +
452    /// `&ProducerStorage`), and a sink can't reach `ProducerBinding`
453    /// either. The returned `ProducerStorage` is
454    /// `Arc<Mutex<…>>` — `'static` + cheap-`Clone`, so it can be
455    /// captured into long-lived sink closures (exactly how the old code
456    /// captured `binding.producer_storage().clone()`).
457    #[must_use]
458    pub fn storage(&self) -> ProducerStorage {
459        self.storage.clone()
460    }
461
462    /// Subscribe `sink` to upstream `source`. The `Subscription` is
463    /// auto-tracked under the producer's `node_id`; on producer
464    /// deactivation, the binding drops the storage entry, which drops
465    /// the Subscription, which unsubscribes the sink.
466    ///
467    /// **Phase H+ STRICT (D115, 2026-05-10):** uses `try_subscribe`
468    /// to attempt the subscription. On partition order violation, the
469    /// subscribe is deferred to wave-end via
470    /// `DeferredProducerOp::Callback`. (S2c/D248 single-owner: the
471    /// per-partition `wave_owner` `ReentrantMutex`es are deleted —
472    /// there is no cross-thread interleaving wave to serialize — so
473    /// the deferred callback simply runs owner-side at wave-end with
474    /// no lock acquisition.)
475    ///
476    /// **R2.2.7.b (D118, 2026-05-10):** if the upstream is
477    /// non-resubscribable AND already terminated, `try_subscribe`
478    /// returns `Err(SubscribeError::TornDown)`. /qa F2 (2026-05-10):
479    /// the rejection is now surfaced to the caller via
480    /// [`SubscribeOutcome::Dead`] so the operator can apply its
481    /// per-op dead-source semantics — pre-F2 the rejection was
482    /// silently swallowed, leaving operators wedged (zip waiting on a
483    /// queue that would never fill, concat stuck on a source that
484    /// would never advance, etc.). See [`SubscribeOutcome::Dead`] for
485    /// per-operator guidance.
486    pub fn subscribe_to(&self, source: NodeId, sink: Sink) -> SubscribeOutcome {
487        let sink_for_defer = sink.clone();
488        match self.core.try_subscribe(source, sink) {
489            Ok(sub) => {
490                // S2b/D229: record `(source, sub_id)` for explicit
491                // owner-driven unsubscribe at `producer_deactivate`.
492                self.storage
493                    .lock()
494                    .entry(self.node_id)
495                    .or_default()
496                    .subs
497                    .push((source, sub));
498                SubscribeOutcome::Live
499            }
500            Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
501                // S2b (D223/D231): the old code boxed a cloned-`Core`
502                // `DeferredProducerOp::Callback` only for
503                // `push_deferred_producer_op` to run it *immediately*
504                // (the deferred queue is a deleted D211 no-op shim — see
505                // `node::push_deferred_producer_op`). `Core` is no longer
506                // `Clone`; an inline retry on `self.core` is
507                // behaviour-identical (the prior path was already
508                // immediate). F2 /qa: still `try_subscribe` (not the
509                // panicking `subscribe`) so a source that raced to
510                // non-resubscribable+terminal doesn't crash the boundary.
511                match self.core.try_subscribe(source, sink_for_defer) {
512                    Ok(sub) => {
513                        self.storage
514                            .lock()
515                            .entry(self.node_id)
516                            .or_default()
517                            .subs
518                            .push((source, sub));
519                    }
520                    Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
521                        // Source became Dead during the (now-immediate)
522                        // retry — silently drop, as before.
523                    }
524                    Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
525                        // The original deferral existed to retry with no
526                        // partition held; the D211 shim already made it
527                        // immediate, so a second order violation here is
528                        // the same substrate-invariant break the old
529                        // wave-end-drain panic guarded.
530                        panic!(
531                            "producer-op subscribe retry: partition-order violation — \
532                             substrate invariant broken (wave_guards still held)"
533                        );
534                    }
535                }
536                SubscribeOutcome::Deferred
537            }
538            Err(graphrefly_core::SubscribeError::TornDown { node }) => {
539                SubscribeOutcome::Dead { node }
540            }
541        }
542    }
543}
544
545/// Default helper — explicitly unsubscribe the producer's recorded
546/// upstream subs, then drop its storage entry, on deactivation.
547///
548/// S2b/D229: core-level RAII `Subscription` is retired, so the binding's
549/// [`BindingBoundary::producer_deactivate`] impl receives a
550/// `Core::unsubscribe`-capable `unsub` closure (the owner-driven chain
551/// passes it the `&Core` it already holds). Looping it over the recorded
552/// `(source, sub_id)` pairs is behaviour-identical to the old
553/// `Vec<Subscription>`-drop cascade (same deregister + Phase-G chain,
554/// lock-released so re-entrant producer cascades are safe).
555///
556/// Ordering (QA F3, 2026-05-18 — corrected from an earlier
557/// remove-AFTER comment that contradicted the code): the entry is
558/// **taken out under the `storage` lock FIRST**, then the `unsub`
559/// cascade runs lock-released over the moved-out `subs`. This is
560/// behaviour-identical to the retired path (old code did
561/// `states.remove(&node_id)` and the dropped `Vec<Subscription>`'s
562/// `Drop` ran the cascade — i.e. remove-then-cascade). Because the
563/// entry is already gone before any re-entrant call, a re-entrant
564/// `subscribe_to(node_id, …)` *during* the cascade `or_default()`s a
565/// **fresh** entry that correctly survives this deactivation (a
566/// genuine re-subscription) — there is never a half-cleared entry to
567/// observe. Do NOT reorder to remove-after-unsub: that *would* expose
568/// the live entry to the lock-released re-entrant cascade.
569pub fn default_producer_deactivate(
570    storage: &ProducerStorage,
571    node_id: NodeId,
572    unsub: &dyn Fn(NodeId, SubscriptionId),
573) {
574    // Take the entry out under the lock, then unsubscribe lock-released
575    // (the `unsub` closure re-enters Core; holding `storage` across it
576    // would risk a binding-vs-Core lock-order inversion).
577    let removed = storage.lock().remove(&node_id);
578    if let Some(state) = removed {
579        for (source, sub_id) in state.subs {
580            unsub(source, sub_id);
581        }
582    }
583}
584
585// =====================================================================
586// Producer-shape operators (D-ops, Slice D Commit 2)
587// =====================================================================
588//
589// All four producer ops follow the same shape:
590//
// 1. Operator factory captures sources + per-op state
//    (Arc<Mutex<...>>) into a build closure. (`Core` is no longer
//    `Clone`, so the closure does not capture a cloned `Core`.)
// 2. `register_producer_build` returns a FnId.
// 3. `Core::register_producer(fn_id)` creates the producer node.
// 4. On first subscribe, Core fires invoke_fn → binding dispatches to
//    the build closure → ProducerCtx is constructed.
// 5. Build closure subscribes to each upstream source via
//    `ProducerCtx::subscribe_to`, providing sink closures that capture
//    per-op state and the producer's NodeId.
// 6. Sink closures process upstream emissions and emit on the producer
//    node through a captured `ProducerEmitter` (`ctx.emitter()`):
//    `emit_or_defer` / `complete_or_defer` / `error_or_defer`.
// 7. On last subscriber unsubscribe, Core fires producer_deactivate →
//    binding removes the storage entry → each recorded
//    `(source, sub_id)` pair is explicitly unsubscribed → sinks
//    unsub from upstream.
604//
605// The concrete operators (`zip` / `concat` / `race` / `take_until`)
606// live in [`super::ops_impl`] (sibling module) and are re-exported
607// from the crate root.