Skip to main content

graphrefly_operators/
ops_impl.rs

1//! Concrete implementations of the four subscription-managed combinators
2//! (zip / concat / race / takeUntil). Built on the
3//! [`super::producer::ProducerCtx`] substrate.
4
5// Each sink closure runs a small phase-1/phase-2 dance (lock state,
6// collect actions, drop lock, replay actions). The match-then-if
7// structure inside phase 1 is intentional for readability; collapsing
8// to `match { Some(...) if cond => ... }` obscures the lock-discipline.
9#![allow(clippy::collapsible_if, clippy::collapsible_match)]
10// `zip` is genuinely large (multi-arity tuple-pack with terminal-
11// cascade handling). Splitting would obscure the concurrent state-
12// machine logic across helpers without clear benefit.
13#![allow(clippy::too_many_lines)]
14
15use std::collections::VecDeque;
16use std::sync::{Arc, Mutex, Weak};
17
18use graphrefly_core::{Core, HandleId, NodeId, Sink};
19use smallvec::SmallVec;
20
21use super::producer::{ProducerBinding, ProducerCtx};
22
23// =====================================================================
24// zip — pair handles N-wise across N sources
25// =====================================================================
26
27/// Per-zip-node state: one FIFO queue per source, plus a flag for each
28/// source's terminal. Lives behind `Arc<Mutex<_>>` captured by the
29/// build + sink closures.
30struct ZipState {
31    queues: Vec<VecDeque<HandleId>>,
32    completed: Vec<bool>,
33    errored: bool,
34    terminated: bool,
35}
36
37impl ZipState {
38    fn new(n: usize) -> Self {
39        Self {
40            queues: (0..n).map(|_| VecDeque::new()).collect(),
41            completed: vec![false; n],
42            errored: false,
43            terminated: false,
44        }
45    }
46}
47
48/// `zip(s1, s2, ..., sN)` — collect one value from each source, emit a
49/// tuple, repeat. Models RxJS / TS `zip`:
50///
51/// - Each upstream DATA pushes into that source's per-source queue.
52/// - When **every** queue has at least one entry, pop one from each,
53///   pack into a tuple via [`graphrefly_core::BindingBoundary::pack_tuple`],
54///   and emit on the producer.
55/// - On any source's COMPLETE: if its queue is empty, terminate the
56///   producer with COMPLETE. Otherwise continue draining; terminate
57///   when this source's queue becomes empty (zip can't produce a
58///   tuple without input from every source).
59/// - On any source's ERROR: terminate the producer with the same
60///   ERROR (first error wins, like merge per Slice C-2 D022).
61///
62/// Empty source list (`n == 0`) emits a single empty-tuple event then
63/// completes. Single source (`n == 1`) is identity-passthrough.
64///
65/// # Refcount discipline
66///
67/// Each upstream DATA handle is `retain_handle`-bumped before being
68/// pushed onto a queue (the inbound message's payload retain belongs
69/// to the wave-end-flush release path; we take our own share for the
70/// queue). On pop, component handles are passed to `pack_tuple` which
71/// must NOT consume or release them — the caller (zip) retains
72/// ownership throughout the call and releases each component handle's
73/// queue share after `pack_tuple` returns. The returned tuple handle
74/// has a pre-bumped retain (binding convention per D020 doc on
75/// [`BindingBoundary::pack_tuple`]).
76#[must_use]
77pub fn zip(
78    core: &Core,
79    binding: &Arc<dyn ProducerBinding>,
80    sources: Vec<NodeId>,
81    pack_fn_id: graphrefly_core::FnId,
82) -> NodeId {
83    let n = sources.len();
84    // Weak-Arc captures break the BenchBinding → registry → producer_builds
85    // → closure → strong-Arc<dyn ProducerBinding> cycle that would otherwise
86    // pin the entire graph state when the host BenchCore drops with active
87    // producer registrations. See `Core::weak_handle` doc + Slice Y close.
88    let core_weak = core.weak_handle();
89    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
90
91    let build = Box::new(move |ctx: ProducerCtx<'_>| {
92        let producer_id = ctx.node_id();
93        let (Some(core_for_build), Some(binding_clone)) =
94            (core_weak.upgrade(), binding_weak.upgrade())
95        else {
96            // Host Core / binding already dropped — no-op.
97            return;
98        };
99        if n == 0 {
100            // Empty zip emits an empty tuple immediately, then completes.
101            let tuple_h = binding_clone.pack_tuple(pack_fn_id, &[]);
102            core_for_build.emit(producer_id, tuple_h);
103            core_for_build.complete(producer_id);
104            return;
105        }
106        let state: Arc<Mutex<ZipState>> = Arc::new(Mutex::new(ZipState::new(n)));
107
108        for (idx, &source) in sources.iter().enumerate() {
109            let state_inner = state.clone();
110            // Sinks live only while the producer is active (cleared via
111            // producer_deactivate on last-subscriber unsubscribe), so they
112            // can safely capture strong refs cloned from the upgraded weaks.
113            let core_inner = core_for_build.clone();
114            let binding_inner = binding_clone.clone();
115            let sink: Sink = Arc::new(move |msgs| {
116                // Phase 1 (lock held): mutate queues + collect actions.
117                // Phase 2 (lock released): pack tuples + re-enter Core.
118                enum PostLockAction {
119                    /// Pack popped handles into a tuple, release components, emit.
120                    PackAndEmit(Vec<HandleId>),
121                    Complete,
122                    Error(HandleId),
123                }
124                let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
125                // Handles to release after the lock drops (P2:
126                // drain queues on terminate to avoid handle leaks).
127                let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
128                {
129                    let mut s = state_inner.lock().unwrap();
130                    if s.terminated {
131                        return;
132                    }
133                    // Tier-based dispatch (canonical §4.2; see
134                    // `feedback_use_tier_for_signal_routing.md`). Tier 3
135                    // payload_handle.is_some() = DATA; tier 5
136                    // payload_handle.is_some() = ERROR else COMPLETE.
137                    for m in msgs {
138                        match m.tier() {
139                            3 => {
140                                if let Some(h) = m.payload_handle() {
141                                    binding_inner.retain_handle(h);
142                                    s.queues[idx].push_back(h);
143                                    // Collect complete tuples — pack_tuple runs
144                                    // after the lock drops (P5).
145                                    while s.queues.iter().all(|q| !q.is_empty()) {
146                                        let popped: Vec<HandleId> = s
147                                            .queues
148                                            .iter_mut()
149                                            .map(|q| q.pop_front().unwrap())
150                                            .collect();
151                                        post_actions.push(PostLockAction::PackAndEmit(popped));
152                                    }
153                                }
154                                // else: Resolved on a source — no action.
155                            }
156                            5 => {
157                                if let Some(h) = m.payload_handle() {
158                                    // Error
159                                    if !s.errored && !s.terminated {
160                                        s.errored = true;
161                                        s.terminated = true;
162                                        binding_inner.retain_handle(h);
163                                        // P2: release all remaining queued handles.
164                                        for q in &mut s.queues {
165                                            to_release.extend(q.drain(..));
166                                        }
167                                        post_actions.push(PostLockAction::Error(h));
168                                    }
169                                } else {
170                                    // Complete
171                                    s.completed[idx] = true;
172                                    // If this source's queue is empty, no more
173                                    // tuples from it — terminate.
174                                    if s.queues[idx].is_empty() && !s.terminated {
175                                        s.terminated = true;
176                                        // P2: release all remaining queued handles.
177                                        for q in &mut s.queues {
178                                            to_release.extend(q.drain(..));
179                                        }
180                                        post_actions.push(PostLockAction::Complete);
181                                    }
182                                }
183                            }
184                            _ => {} // Tiers 0/1/2/4/6 — no action.
185                        }
186                    }
187                }
188                // Release leaked queue handles outside the lock.
189                for h in to_release {
190                    binding_inner.release_handle(h);
191                }
192                // Phase 2 (lock released): pack tuples + re-enter Core.
193                // P5: pack_tuple runs outside the per-zip lock to avoid
194                // deadlock if the binding's pack_tuple re-enters.
195                for action in post_actions {
196                    match action {
197                        PostLockAction::PackAndEmit(popped) => {
198                            let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
199                            for h in &popped {
200                                binding_inner.release_handle(*h);
201                            }
202                            core_inner.emit(producer_id, tuple_h);
203                        }
204                        PostLockAction::Complete => core_inner.complete(producer_id),
205                        PostLockAction::Error(h) => core_inner.error(producer_id, h),
206                    }
207                }
208            });
209            ctx.subscribe_to(source, sink);
210        }
211    });
212
213    let fn_id = binding.register_producer_build(build);
214    core.register_producer(fn_id)
215        .expect("invariant: register_producer has no deps; no error variants reachable")
216}
217
218// =====================================================================
219// concat — sequentially forward `first` then `second`
220// =====================================================================
221
222struct ConcatState {
223    /// 0 = forwarding `first`; 1 = `first` complete, forwarding `second`.
224    phase: u8,
225    /// Buffered DATA from `second` that arrived during phase 0 (before
226    /// `first` completed). Drained on phase transition.
227    pending: VecDeque<HandleId>,
228    /// Set to true if `second` completed during phase 0. On phase
229    /// transition, after draining `pending`, concat self-completes
230    /// because `second` won't emit Complete again (D041 / D-ops /qa D4).
231    second_completed: bool,
232    terminated: bool,
233}
234
235impl ConcatState {
236    fn new() -> Self {
237        Self {
238            phase: 0,
239            pending: VecDeque::new(),
240            second_completed: false,
241            terminated: false,
242        }
243    }
244}
245
246/// `concat(first, second)` — forward DATA from `first` until it
247/// completes, then drain any DATA `second` emitted during phase 1
248/// (buffered) and continue forwarding `second`. ERROR from either
249/// source terminates the producer with the same ERROR.
250///
251/// Subscribes to BOTH sources at activation time (matches TS impl in
252/// `extra/operators/combine.ts:332-379` so `second.subscribe` doesn't
253/// race after `first` completes). DATA from `second` during phase 0
254/// is buffered, not forwarded.
255#[must_use]
256pub fn concat(
257    core: &Core,
258    binding: &Arc<dyn ProducerBinding>,
259    first: NodeId,
260    second: NodeId,
261) -> NodeId {
262    // Weak captures break the producer-build Arc cycle (see `zip` doc).
263    let core_weak = core.weak_handle();
264    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
265
266    let build = Box::new(move |ctx: ProducerCtx<'_>| {
267        let producer_id = ctx.node_id();
268        let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
269        else {
270            return;
271        };
272        let state: Arc<Mutex<ConcatState>> = Arc::new(Mutex::new(ConcatState::new()));
273
274        // Subscribe to second FIRST so phase-0 DATA buffering catches
275        // synchronous initial emissions. Sinks capture strong refs cloned
276        // from the upgraded weaks; sink lifetime tied to producer activation.
277        let state_for_second = state.clone();
278        let core_for_second = core_clone.clone();
279        let binding_for_second = binding_clone.clone();
280        let second_sink: Sink = Arc::new(move |msgs| {
281            enum Action {
282                Emit(HandleId),
283                Complete,
284                Error(HandleId),
285            }
286            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
287            let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
288            {
289                let mut s = state_for_second.lock().unwrap();
290                if s.terminated {
291                    return;
292                }
293                // Tier-based dispatch (canonical §4.2).
294                for m in msgs {
295                    match m.tier() {
296                        3 => {
297                            if let Some(h) = m.payload_handle() {
298                                if s.phase == 0 {
299                                    // Buffer for later drain.
300                                    binding_for_second.retain_handle(h);
301                                    s.pending.push_back(h);
302                                } else {
303                                    binding_for_second.retain_handle(h);
304                                    actions.push(Action::Emit(h));
305                                }
306                            }
307                            // else: Resolved on second source — no action.
308                        }
309                        5 => {
310                            if let Some(h) = m.payload_handle() {
311                                // Error
312                                if !s.terminated {
313                                    s.terminated = true;
314                                    binding_for_second.retain_handle(h);
315                                    // P2: release buffered pending handles.
316                                    to_release.extend(s.pending.drain(..));
317                                    actions.push(Action::Error(h));
318                                }
319                            } else {
320                                // Complete
321                                if s.phase == 1 && !s.terminated {
322                                    s.terminated = true;
323                                    actions.push(Action::Complete);
324                                } else if s.phase == 0 {
325                                    // D041 / D4 fix: remember that second
326                                    // completed during phase 0. On phase
327                                    // transition, after draining `pending`,
328                                    // first_sink will self-complete
329                                    // (second's Complete fires once and
330                                    // won't be re-observed).
331                                    s.second_completed = true;
332                                }
333                            }
334                        }
335                        _ => {} // Tiers 0/1/2/4/6 — no action.
336                    }
337                }
338            }
339            for h in to_release {
340                binding_for_second.release_handle(h);
341            }
342            for action in actions {
343                match action {
344                    Action::Emit(h) => core_for_second.emit(producer_id, h),
345                    Action::Complete => core_for_second.complete(producer_id),
346                    Action::Error(h) => core_for_second.error(producer_id, h),
347                }
348            }
349        });
350        ctx.subscribe_to(second, second_sink);
351
352        let state_for_first = state.clone();
353        let core_for_first = core_clone.clone();
354        let binding_for_first = binding_clone.clone();
355        let first_sink: Sink = Arc::new(move |msgs| {
356            // first.Complete triggers the phase transition (handled
357            // via `s.phase = 1` + draining pending into `actions`),
358            // and may also self-complete the producer if `second`
359            // already completed during phase 0 (D041 / D4 fix).
360            enum Action {
361                Emit(HandleId),
362                Complete,
363                Error(HandleId),
364            }
365            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
366            let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
367            {
368                let mut s = state_for_first.lock().unwrap();
369                if s.terminated {
370                    return;
371                }
372                if s.phase != 0 {
373                    return; // first is done; ignore stale messages.
374                }
375                for m in msgs {
376                    // Slice E /qa: defensive per-iteration `terminated`
377                    // guard. The outer guard at the top of the lock
378                    // block catches a `terminated` set BEFORE this
379                    // batch arrived; this per-iteration check catches
380                    // the case where an earlier message in the SAME
381                    // batch transitioned us terminal (e.g., post-
382                    // Complete the phase moved to 1, but a defensively-
383                    // emitted stale `[Data]` later in the same batch
384                    // would otherwise queue a useless retain that
385                    // `core.emit` would discard on a terminal
386                    // producer).
387                    if s.terminated || s.phase != 0 {
388                        break;
389                    }
390                    // Tier-based dispatch (canonical §4.2).
391                    match m.tier() {
392                        3 => {
393                            if let Some(h) = m.payload_handle() {
394                                binding_for_first.retain_handle(h);
395                                actions.push(Action::Emit(h));
396                            }
397                            // else: Resolved on first source — no action.
398                        }
399                        5 => {
400                            if let Some(h) = m.payload_handle() {
401                                // Error
402                                if !s.terminated {
403                                    s.terminated = true;
404                                    binding_for_first.retain_handle(h);
405                                    // P2: release buffered pending handles.
406                                    to_release.extend(s.pending.drain(..));
407                                    actions.push(Action::Error(h));
408                                }
409                            } else {
410                                // Complete — phase transition: drain pending
411                                // second-data, then continue forwarding from
412                                // second.
413                                s.phase = 1;
414                                // Pending handles already retained at buffer time.
415                                for h in s.pending.drain(..) {
416                                    actions.push(Action::Emit(h));
417                                }
418                                // D041 / D4 fix: if second already completed
419                                // during phase 0, self-complete now (its
420                                // Complete fired once and won't re-fire).
421                                if s.second_completed && !s.terminated {
422                                    s.terminated = true;
423                                    actions.push(Action::Complete);
424                                }
425                            }
426                        }
427                        _ => {} // Tiers 0/1/2/4/6 — no action.
428                    }
429                }
430            }
431            for h in to_release {
432                binding_for_first.release_handle(h);
433            }
434            for action in actions {
435                match action {
436                    Action::Emit(h) => core_for_first.emit(producer_id, h),
437                    Action::Complete => core_for_first.complete(producer_id),
438                    Action::Error(h) => core_for_first.error(producer_id, h),
439                }
440            }
441        });
442        ctx.subscribe_to(first, first_sink);
443    });
444
445    let fn_id = binding.register_producer_build(build);
446    core.register_producer(fn_id)
447        .expect("invariant: register_producer has no deps; no error variants reachable")
448}
449
450// =====================================================================
451// race — first source to emit DATA wins; losers are ignored
452// =====================================================================
453
/// Shared bookkeeping for a single `race` node, reached by every
/// per-source sink.
struct RaceState {
    /// Position of the source that won the race; `None` until some
    /// source has emitted DATA.
    winner: Option<usize>,
    /// `completed[i]` is set when source `i` delivers COMPLETE. Once
    /// every entry is set and there is still no winner, the producer
    /// completes (P4: no-winner all-complete termination).
    completed: Vec<bool>,
    /// Set once the node has reached a terminal; sinks return early on it.
    terminated: bool,
}

