graphrefly_operators/ops_impl.rs
1//! Concrete implementations of the four subscription-managed combinators
2//! (zip / concat / race / takeUntil). Built on the
3//! [`super::producer::ProducerCtx`] substrate.
4
5// Each sink closure runs a small phase-1/phase-2 dance (lock state,
6// collect actions, drop lock, replay actions). The match-then-if
7// structure inside phase 1 is intentional for readability; collapsing
8// to `match { Some(...) if cond => ... }` obscures the lock-discipline.
9#![allow(clippy::collapsible_if, clippy::collapsible_match)]
10// `zip` is genuinely large (multi-arity tuple-pack with terminal-
11// cascade handling). Splitting would obscure the concurrent state-
12// machine logic across helpers without clear benefit.
13#![allow(clippy::too_many_lines)]
14
15use std::collections::VecDeque;
16use std::sync::{Arc, Mutex, Weak};
17
18use graphrefly_core::{Core, HandleId, NodeId, Sink};
19use smallvec::SmallVec;
20
21use super::producer::{ProducerBinding, ProducerCtx};
22
23// =====================================================================
24// zip — pair handles N-wise across N sources
25// =====================================================================
26
27/// Per-zip-node state: one FIFO queue per source, plus a flag for each
28/// source's terminal. Lives behind `Arc<Mutex<_>>` captured by the
29/// build + sink closures.
30struct ZipState {
31 queues: Vec<VecDeque<HandleId>>,
32 completed: Vec<bool>,
33 errored: bool,
34 terminated: bool,
35}
36
37impl ZipState {
38 fn new(n: usize) -> Self {
39 Self {
40 queues: (0..n).map(|_| VecDeque::new()).collect(),
41 completed: vec![false; n],
42 errored: false,
43 terminated: false,
44 }
45 }
46}
47
48/// `zip(s1, s2, ..., sN)` — collect one value from each source, emit a
49/// tuple, repeat. Models RxJS / TS `zip`:
50///
51/// - Each upstream DATA pushes into that source's per-source queue.
52/// - When **every** queue has at least one entry, pop one from each,
53/// pack into a tuple via [`graphrefly_core::BindingBoundary::pack_tuple`],
54/// and emit on the producer.
55/// - On any source's COMPLETE: if its queue is empty, terminate the
56/// producer with COMPLETE. Otherwise continue draining; terminate
57/// when this source's queue becomes empty (zip can't produce a
58/// tuple without input from every source).
59/// - On any source's ERROR: terminate the producer with the same
60/// ERROR (first error wins, like merge per Slice C-2 D022).
61///
62/// Empty source list (`n == 0`) emits a single empty-tuple event then
63/// completes. Single source (`n == 1`) is identity-passthrough.
64///
65/// # Refcount discipline
66///
67/// Each upstream DATA handle is `retain_handle`-bumped before being
68/// pushed onto a queue (the inbound message's payload retain belongs
69/// to the wave-end-flush release path; we take our own share for the
70/// queue). On pop, component handles are passed to `pack_tuple` which
71/// must NOT consume or release them — the caller (zip) retains
72/// ownership throughout the call and releases each component handle's
73/// queue share after `pack_tuple` returns. The returned tuple handle
74/// has a pre-bumped retain (binding convention per D020 doc on
75/// [`BindingBoundary::pack_tuple`]).
76#[must_use]
77pub fn zip(
78 core: &Core,
79 binding: &Arc<dyn ProducerBinding>,
80 sources: Vec<NodeId>,
81 pack_fn_id: graphrefly_core::FnId,
82) -> NodeId {
83 let n = sources.len();
84 // Weak-Arc captures break the BenchBinding → registry → producer_builds
85 // → closure → strong-Arc<dyn ProducerBinding> cycle that would otherwise
86 // pin the entire graph state when the host BenchCore drops with active
87 // producer registrations. See `Core::weak_handle` doc + Slice Y close.
88 let core_weak = core.weak_handle();
89 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
90
91 let build = Box::new(move |ctx: ProducerCtx<'_>| {
92 let producer_id = ctx.node_id();
93 let (Some(core_for_build), Some(binding_clone)) =
94 (core_weak.upgrade(), binding_weak.upgrade())
95 else {
96 // Host Core / binding already dropped — no-op.
97 return;
98 };
99 if n == 0 {
100 // Empty zip emits an empty tuple immediately, then completes.
101 let tuple_h = binding_clone.pack_tuple(pack_fn_id, &[]);
102 core_for_build.emit(producer_id, tuple_h);
103 core_for_build.complete(producer_id);
104 return;
105 }
106 let state: Arc<Mutex<ZipState>> = Arc::new(Mutex::new(ZipState::new(n)));
107
108 for (idx, &source) in sources.iter().enumerate() {
109 let state_inner = state.clone();
110 // Sinks live only while the producer is active (cleared via
111 // producer_deactivate on last-subscriber unsubscribe), so they
112 // can safely capture strong refs cloned from the upgraded weaks.
113 let core_inner = core_for_build.clone();
114 let binding_inner = binding_clone.clone();
115 let sink: Sink = Arc::new(move |msgs| {
116 // Phase 1 (lock held): mutate queues + collect actions.
117 // Phase 2 (lock released): pack tuples + re-enter Core.
118 enum PostLockAction {
119 /// Pack popped handles into a tuple, release components, emit.
120 PackAndEmit(Vec<HandleId>),
121 Complete,
122 Error(HandleId),
123 }
124 let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
125 // Handles to release after the lock drops (P2:
126 // drain queues on terminate to avoid handle leaks).
127 let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
128 {
129 let mut s = state_inner.lock().unwrap();
130 if s.terminated {
131 return;
132 }
133 // Tier-based dispatch (canonical §4.2; see
134 // `feedback_use_tier_for_signal_routing.md`). Tier 3
135 // payload_handle.is_some() = DATA; tier 5
136 // payload_handle.is_some() = ERROR else COMPLETE.
137 for m in msgs {
138 match m.tier() {
139 3 => {
140 if let Some(h) = m.payload_handle() {
141 binding_inner.retain_handle(h);
142 s.queues[idx].push_back(h);
143 // Collect complete tuples — pack_tuple runs
144 // after the lock drops (P5).
145 while s.queues.iter().all(|q| !q.is_empty()) {
146 let popped: Vec<HandleId> = s
147 .queues
148 .iter_mut()
149 .map(|q| q.pop_front().unwrap())
150 .collect();
151 post_actions.push(PostLockAction::PackAndEmit(popped));
152 }
153 }
154 // else: Resolved on a source — no action.
155 }
156 5 => {
157 if let Some(h) = m.payload_handle() {
158 // Error
159 if !s.errored && !s.terminated {
160 s.errored = true;
161 s.terminated = true;
162 binding_inner.retain_handle(h);
163 // P2: release all remaining queued handles.
164 for q in &mut s.queues {
165 to_release.extend(q.drain(..));
166 }
167 post_actions.push(PostLockAction::Error(h));
168 }
169 } else {
170 // Complete
171 s.completed[idx] = true;
172 // If this source's queue is empty, no more
173 // tuples from it — terminate.
174 if s.queues[idx].is_empty() && !s.terminated {
175 s.terminated = true;
176 // P2: release all remaining queued handles.
177 for q in &mut s.queues {
178 to_release.extend(q.drain(..));
179 }
180 post_actions.push(PostLockAction::Complete);
181 }
182 }
183 }
184 _ => {} // Tiers 0/1/2/4/6 — no action.
185 }
186 }
187 }
188 // Release leaked queue handles outside the lock.
189 for h in to_release {
190 binding_inner.release_handle(h);
191 }
192 // Phase 2 (lock released): pack tuples + re-enter Core.
193 // P5: pack_tuple runs outside the per-zip lock to avoid
194 // deadlock if the binding's pack_tuple re-enters.
195 for action in post_actions {
196 match action {
197 PostLockAction::PackAndEmit(popped) => {
198 let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
199 for h in &popped {
200 binding_inner.release_handle(*h);
201 }
202 core_inner.emit(producer_id, tuple_h);
203 }
204 PostLockAction::Complete => core_inner.complete(producer_id),
205 PostLockAction::Error(h) => core_inner.error(producer_id, h),
206 }
207 }
208 });
209 ctx.subscribe_to(source, sink);
210 }
211 });
212
213 let fn_id = binding.register_producer_build(build);
214 core.register_producer(fn_id)
215 .expect("invariant: register_producer has no deps; no error variants reachable")
216}
217
218// =====================================================================
219// concat — sequentially forward `first` then `second`
220// =====================================================================
221
222struct ConcatState {
223 /// 0 = forwarding `first`; 1 = `first` complete, forwarding `second`.
224 phase: u8,
225 /// Buffered DATA from `second` that arrived during phase 0 (before
226 /// `first` completed). Drained on phase transition.
227 pending: VecDeque<HandleId>,
228 /// Set to true if `second` completed during phase 0. On phase
229 /// transition, after draining `pending`, concat self-completes
230 /// because `second` won't emit Complete again (D041 / D-ops /qa D4).
231 second_completed: bool,
232 terminated: bool,
233}
234
235impl ConcatState {
236 fn new() -> Self {
237 Self {
238 phase: 0,
239 pending: VecDeque::new(),
240 second_completed: false,
241 terminated: false,
242 }
243 }
244}
245
246/// `concat(first, second)` — forward DATA from `first` until it
247/// completes, then drain any DATA `second` emitted during phase 1
248/// (buffered) and continue forwarding `second`. ERROR from either
249/// source terminates the producer with the same ERROR.
250///
251/// Subscribes to BOTH sources at activation time (matches TS impl in
252/// `extra/operators/combine.ts:332-379` so `second.subscribe` doesn't
253/// race after `first` completes). DATA from `second` during phase 0
254/// is buffered, not forwarded.
255#[must_use]
256pub fn concat(
257 core: &Core,
258 binding: &Arc<dyn ProducerBinding>,
259 first: NodeId,
260 second: NodeId,
261) -> NodeId {
262 // Weak captures break the producer-build Arc cycle (see `zip` doc).
263 let core_weak = core.weak_handle();
264 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
265
266 let build = Box::new(move |ctx: ProducerCtx<'_>| {
267 let producer_id = ctx.node_id();
268 let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
269 else {
270 return;
271 };
272 let state: Arc<Mutex<ConcatState>> = Arc::new(Mutex::new(ConcatState::new()));
273
274 // Subscribe to second FIRST so phase-0 DATA buffering catches
275 // synchronous initial emissions. Sinks capture strong refs cloned
276 // from the upgraded weaks; sink lifetime tied to producer activation.
277 let state_for_second = state.clone();
278 let core_for_second = core_clone.clone();
279 let binding_for_second = binding_clone.clone();
280 let second_sink: Sink = Arc::new(move |msgs| {
281 enum Action {
282 Emit(HandleId),
283 Complete,
284 Error(HandleId),
285 }
286 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
287 let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
288 {
289 let mut s = state_for_second.lock().unwrap();
290 if s.terminated {
291 return;
292 }
293 // Tier-based dispatch (canonical §4.2).
294 for m in msgs {
295 match m.tier() {
296 3 => {
297 if let Some(h) = m.payload_handle() {
298 if s.phase == 0 {
299 // Buffer for later drain.
300 binding_for_second.retain_handle(h);
301 s.pending.push_back(h);
302 } else {
303 binding_for_second.retain_handle(h);
304 actions.push(Action::Emit(h));
305 }
306 }
307 // else: Resolved on second source — no action.
308 }
309 5 => {
310 if let Some(h) = m.payload_handle() {
311 // Error
312 if !s.terminated {
313 s.terminated = true;
314 binding_for_second.retain_handle(h);
315 // P2: release buffered pending handles.
316 to_release.extend(s.pending.drain(..));
317 actions.push(Action::Error(h));
318 }
319 } else {
320 // Complete
321 if s.phase == 1 && !s.terminated {
322 s.terminated = true;
323 actions.push(Action::Complete);
324 } else if s.phase == 0 {
325 // D041 / D4 fix: remember that second
326 // completed during phase 0. On phase
327 // transition, after draining `pending`,
328 // first_sink will self-complete
329 // (second's Complete fires once and
330 // won't be re-observed).
331 s.second_completed = true;
332 }
333 }
334 }
335 _ => {} // Tiers 0/1/2/4/6 — no action.
336 }
337 }
338 }
339 for h in to_release {
340 binding_for_second.release_handle(h);
341 }
342 for action in actions {
343 match action {
344 Action::Emit(h) => core_for_second.emit(producer_id, h),
345 Action::Complete => core_for_second.complete(producer_id),
346 Action::Error(h) => core_for_second.error(producer_id, h),
347 }
348 }
349 });
350 ctx.subscribe_to(second, second_sink);
351
352 let state_for_first = state.clone();
353 let core_for_first = core_clone.clone();
354 let binding_for_first = binding_clone.clone();
355 let first_sink: Sink = Arc::new(move |msgs| {
356 // first.Complete triggers the phase transition (handled
357 // via `s.phase = 1` + draining pending into `actions`),
358 // and may also self-complete the producer if `second`
359 // already completed during phase 0 (D041 / D4 fix).
360 enum Action {
361 Emit(HandleId),
362 Complete,
363 Error(HandleId),
364 }
365 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
366 let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
367 {
368 let mut s = state_for_first.lock().unwrap();
369 if s.terminated {
370 return;
371 }
372 if s.phase != 0 {
373 return; // first is done; ignore stale messages.
374 }
375 for m in msgs {
376 // Slice E /qa: defensive per-iteration `terminated`
377 // guard. The outer guard at the top of the lock
378 // block catches a `terminated` set BEFORE this
379 // batch arrived; this per-iteration check catches
380 // the case where an earlier message in the SAME
381 // batch transitioned us terminal (e.g., post-
382 // Complete the phase moved to 1, but a defensively-
383 // emitted stale `[Data]` later in the same batch
384 // would otherwise queue a useless retain that
385 // `core.emit` would discard on a terminal
386 // producer).
387 if s.terminated || s.phase != 0 {
388 break;
389 }
390 // Tier-based dispatch (canonical §4.2).
391 match m.tier() {
392 3 => {
393 if let Some(h) = m.payload_handle() {
394 binding_for_first.retain_handle(h);
395 actions.push(Action::Emit(h));
396 }
397 // else: Resolved on first source — no action.
398 }
399 5 => {
400 if let Some(h) = m.payload_handle() {
401 // Error
402 if !s.terminated {
403 s.terminated = true;
404 binding_for_first.retain_handle(h);
405 // P2: release buffered pending handles.
406 to_release.extend(s.pending.drain(..));
407 actions.push(Action::Error(h));
408 }
409 } else {
410 // Complete — phase transition: drain pending
411 // second-data, then continue forwarding from
412 // second.
413 s.phase = 1;
414 // Pending handles already retained at buffer time.
415 for h in s.pending.drain(..) {
416 actions.push(Action::Emit(h));
417 }
418 // D041 / D4 fix: if second already completed
419 // during phase 0, self-complete now (its
420 // Complete fired once and won't re-fire).
421 if s.second_completed && !s.terminated {
422 s.terminated = true;
423 actions.push(Action::Complete);
424 }
425 }
426 }
427 _ => {} // Tiers 0/1/2/4/6 — no action.
428 }
429 }
430 }
431 for h in to_release {
432 binding_for_first.release_handle(h);
433 }
434 for action in actions {
435 match action {
436 Action::Emit(h) => core_for_first.emit(producer_id, h),
437 Action::Complete => core_for_first.complete(producer_id),
438 Action::Error(h) => core_for_first.error(producer_id, h),
439 }
440 }
441 });
442 ctx.subscribe_to(first, first_sink);
443 });
444
445 let fn_id = binding.register_producer_build(build);
446 core.register_producer(fn_id)
447 .expect("invariant: register_producer has no deps; no error variants reachable")
448}
449
450// =====================================================================
451// race — first source to emit DATA wins; losers are ignored
452// =====================================================================
453
/// State shared by every per-source sink of one active `race` producer.
struct RaceState {
    /// Position of the source that won the race; `None` until some
    /// source delivers DATA.
    winner: Option<usize>,
    /// `completed[i]` is set once source `i` has delivered COMPLETE. If
    /// every entry is set while there is still no winner, the producer
    /// completes (P4: no-winner all-complete termination).
    completed: Vec<bool>,
    /// Raised once the producer reached a terminal.
    terminated: bool,
}

