graphrefly_operators/producer.rs
1//! Producer-shape operator substrate (Slice D-ops, Commit 2).
2//!
3//! Producer ops (zip / concat / race / takeUntil) are nodes with no
4//! declared deps that fire their fn ONCE on first activation. The fn
5//! body subscribes to upstream sources via [`ProducerCtx::subscribe_to`]
6//! and registers per-op state (queues, phase flags, winner index). When
7//! upstream emits, the operator's sink closures re-enter Core via
8//! `Core::emit` / `Core::complete` / `Core::error` on the producer node.
9//!
//! On last-subscriber unsubscribe, Core invokes
//! [`BindingBoundary::producer_deactivate(node_id)`](graphrefly_core::BindingBoundary::producer_deactivate);
//! the binding's impl (see [`default_producer_deactivate`]) takes the
//! per-node entry out of its producer storage and explicitly
//! unsubscribes every recorded upstream subscription:
//!
//! ```text
//! producer_storage.remove(node_id) →
//! for each recorded (source, sub_id) →
//! unsub(source, sub_id) →
//! upstream sinks unsubscribe.
//! ```
21//!
22//! # Reference-cycle discipline (Slice Y, 2026-05-08)
23//!
24//! Build closures registered via
25//! [`ProducerBinding::register_producer_build`] are stored long-term in
26//! the binding's `producer_builds` registry. To avoid the strong-Arc
27//! cycle `BenchBinding → registry → producer_builds[fn_id] → closure →
28//! strong-Arc<dyn ProducerBinding> → BenchBinding`, factory bodies
29//! (`zip` / `concat` / `race` / `take_until` in `ops_impl.rs` plus
30//! `switch_map` / `exhaust_map` / `merge_map` / `concat_map` in
31//! `higher_order.rs`) capture `WeakCore` and
32//! `Weak<dyn ProducerBinding>` (and `Weak<dyn HigherOrderBinding>`
33//! for the higher-order factories). The build closure upgrades both
34//! on each invocation; if the host `Core` was already dropped, upgrade
35//! returns `None` and the build closure no-ops cleanly.
36//!
//! Sinks spawned by the build closure capture STRONG refs cloned from
//! the upgraded weaks. Their lifetime is tied to the producer's active
//! subscription — `producer_deactivate` on last-subscriber unsubscribe
//! clears `producer_storage[node_id]` and explicitly unsubscribes the
//! recorded upstream subscriptions, which drops the sinks, which drops
//! the strong captures. So the strong-ref window is bounded by
//! producer-active state, not by the long-lived `producer_builds`
//! registry.
44
45use std::any::Any;
46use std::sync::Arc;
47
48use ahash::AHashMap as HashMap;
49use parking_lot::Mutex;
50
51use graphrefly_core::{
52 BindingBoundary, Core, CoreFull, FnId, HandleId, NodeId, Sink, SubscriptionId,
53};
54
/// Outcome of [`ProducerCtx::subscribe_to`] — the producer-layer
/// translation of [`graphrefly_core::SubscribeError`] into a positive
/// outcome enum that operators (zip / concat / race / take_until /
/// merge_map / switch_map / exhaust_map / concat_map) can match on for
/// per-operator dead-source semantics.
///
/// Introduced /qa F2 (2026-05-10) to close the silent-wedge class of
/// bugs where operators previously couldn't tell that a `subscribe_to`
/// call had been rejected per R2.2.7.b (non-resubscribable terminal
/// source) — pre-F2 the rejection was logged-and-skipped silently,
/// which left zip waiting for a queue that would never fill, concat
/// stuck on a source that would never advance, etc.
///
/// Mirrors the per-domain status-string-union pattern used in TS
/// (`RefineStatus`, `AgentStatus`, process status: `"running" |
/// "completed" | "errored" | "cancelled"`) — each operator-layer
/// outcome lives in its own typed enum rather than sharing a global
/// `Outcome<T, E>` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscribeOutcome {
    /// Subscription installed successfully.
    /// [`ProducerCtx::subscribe_to`] has recorded the
    /// `(source, sub_id)` pair in the producer's
    /// [`ProducerNodeState::subs`]; no further operator action
    /// required.
    Live,
    /// Subscription was deferred to wave-end via the
    /// [`graphrefly_core::DeferredProducerOp::Callback`] queue (Phase
    /// H+ STRICT, D115). The deferred callback installs the
    /// subscription after wave_guards release. Operators MAY treat
    /// this as `Live` for lifecycle bookkeeping — the subscription
    /// WILL be installed; just not yet.
    ///
    /// NOTE(review): the [`ProducerCtx::subscribe_to`] implementation
    /// in this file returns only `Live` or `Dead` (its
    /// partition-order retry arm was deleted under D274). Confirm
    /// whether any other producer still yields this variant before
    /// relying on it.
    Deferred,
    /// The target node is non-resubscribable AND has terminated
    /// (R2.2.7.b, D118). The sink will NOT be installed. Operators
    /// MUST handle this per their semantics:
    ///
    /// - **zip / take_until (source)**: self-Complete (tuple stream
    ///   can never form; take_until's source is gone).
    /// - **concat**: advance to the next source (treat as inner
    ///   Complete signal).
    /// - **race**: mark `completed[idx] = true`; if all sources are
    ///   Dead/Complete, self-Complete.
    /// - **take_until (notifier)**: ignore (notifier signal will
    ///   never fire; take_until reduces to a passthrough of source).
    /// - **switch_map / exhaust_map / concat_map / merge_map (inner)**:
    ///   treat as immediate `on_inner_complete` — decrement active,
    ///   advance to next, check self-Complete trigger.
    Dead {
        /// The dead node that rejected the subscribe.
        node: NodeId,
    },
}
106
/// Build closure type — the producer's fn body, called once on first
/// activation. The closure receives a [`ProducerCtx`] for setting up
/// upstream subscriptions; emissions on the producer come from sink
/// callbacks the closure registers.
///
/// The closure is a boxed `Fn` (not `FnOnce`) with `Send + Sync`
/// bounds, and the [`ProducerCtx`] it receives is borrowed only for
/// the duration of the call (`ProducerCtx<'_>`).
pub type ProducerBuildFn = Box<dyn Fn(ProducerCtx<'_>) + Send + Sync>;
112
/// Per-producer-node state owned by the [`ProducerBinding`] impl.
///
/// Holds the recorded upstream `(source, sub_id)` pairs (explicitly
/// unsubscribed on producer deactivation — see
/// [`default_producer_deactivate`]) plus an optional `Box<dyn Any>`
/// slot for op-specific state shared across the build closure and its
/// sink closures. (Most ops capture state via `Rc<RefCell<...>>`
/// directly in closure captures; the `op_state` slot is reserved for
/// ops that prefer trait-object storage.)
#[derive(Default)]
pub struct ProducerNodeState {
    /// Recorded upstream `(source_node, sub_id)` pairs taken by
    /// [`ProducerCtx::subscribe_to`]. S2b/D229: core-level RAII
    /// `Subscription` is retired — these are explicitly unsubscribed by
    /// the binding's [`BindingBoundary::producer_deactivate`] impl via
    /// the owner-supplied `unsub` closure (see
    /// [`default_producer_deactivate`]), behaviour-identical to the old
    /// `Vec<Subscription>`-drop cascade.
    pub subs: Vec<(NodeId, SubscriptionId)>,
    /// Optional op-specific scratch (rarely used; most ops capture
    /// state via closure). The boxed value must be `Send + Sync`.
    pub op_state: Option<Box<dyn Any + Send + Sync>>,
}
135
/// Storage shared between the [`ProducerBinding`] impl and the
/// [`ProducerCtx`] passed to build closures. Keyed by producer NodeId;
/// each value is that producer's [`ProducerNodeState`]. (`HashMap`
/// here is the file's `ahash::AHashMap` alias.)
///
/// Cat-1/2 (D273): `ProducerStorage` is held inside `Arc<dyn
/// ProducerBinding>` (the binding side's aggregate), which is `Send +
/// Sync` by trait bound. `Rc<RefCell<...>>` would violate that — stays
/// `Arc<Mutex<...>>`. Per-op state on build/sink closures uses its own
/// Cat-3 cells (see `higher_order.rs`/`buffer.rs`).
pub type ProducerStorage = Arc<Mutex<HashMap<NodeId, ProducerNodeState>>>;
145
/// Closure-registration interface for producer-shape operators —
/// extends [`BindingBoundary`] with the methods that bindings shipping
/// producers must implement.
///
/// Bindings that don't ship producers (e.g., minimal test bindings)
/// don't need to implement this trait. The operator factories
/// (`zip`, `concat`, `race`, `take_until` — they live in the sibling
/// `ops_impl` module, not in this file) require it.
pub trait ProducerBinding: BindingBoundary {
    /// Register a producer build closure. The returned [`FnId`] is
    /// passed to [`Core::register_producer`]; on first activation,
    /// Core invokes [`BindingBoundary::invoke_fn`] which the binding
    /// dispatches to the registered build closure.
    fn register_producer_build(&self, build: ProducerBuildFn) -> FnId;

    /// Access the binding's producer-state storage. Used by
    /// [`ProducerCtx::subscribe_to`] to record `(source, sub_id)`
    /// pairs in the per-node entry, and by the binding's
    /// `producer_deactivate` impl to take the entry out on last
    /// unsubscribe (see [`default_producer_deactivate`]).
    fn producer_storage(&self) -> &ProducerStorage;
}
166
/// The **`Send + Sync` cross-thread** producer emit handle (D249/S2c)
/// — the sink-side emit path of S2b / D231 / D232-AMEND/A′.
///
/// Producer build closures spawn long-lived `Sink`s that fire on every
/// future upstream emit — long after the build closure's `&Core`
/// (`ctx`) is gone. Under the actor model the `Core` is owned by value
/// and relocates between workers, so sinks can no longer capture a
/// cloned `Core` / `WeakCore`. Instead they capture an emitter handle
/// (a [`ProducerEmitter`], which wraps this type) and post
/// `MailboxOp`s to the `Core`-owned [`graphrefly_core::CoreMailbox`];
/// the `BatchGuard` drain-to-quiescence loop applies them **in-wave**
/// via the sync `Core::{emit,complete,error}` (immediate,
/// cascade-ordering-preserving — D232-AMEND).
///
/// Method names are `emit`/`complete`/`error` (post-D274 — the
/// `_or_defer` suffix was dropped together with `Core::{emit,complete,
/// error}_or_defer` since groups are static identity post-D248/D253 and
/// no deferral seam remains). Sink bodies route through these instead
/// of cloning `&Core`: `em = ctx.emitter()` carries the mailbox handle.
///
/// Holds only the id-only `Arc<CoreMailbox>` post side + the binding
/// (for the `Core`-gone handle-release branch), so it is a cheap
/// `Clone` (two `Arc`s). This is what an autonomous timer task
/// (`temporal.rs`, `tokio::spawn`-ed) captures — it stays `Send` so
/// the spawned future is `Send`. It deliberately has **no `!Send`
/// `defer`**: its own [`Self::defer`] accepts only `Send` closures;
/// the owner-side `!Send` defer path lives on [`ProducerEmitter`].
#[derive(Clone)]
pub struct MailboxEmitter {
    /// Post side of the `Core`-owned mailbox; every method on this
    /// type posts through it.
    mailbox: Arc<graphrefly_core::CoreMailbox>,
    /// For the `Core`-gone branch only: if the owning `Core` already
    /// dropped (mailbox closed), an `Emit`/`Error` payload handle would
    /// leak — release it (mirrors `timer.rs`'s post-`false` path).
    binding: Arc<dyn BindingBoundary>,
}
201
202impl MailboxEmitter {
203 /// Post an `Emit`. If the owning `Core` is gone, release `handle`
204 /// (it held a retain for the would-be payload) — no leak.
205 pub fn emit(&self, node_id: NodeId, handle: HandleId) {
206 if !self.mailbox.post_emit(node_id, handle) {
207 self.binding.release_handle(handle);
208 }
209 }
210
211 /// Post a `Complete`. No payload handle; `Core`-gone is a no-op.
212 pub fn complete(&self, node_id: NodeId) {
213 let _ = self.mailbox.post_complete(node_id);
214 }
215
216 /// Post an `Error`. If the owning `Core` is gone, release the error
217 /// payload `handle` — no leak.
218 pub fn error(&self, node_id: NodeId, handle: HandleId) {
219 if !self.mailbox.post_error(node_id, handle) {
220 self.binding.release_handle(handle);
221 }
222 }
223
224 /// Post a **`Send`** cross-thread owner-side closure (D233/D249).
225 /// For an autonomous timer task (`temporal.rs` `window_time`) doing
226 /// task-side topology mutation that must run owner-side in FIFO
227 /// order — the closure captures only `Send` state, so it rides the
228 /// `Send + Sync` `CoreMailbox`. Returns `false` iff the `Core` is
229 /// gone (closure dropped unrun; release any captured handles).
230 #[must_use = "a `false` return means the Core is gone and the closure was dropped unrun; release any handles it captured"]
231 pub fn defer(&self, f: impl FnOnce(&dyn graphrefly_core::CoreFull) + Send + 'static) -> bool {
232 self.mailbox.post_defer(Box::new(f))
233 }
234
235 /// Whether the owning `Core` has dropped (mailbox closed) — for
236 /// prompt timer-task shutdown (see [`ProducerEmitter::is_core_gone`]).
237 #[must_use]
238 pub fn is_core_gone(&self) -> bool {
239 self.mailbox.is_closed()
240 }
241}
242
/// The owner-side producer handle (D249/S2c). `MailboxEmitter` (the
/// `Send` cross-thread emit side) **plus** the owner-only `!Send`
/// `Rc<DeferQueue>` for [`Self::defer`]. Captured into owner-side
/// `!Send` producer sinks (control/higher-order dynamic-inner); the
/// `Rc` makes it `!Send`, consistent with the D248 single-owner `Sink`
/// relaxation. A timer task that needs only the cross-thread emit side
/// takes [`Self::emitter`] (a `Send` [`MailboxEmitter`]) instead.
#[derive(Clone)]
pub struct ProducerEmitter {
    /// The `Send` cross-thread emit side; `emit` / `complete` /
    /// `error` / `is_core_gone` on this type forward to it.
    emitter: MailboxEmitter,
    /// Owner-side `!Send` `Defer` queue split off `CoreMailbox`
    /// (D249/S2c).
    deferred: std::rc::Rc<graphrefly_core::DeferQueue>,
}
257
258impl ProducerEmitter {
259 /// Construct directly from any `&Core` (S2b). Used by the
260 /// **binding-layer RAII** convenience (D228-A): a test harness /
261 /// napi `BenchCore` that co-owns the `Core` builds a [`SubGuard`]
262 /// over `core.subscribe(...)`'s returned `SubscriptionId` so drop
263 /// schedules the unsubscribe — the sanctioned replacement for the
264 /// retired core-level RAII `Subscription`.
265 #[must_use]
266 pub fn for_core(core: &Core) -> Self {
267 Self {
268 emitter: MailboxEmitter {
269 mailbox: core.mailbox(),
270 binding: core.binding(),
271 },
272 deferred: core.defer_queue(),
273 }
274 }
275
276 /// Construct from the object-safe [`CoreFull`] facade (D246 r5 /
277 /// D245). Used by [`ProducerCtx::emitter`] now that the ctx holds
278 /// `&dyn CoreFull` rather than a concrete `&Core`.
279 #[must_use]
280 pub fn from_corefull(core: &dyn CoreFull) -> Self {
281 Self {
282 emitter: MailboxEmitter {
283 mailbox: core.mailbox(),
284 binding: core.binding(),
285 },
286 deferred: core.defer_queue(),
287 }
288 }
289
290 /// The `Send` cross-thread emit sub-handle — for autonomous timer
291 /// tasks (`temporal.rs`, `tokio::spawn`) that only emit/complete/
292 /// error and must keep their spawned future `Send` (D249/S2c).
293 #[must_use]
294 pub fn emitter(&self) -> MailboxEmitter {
295 self.emitter.clone()
296 }
297
298 /// Post an `Emit`. If the owning `Core` is gone, release `handle`
299 /// (it held a retain for the would-be payload) — no leak.
300 pub fn emit(&self, node_id: NodeId, handle: HandleId) {
301 self.emitter.emit(node_id, handle);
302 }
303
304 /// Post a `Complete`. No payload handle; `Core`-gone is a no-op.
305 pub fn complete(&self, node_id: NodeId) {
306 self.emitter.complete(node_id);
307 }
308
309 /// Post an `Error`. If the owning `Core` is gone, release the error
310 /// payload `handle` — no leak.
311 pub fn error(&self, node_id: NodeId, handle: HandleId) {
312 self.emitter.error(node_id, handle);
313 }
314
315 /// Post an owner-side closure (D233) given the full object-safe
316 /// `Core` surface — for sinks that must perform value-returning
317 /// topology mutation (windowing `create_window_node`, higher-order
318 /// dynamic-inner `subscribe`). Runs **in-wave** (the drain loop
319 /// holds `&Core`); the closure consumes any returned
320 /// `NodeId`/`SubscriptionId` to drive its captured op-state.
321 ///
322 /// Returns `false` iff the owning `Core` is already gone — the
323 /// closure is dropped **unrun** (running `CoreFull` on a half-dropped
324 /// `Core` is unsound; user-locked QA decision A). QA F2 (2026-05-18):
325 /// this now surfaces the `Core`-gone signal (was a silent
326 /// `let _ = …`) so a caller whose closure captured retained
327 /// `HandleId`s can release them on `false` — mirroring the
328 /// `emit` / `error` release-on-`false` contract.
329 /// The not-yet-written windowing / higher-order callers MUST honour
330 /// this (release captured payload handles when it returns `false`).
331 #[must_use = "a `false` return means the Core is gone and the closure was dropped unrun; release any handles it captured"]
332 pub fn defer(&self, f: impl FnOnce(&dyn graphrefly_core::CoreFull) + 'static) -> bool {
333 self.deferred.post(Box::new(f))
334 }
335
336 /// Whether the owning `Core` has dropped (mailbox closed). Lets a
337 /// long-lived task stop promptly + release any handle it holds
338 /// (preserves the old `WeakCore::upgrade() == None` promptness).
339 /// NOT required for leak-safety (`MailboxEmitter::{emit,error}`
340 /// already release the captured handle on a closed post) — only for
341 /// prompt task shutdown.
342 #[must_use]
343 pub fn is_core_gone(&self) -> bool {
344 self.emitter.is_core_gone()
345 }
346}
347
/// Binding-layer RAII subscription handle (S2b / D225 / D234). The
/// core-level RAII `Subscription` was retired (a parameterless `Drop`
/// can't reach a relocating owned `Core`); this wrapper IS the
/// sanctioned binding-layer replacement for *substrate operators* that
/// manage an inner subscription's lifetime by ownership (higher-order
/// `switch/exhaust/merge/concat_map` inner subs). It holds a
/// [`ProducerEmitter`] (mailbox handle + owner-side `Rc<DeferQueue>` —
/// `'static`, NOT the `Core`; the `Rc` makes the guard `!Send`), so
/// its `Drop` legitimately posts a deferred `unsubscribe` via
/// `em.defer` (owner-side, in-wave, FIFO-ordered — D234). FIFO
/// ordering gives the correct cancel-then-resubscribe semantics: a
/// `SubGuard` dropped before a new subscribe is posted is drained
/// (unsub) before the new subscribe. A `Core`-gone post is dropped
/// unrun (subscription moot at teardown — no leak).
#[must_use = "dropping a SubGuard schedules the inner unsubscribe"]
pub struct SubGuard {
    /// Node the tracked subscription was taken on.
    node: NodeId,
    /// Id of the tracked subscription on `node`.
    sub: SubscriptionId,
    /// Emitter whose `defer` queue carries the unsubscribe on drop.
    em: ProducerEmitter,
}
367
368impl SubGuard {
369 /// Track `sub` (returned by `CoreFull::try_subscribe` on `node`) so
370 /// dropping this guard unsubscribes it.
371 pub fn new(node: NodeId, sub: SubscriptionId, em: ProducerEmitter) -> Self {
372 Self { node, sub, em }
373 }
374}
375
376impl Drop for SubGuard {
377 fn drop(&mut self) {
378 let (n, s) = (self.node, self.sub);
379 // D234: post the unsubscribe owner-side, in-wave, FIFO-ordered
380 // (so a cancel-before-resubscribe drains in order). Dropped
381 // unrun if the Core is already gone — the sub is moot then.
382 let _ = self.em.defer(move |c| c.unsubscribe(n, s));
383 }
384}
385
/// Context handed to a producer's build closure on activation.
///
/// Provides:
/// - [`Self::node_id`] / [`Self::core`] — identity + Core access for
///   the build closure (valid only for the duration of the build
///   call; long-lived sinks use [`Self::emitter`] instead).
/// - [`Self::subscribe_to`] — subscribe to an upstream Core node;
///   the resulting `(source, sub_id)` pair is recorded under
///   `node_id` in the binding's producer storage and explicitly
///   unsubscribed on producer deactivation (see
///   [`default_producer_deactivate`]).
pub struct ProducerCtx<'a> {
    /// Id of the producer node being activated.
    node_id: NodeId,
    /// Object-safe Core facade, borrowed for the build call only.
    core: &'a dyn CoreFull,
    /// The binding's per-producer state storage.
    storage: &'a ProducerStorage,
}
400
401impl<'a> ProducerCtx<'a> {
402 /// Construct a new context for the binding's `invoke_fn` dispatch
403 /// to call build closures. Internal — bindings call this; user
404 /// code receives the constructed ctx via the build closure's arg.
405 ///
406 /// D246 r5 / D245: takes `&dyn CoreFull` — the one object-safe Core
407 /// facade Core hands the binding via
408 /// [`graphrefly_core::BindingBoundary::invoke_fn_with_core`]. A
409 /// concrete `&Core` unsized-coerces to `&dyn CoreFull` at the call
410 /// site, so existing `&Core`-holding call sites pass it directly
411 /// (`ProducerCtx::new(node, &core, &storage)`). `ProducerCtx` only
412 /// needs `subscribe`/`try_subscribe`/`register_*`/`emit`/`mailbox`/
413 /// `binding` — all on `CoreFull` — so no concrete `Core` /
414 /// thread-local / stored back-reference is required. (D274 deleted
415 /// the `*_or_defer` family the original docstring listed alongside
416 /// these methods.)
417 pub fn new(node_id: NodeId, core: &'a dyn CoreFull, storage: &'a ProducerStorage) -> Self {
418 Self {
419 node_id,
420 core,
421 storage,
422 }
423 }
424
425 /// The producer node's id.
426 #[must_use]
427 pub fn node_id(&self) -> NodeId {
428 self.node_id
429 }
430
431 /// The Core dispatcher, as the object-safe [`CoreFull`] facade
432 /// (D246 r5 / D245). **Build-closure-side only** — valid only for
433 /// the duration of the build call (the `Core` relocates; D231).
434 /// Long-lived sinks must use [`Self::emitter`] instead. Carries
435 /// everything a build closure uses (`subscribe`/`try_subscribe`/
436 /// `register_*`/`emit`/`binding`) without naming the concrete cell
437 /// type. (D274 deleted the `*_or_defer` family the original
438 /// docstring listed alongside these methods.)
439 #[must_use]
440 pub fn core(&self) -> &dyn CoreFull {
441 self.core
442 }
443
444 /// Sink-side emit handle (D232-AMEND/A′). Cheap-`Clone`; capture it
445 /// into spawned sink closures and call
446 /// `emit`/`complete`/`error` exactly as
447 /// the old cloned-`Core` did — ops post to the `Core`-owned mailbox
448 /// and are applied in-wave by the drain-to-quiescence loop.
449 #[must_use]
450 pub fn emitter(&self) -> ProducerEmitter {
451 ProducerEmitter::from_corefull(self.core)
452 }
453
454 /// The binding's per-producer state storage (S2b). Replaces
455 /// `binding.producer_storage()` for build closures / spawned sinks
456 /// that track their own upstream subscriptions or per-op state:
457 /// under D231 the build closure no longer holds a
458 /// `Arc<dyn ProducerBinding>` (only `ctx`'s borrowed `&Core` +
459 /// `&ProducerStorage`), and a sink can't reach `ProducerBinding`
460 /// either. The returned `ProducerStorage` is
461 /// `Rc<RefCell<…>>` — `'static` + cheap-`Clone`, so it can be
462 /// captured into long-lived sink closures (exactly how the old code
463 /// captured `binding.producer_storage().clone()`).
464 #[must_use]
465 pub fn storage(&self) -> ProducerStorage {
466 self.storage.clone()
467 }
468
469 /// Subscribe `sink` to upstream `source`. The `Subscription` is
470 /// auto-tracked under the producer's `node_id`; on producer
471 /// deactivation, the binding drops the storage entry, which drops
472 /// the Subscription, which unsubscribes the sink.
473 ///
474 /// **Phase H+ STRICT (D115, 2026-05-10):** uses `try_subscribe`
475 /// to attempt the subscription. On partition order violation, the
476 /// subscribe is deferred to wave-end via
477 /// `DeferredProducerOp::Callback`. (S2c/D248 single-owner: the
478 /// per-partition `wave_owner` `ReentrantMutex`es are deleted —
479 /// there is no cross-thread interleaving wave to serialize — so
480 /// the deferred callback simply runs owner-side at wave-end with
481 /// no lock acquisition.)
482 ///
483 /// **R2.2.7.b (D118, 2026-05-10):** if the upstream is
484 /// non-resubscribable AND already terminated, `try_subscribe`
485 /// returns `Err(SubscribeError::TornDown)`. /qa F2 (2026-05-10):
486 /// the rejection is now surfaced to the caller via
487 /// [`SubscribeOutcome::Dead`] so the operator can apply its
488 /// per-op dead-source semantics — pre-F2 the rejection was
489 /// silently swallowed, leaving operators wedged (zip waiting on a
490 /// queue that would never fill, concat stuck on a source that
491 /// would never advance, etc.). See [`SubscribeOutcome::Dead`] for
492 /// per-operator guidance.
493 pub fn subscribe_to(&self, source: NodeId, sink: Sink) -> SubscribeOutcome {
494 // D274 (2026-05-21): the
495 // `SubscribeError::PartitionOrderViolation` retry arm was deleted
496 // — the union-find ascending-order shim it dodged is gone (groups
497 // are user-declared + static post-D248/D253; one Core per OS
498 // thread per D252). The only remaining failure is `TornDown`.
499 match self.core.try_subscribe(source, sink) {
500 Ok(sub) => {
501 // S2b/D229: record `(source, sub_id)` for explicit
502 // owner-driven unsubscribe at `producer_deactivate`.
503 self.storage
504 .lock()
505 .entry(self.node_id)
506 .or_default()
507 .subs
508 .push((source, sub));
509 SubscribeOutcome::Live
510 }
511 Err(graphrefly_core::SubscribeError::TornDown { node }) => {
512 SubscribeOutcome::Dead { node }
513 }
514 }
515 }
516}
517
518/// Default helper — explicitly unsubscribe the producer's recorded
519/// upstream subs, then drop its storage entry, on deactivation.
520///
521/// S2b/D229: core-level RAII `Subscription` is retired, so the binding's
522/// [`BindingBoundary::producer_deactivate`] impl receives a
523/// `Core::unsubscribe`-capable `unsub` closure (the owner-driven chain
524/// passes it the `&Core` it already holds). Looping it over the recorded
525/// `(source, sub_id)` pairs is behaviour-identical to the old
526/// `Vec<Subscription>`-drop cascade (same deregister + Phase-G chain,
527/// lock-released so re-entrant producer cascades are safe).
528///
529/// Ordering (QA F3, 2026-05-18 — corrected from an earlier
530/// remove-AFTER comment that contradicted the code): the entry is
531/// **taken out under the `storage` lock FIRST**, then the `unsub`
532/// cascade runs lock-released over the moved-out `subs`. This is
533/// behaviour-identical to the retired path (old code did
534/// `states.remove(&node_id)` and the dropped `Vec<Subscription>`'s
535/// `Drop` ran the cascade — i.e. remove-then-cascade). Because the
536/// entry is already gone before any re-entrant call, a re-entrant
537/// `subscribe_to(node_id, …)` *during* the cascade `or_default()`s a
538/// **fresh** entry that correctly survives this deactivation (a
539/// genuine re-subscription) — there is never a half-cleared entry to
540/// observe. Do NOT reorder to remove-after-unsub: that *would* expose
541/// the live entry to the lock-released re-entrant cascade.
542pub fn default_producer_deactivate(
543 storage: &ProducerStorage,
544 node_id: NodeId,
545 unsub: &dyn Fn(NodeId, SubscriptionId),
546) {
547 // Take the entry out under the lock, then unsubscribe lock-released
548 // (the `unsub` closure re-enters Core; holding `storage` across it
549 // would risk a binding-vs-Core lock-order inversion).
550 let removed = storage.lock().remove(&node_id);
551 if let Some(state) = removed {
552 for (source, sub_id) in state.subs {
553 unsub(source, sub_id);
554 }
555 }
556}
557
558// =====================================================================
559// Producer-shape operators (D-ops, Slice D Commit 2)
560// =====================================================================
561//
562// All four producer ops follow the same shape:
563//
564// 1. Operator factory captures `Core::clone()` + sources + per-op state
565// (Rc<RefCell<...>>) into a build closure.
566// 2. `register_producer_build` returns a FnId.
567// 3. `Core::register_producer(fn_id)` creates the producer node.
568// 4. On first subscribe, Core fires invoke_fn → binding dispatches to
569// the build closure → ProducerCtx is constructed.
570// 5. Build closure subscribes to each upstream source, providing sink
571// closures that capture per-op state and the producer's NodeId.
// 6. Sink closures process upstream emissions and emit on the producer
//    node via a captured `ProducerEmitter` (`emit` / `complete` /
//    `error`), which posts to the Core-owned mailbox.
// 7. On last subscriber unsubscribe, Core fires producer_deactivate →
//    binding takes the storage entry out → each recorded
//    `(source, sub_id)` is explicitly unsubscribed → sinks unsub from
//    upstream.
577//
578// The concrete operators (`zip` / `concat` / `race` / `take_until`)
579// live in [`super::ops_impl`] (sibling module) and are re-exported
580// from the crate root.