impl RaceState {
    /// Initial state for `n` sources: no winner, nothing completed,
    /// not terminated.
    fn new(n: usize) -> Self {
        let completed = std::iter::repeat(false).take(n).collect();
        Self {
            winner: None,
            completed,
            terminated: false,
        }
    }
}
472
473/// `race(s1, s2, ..., sN)` — subscribes to all sources; the first to
474/// emit DATA wins. Subsequent traffic from the winner is forwarded;
475/// losers' messages are no-ops (per Q4=(b) — losers stay subscribed
476/// but their sink callbacks short-circuit). Saves the dynamic
477/// rewiring cost of explicitly unsubscribing losers.
478///
479/// Empty source list completes immediately. Single source is
480/// identity-passthrough.
481#[must_use]
482pub fn race(core: &Core, binding: &Arc<dyn ProducerBinding>, sources: Vec<NodeId>) -> NodeId {
483    let n = sources.len();
484    // Weak captures break the producer-build Arc cycle (see `zip` doc).
485    let core_weak = core.weak_handle();
486    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
487
488    let build = Box::new(move |ctx: ProducerCtx<'_>| {
489        let producer_id = ctx.node_id();
490        let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
491        else {
492            return;
493        };
494        if n == 0 {
495            core_clone.complete(producer_id);
496            return;
497        }
498        let state: Arc<Mutex<RaceState>> = Arc::new(Mutex::new(RaceState::new(n)));
499
500        for (idx, &source) in sources.iter().enumerate() {
501            let state_inner = state.clone();
502            let core_inner = core_clone.clone();
503            let binding_inner = binding_clone.clone();
504            let sink: Sink = Arc::new(move |msgs| {
505                enum Action {
506                    Emit(HandleId),
507                    Complete,
508                    Error(HandleId),
509                }
510                let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
511                {
512                    let mut s = state_inner.lock().unwrap();
513                    if s.terminated {
514                        return;
515                    }
516                    // Tier-based dispatch (canonical §4.2).
517                    for m in msgs {
518                        match m.tier() {
519                            3 => {
520                                if let Some(h) = m.payload_handle() {
521                                    // P3: check s.winner live each iteration —
522                                    // this source may have just become the winner
523                                    // earlier in this batch.
524                                    if s.winner.is_none() {
525                                        s.winner = Some(idx);
526                                        binding_inner.retain_handle(h);
527                                        actions.push(Action::Emit(h));
528                                    } else if s.winner == Some(idx) {
529                                        binding_inner.retain_handle(h);
530                                        actions.push(Action::Emit(h));
531                                    }
532                                    // else: loser DATA — ignore.
533                                }
534                                // else: Resolved on a source — no action.
535                            }
536                            5 => {
537                                if let Some(h) = m.payload_handle() {
538                                    // Error — from any source pre-winner OR from
539                                    // the winner cascade; loser errors post-winner
540                                    // are ignored.
541                                    if (s.winner.is_none() || s.winner == Some(idx))
542                                        && !s.terminated
543                                    {
544                                        s.terminated = true;
545                                        binding_inner.retain_handle(h);
546                                        actions.push(Action::Error(h));
547                                    }
548                                } else {
549                                    // Complete
550                                    s.completed[idx] = true;
551                                    if s.winner == Some(idx) && !s.terminated {
552                                        s.terminated = true;
553                                        actions.push(Action::Complete);
554                                    } else if s.winner.is_none()
555                                        && s.completed.iter().all(|&c| c)
556                                        && !s.terminated
557                                    {
558                                        // P4: all sources completed without a
559                                        // winner — terminate the producer.
560                                        s.terminated = true;
561                                        actions.push(Action::Complete);
562                                    }
563                                    // else: loser complete — ignore.
564                                }
565                            }
566                            _ => {} // Tiers 0/1/2/4/6 — no action.
567                        }
568                    }
569                }
570                for action in actions {
571                    match action {
572                        Action::Emit(h) => core_inner.emit(producer_id, h),
573                        Action::Complete => core_inner.complete(producer_id),
574                        Action::Error(h) => core_inner.error(producer_id, h),
575                    }
576                }
577            });
578            ctx.subscribe_to(source, sink);
579        }
580    });
581
582    let fn_id = binding.register_producer_build(build);
583    core.register_producer(fn_id)
584        .expect("invariant: register_producer has no deps; no error variants reachable")
585}
586
587// =====================================================================
588// takeUntil — terminate when notifier emits DATA
589// =====================================================================
590
/// Shared bookkeeping for a single `take_until` node, reached by both
/// the source sink and the notifier sink.
struct TakeUntilState {
    /// Set once the node has reached a terminal; sinks return early on it.
    terminated: bool,
}