impl RaceState {
    /// Initial state for `n` sources: no winner, nothing completed, not
    /// terminated.
    fn new(n: usize) -> Self {
        let completed: Vec<bool> = std::iter::repeat(false).take(n).collect();
        Self {
            winner: None,
            completed,
            terminated: false,
        }
    }
}
472
473/// `race(s1, s2, ..., sN)` — subscribes to all sources; the first to
474/// emit DATA wins. Subsequent traffic from the winner is forwarded;
475/// losers' messages are no-ops (per Q4=(b) — losers stay subscribed
476/// but their sink callbacks short-circuit). Saves the dynamic
477/// rewiring cost of explicitly unsubscribing losers.
478///
479/// Empty source list completes immediately. Single source is
480/// identity-passthrough.
481#[must_use]
482pub fn race(core: &Core, binding: &Arc<dyn ProducerBinding>, sources: Vec<NodeId>) -> NodeId {
483 let n = sources.len();
484 // Weak captures break the producer-build Arc cycle (see `zip` doc).
485 let core_weak = core.weak_handle();
486 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
487
488 let build = Box::new(move |ctx: ProducerCtx<'_>| {
489 let producer_id = ctx.node_id();
490 let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
491 else {
492 return;
493 };
494 if n == 0 {
495 core_clone.complete(producer_id);
496 return;
497 }
498 let state: Arc<Mutex<RaceState>> = Arc::new(Mutex::new(RaceState::new(n)));
499
500 for (idx, &source) in sources.iter().enumerate() {
501 let state_inner = state.clone();
502 let core_inner = core_clone.clone();
503 let binding_inner = binding_clone.clone();
504 let sink: Sink = Arc::new(move |msgs| {
505 enum Action {
506 Emit(HandleId),
507 Complete,
508 Error(HandleId),
509 }
510 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
511 {
512 let mut s = state_inner.lock().unwrap();
513 if s.terminated {
514 return;
515 }
516 // Tier-based dispatch (canonical §4.2).
517 for m in msgs {
518 match m.tier() {
519 3 => {
520 if let Some(h) = m.payload_handle() {
521 // P3: check s.winner live each iteration —
522 // this source may have just become the winner
523 // earlier in this batch.
524 if s.winner.is_none() {
525 s.winner = Some(idx);
526 binding_inner.retain_handle(h);
527 actions.push(Action::Emit(h));
528 } else if s.winner == Some(idx) {
529 binding_inner.retain_handle(h);
530 actions.push(Action::Emit(h));
531 }
532 // else: loser DATA — ignore.
533 }
534 // else: Resolved on a source — no action.
535 }
536 5 => {
537 if let Some(h) = m.payload_handle() {
538 // Error — from any source pre-winner OR from
539 // the winner cascade; loser errors post-winner
540 // are ignored.
541 if (s.winner.is_none() || s.winner == Some(idx))
542 && !s.terminated
543 {
544 s.terminated = true;
545 binding_inner.retain_handle(h);
546 actions.push(Action::Error(h));
547 }
548 } else {
549 // Complete
550 s.completed[idx] = true;
551 if s.winner == Some(idx) && !s.terminated {
552 s.terminated = true;
553 actions.push(Action::Complete);
554 } else if s.winner.is_none()
555 && s.completed.iter().all(|&c| c)
556 && !s.terminated
557 {
558 // P4: all sources completed without a
559 // winner — terminate the producer.
560 s.terminated = true;
561 actions.push(Action::Complete);
562 }
563 // else: loser complete — ignore.
564 }
565 }
566 _ => {} // Tiers 0/1/2/4/6 — no action.
567 }
568 }
569 }
570 for action in actions {
571 match action {
572 Action::Emit(h) => core_inner.emit(producer_id, h),
573 Action::Complete => core_inner.complete(producer_id),
574 Action::Error(h) => core_inner.error(producer_id, h),
575 }
576 }
577 });
578 ctx.subscribe_to(source, sink);
579 }
580 });
581
582 let fn_id = binding.register_producer_build(build);
583 core.register_producer(fn_id)
584 .expect("invariant: register_producer has no deps; no error variants reachable")
585}
586
587// =====================================================================
588// takeUntil — terminate when notifier emits DATA
589// =====================================================================
590
/// State shared by the source sink and the notifier sink of one active
/// `take_until` producer.
struct TakeUntilState {
    /// Raised once either sink has moved the producer to a terminal;
    /// both sinks return early when they observe it.
    terminated: bool,
}