impl TakeUntilState {
    /// Initial state: not terminated.
    fn new() -> Self {
        let terminated = false;
        Self { terminated }
    }
}
600
601/// `take_until(source, notifier)` — forward DATA from `source` until
602/// `notifier` emits its first DATA, then terminate the producer with
603/// COMPLETE. Errors from either source cascade. Source COMPLETE
604/// terminates the producer.
605///
606/// Notifier DATA is consumed but never forwarded (zero-FFI on the
607/// notifier path — we don't dereference its payload, just use the
608/// emission as a signal).
609#[must_use]
610pub fn take_until(
611    core: &Core,
612    binding: &Arc<dyn ProducerBinding>,
613    source: NodeId,
614    notifier: NodeId,
615) -> NodeId {
616    // Weak captures break the producer-build Arc cycle (see `zip` doc).
617    let core_weak = core.weak_handle();
618    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
619
620    let build = Box::new(move |ctx: ProducerCtx<'_>| {
621        let producer_id = ctx.node_id();
622        let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
623        else {
624            return;
625        };
626        let state: Arc<Mutex<TakeUntilState>> = Arc::new(Mutex::new(TakeUntilState::new()));
627
628        // Source sink: forward DATA, propagate terminals.
629        let state_for_source = state.clone();
630        let core_for_source = core_clone.clone();
631        let binding_for_source = binding_clone.clone();
632        let source_sink: Sink = Arc::new(move |msgs| {
633            enum Action {
634                Emit(HandleId),
635                Complete,
636                Error(HandleId),
637            }
638            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
639            {
640                let mut s = state_for_source.lock().unwrap();
641                if s.terminated {
642                    return;
643                }
644                // Tier-based dispatch (canonical §4.2).
645                for m in msgs {
646                    match m.tier() {
647                        3 => {
648                            if let Some(h) = m.payload_handle() {
649                                binding_for_source.retain_handle(h);
650                                actions.push(Action::Emit(h));
651                            }
652                            // else: Resolved on source — no action.
653                        }
654                        5 => {
655                            if let Some(h) = m.payload_handle() {
656                                // Error
657                                if !s.terminated {
658                                    s.terminated = true;
659                                    binding_for_source.retain_handle(h);
660                                    actions.push(Action::Error(h));
661                                }
662                            } else {
663                                // Complete
664                                if !s.terminated {
665                                    s.terminated = true;
666                                    actions.push(Action::Complete);
667                                }
668                            }
669                        }
670                        _ => {} // Tiers 0/1/2/4/6 — no action.
671                    }
672                }
673            }
674            for action in actions {
675                match action {
676                    Action::Emit(h) => core_for_source.emit(producer_id, h),
677                    Action::Complete => core_for_source.complete(producer_id),
678                    Action::Error(h) => core_for_source.error(producer_id, h),
679                }
680            }
681        });
682        ctx.subscribe_to(source, source_sink);
683
684        // Notifier sink: any DATA → terminate; ERROR → cascade.
685        let state_for_notifier = state.clone();
686        let core_for_notifier = core_clone.clone();
687        let binding_for_notifier = binding_clone.clone();
688        let notifier_sink: Sink = Arc::new(move |msgs| {
689            enum Action {
690                Complete,
691                Error(HandleId),
692            }
693            let mut action: Option<Action> = None;
694            {
695                let mut s = state_for_notifier.lock().unwrap();
696                if s.terminated {
697                    return;
698                }
699                // Tier-based dispatch (canonical §4.2).
700                for m in msgs {
701                    match m.tier() {
702                        3 => {
703                            // Any DATA on notifier → complete the producer.
704                            // (Resolved alone — no payload — no action.)
705                            // We don't emit notifier DATA downstream, so we
706                            // don't even need to extract the handle.
707                            if m.payload_handle().is_some() && !s.terminated {
708                                s.terminated = true;
709                                action = Some(Action::Complete);
710                                break;
711                            }
712                        }
713                        5 => {
714                            // Error: cascade. Complete (payload_handle.is_none())
715                            // without prior DATA: nothing to do — source
716                            // continues independently.
717                            if let Some(h) = m.payload_handle() {
718                                if !s.terminated {
719                                    s.terminated = true;
720                                    binding_for_notifier.retain_handle(h);
721                                    action = Some(Action::Error(h));
722                                    break;
723                                }
724                            }
725                        }
726                        _ => {} // Tiers 0/1/2/4/6 — no action.
727                    }
728                }
729            }
730            if let Some(a) = action {
731                match a {
732                    Action::Complete => core_for_notifier.complete(producer_id),
733                    Action::Error(h) => core_for_notifier.error(producer_id, h),
734                }
735            }
736        });
737        ctx.subscribe_to(notifier, notifier_sink);
738    });
739
740    let fn_id = binding.register_producer_build(build);
741    core.register_producer(fn_id)
742        .expect("invariant: register_producer has no deps; no error variants reachable")
743}