impl TakeUntilState {
    /// Fresh state: the producer has not terminated yet.
    fn new() -> Self {
        let terminated = false;
        Self { terminated }
    }
}
600
601/// `take_until(source, notifier)` — forward DATA from `source` until
602/// `notifier` emits its first DATA, then terminate the producer with
603/// COMPLETE. Errors from either source cascade. Source COMPLETE
604/// terminates the producer.
605///
606/// Notifier DATA is consumed but never forwarded (zero-FFI on the
607/// notifier path — we don't dereference its payload, just use the
608/// emission as a signal).
609#[must_use]
610pub fn take_until(
611 core: &Core,
612 binding: &Arc<dyn ProducerBinding>,
613 source: NodeId,
614 notifier: NodeId,
615) -> NodeId {
616 // Weak captures break the producer-build Arc cycle (see `zip` doc).
617 let core_weak = core.weak_handle();
618 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
619
620 let build = Box::new(move |ctx: ProducerCtx<'_>| {
621 let producer_id = ctx.node_id();
622 let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
623 else {
624 return;
625 };
626 let state: Arc<Mutex<TakeUntilState>> = Arc::new(Mutex::new(TakeUntilState::new()));
627
628 // Source sink: forward DATA, propagate terminals.
629 let state_for_source = state.clone();
630 let core_for_source = core_clone.clone();
631 let binding_for_source = binding_clone.clone();
632 let source_sink: Sink = Arc::new(move |msgs| {
633 enum Action {
634 Emit(HandleId),
635 Complete,
636 Error(HandleId),
637 }
638 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
639 {
640 let mut s = state_for_source.lock().unwrap();
641 if s.terminated {
642 return;
643 }
644 // Tier-based dispatch (canonical §4.2).
645 for m in msgs {
646 match m.tier() {
647 3 => {
648 if let Some(h) = m.payload_handle() {
649 binding_for_source.retain_handle(h);
650 actions.push(Action::Emit(h));
651 }
652 // else: Resolved on source — no action.
653 }
654 5 => {
655 if let Some(h) = m.payload_handle() {
656 // Error
657 if !s.terminated {
658 s.terminated = true;
659 binding_for_source.retain_handle(h);
660 actions.push(Action::Error(h));
661 }
662 } else {
663 // Complete
664 if !s.terminated {
665 s.terminated = true;
666 actions.push(Action::Complete);
667 }
668 }
669 }
670 _ => {} // Tiers 0/1/2/4/6 — no action.
671 }
672 }
673 }
674 for action in actions {
675 match action {
676 Action::Emit(h) => core_for_source.emit(producer_id, h),
677 Action::Complete => core_for_source.complete(producer_id),
678 Action::Error(h) => core_for_source.error(producer_id, h),
679 }
680 }
681 });
682 ctx.subscribe_to(source, source_sink);
683
684 // Notifier sink: any DATA → terminate; ERROR → cascade.
685 let state_for_notifier = state.clone();
686 let core_for_notifier = core_clone.clone();
687 let binding_for_notifier = binding_clone.clone();
688 let notifier_sink: Sink = Arc::new(move |msgs| {
689 enum Action {
690 Complete,
691 Error(HandleId),
692 }
693 let mut action: Option<Action> = None;
694 {
695 let mut s = state_for_notifier.lock().unwrap();
696 if s.terminated {
697 return;
698 }
699 // Tier-based dispatch (canonical §4.2).
700 for m in msgs {
701 match m.tier() {
702 3 => {
703 // Any DATA on notifier → complete the producer.
704 // (Resolved alone — no payload — no action.)
705 // We don't emit notifier DATA downstream, so we
706 // don't even need to extract the handle.
707 if m.payload_handle().is_some() && !s.terminated {
708 s.terminated = true;
709 action = Some(Action::Complete);
710 break;
711 }
712 }
713 5 => {
714 // Error: cascade. Complete (payload_handle.is_none())
715 // without prior DATA: nothing to do — source
716 // continues independently.
717 if let Some(h) = m.payload_handle() {
718 if !s.terminated {
719 s.terminated = true;
720 binding_for_notifier.retain_handle(h);
721 action = Some(Action::Error(h));
722 break;
723 }
724 }
725 }
726 _ => {} // Tiers 0/1/2/4/6 — no action.
727 }
728 }
729 }
730 if let Some(a) = action {
731 match a {
732 Action::Complete => core_for_notifier.complete(producer_id),
733 Action::Error(h) => core_for_notifier.error(producer_id, h),
734 }
735 }
736 });
737 ctx.subscribe_to(notifier, notifier_sink);
738 });
739
740 let fn_id = binding.register_producer_build(build);
741 core.register_producer(fn_id)
742 .expect("invariant: register_producer has no deps; no error variants reachable")
743}