Skip to main content

graphrefly_operators/
control.rs

1//! Control operators — side-effect, gating, error recovery, convergence.
2//!
3//! # Operators
4//!
5//! - [`tap`] / [`tap_observer`] — side-effect passthrough.
6//! - [`on_first_data`] — one-shot side-effect on first DATA.
7//! - [`rescue`] — error recovery via user callback.
8//! - [`valve`] — boolean-gated passthrough with optional cancellation.
9//! - [`settle`] — wave-count convergence detector.
10//! - [`repeat`] — sequential resubscribe loop.
11
12#![allow(clippy::too_many_lines, clippy::items_after_statements)]
13
14use std::cell::RefCell;
15use std::rc::Rc;
16use std::sync::Arc;
17
18use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
19use smallvec::SmallVec;
20
21use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
22
23// =========================================================================
24// tap(source, fn_id) — side-effect passthrough
25// =========================================================================
26
27/// Passthrough operator that forwards all DATA unchanged. On each DATA,
28/// calls `binding.invoke_tap_fn(fn_id, handle)` as a side-effect.
29/// Forwards COMPLETE and ERROR unchanged.
30#[must_use]
31pub fn tap(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, fn_id: FnId) -> NodeId {
32    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
33        let core_s = ctx.core();
34        let binding_s = ctx.core().binding();
35        let em = ctx.emitter();
36        let pid = ctx.node_id();
37        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
38        let core_sink = em.clone();
39
40        let source_sink: Sink = Rc::new(move |msgs| {
41            enum Act {
42                EmitAndTap(HandleId),
43                Complete,
44                Error(HandleId),
45            }
46            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
47            for m in msgs {
48                match m.tier() {
49                    3 => {
50                        if let Some(h) = m.payload_handle() {
51                            bb.retain_handle(h);
52                            actions.push(Act::EmitAndTap(h));
53                        }
54                    }
55                    5 => {
56                        if let Some(h) = m.payload_handle() {
57                            bb.retain_handle(h);
58                            actions.push(Act::Error(h));
59                        } else {
60                            actions.push(Act::Complete);
61                        }
62                    }
63                    _ => {}
64                }
65            }
66            for a in actions {
67                match a {
68                    Act::EmitAndTap(h) => {
69                        bb.invoke_tap_fn(fn_id, h);
70                        core_sink.emit(pid, h);
71                    }
72                    Act::Complete => core_sink.complete(pid),
73                    Act::Error(h) => core_sink.error(pid, h),
74                }
75            }
76        });
77
78        let outcome = ctx.subscribe_to(source, source_sink);
79        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
80            core_s.complete(pid);
81        }
82    });
83
84    let fn_id_reg = binding.register_producer_build(build);
85    core.register_producer(fn_id_reg)
86        .expect("tap: register_producer failed")
87}
88
89// =========================================================================
90// tap_observer(source, data_fn, error_fn, complete_fn)
91// =========================================================================
92
93/// Like [`tap`] but with lifecycle observer callbacks. Each callback is
94/// optional (`None` = skip that lifecycle event).
95///
96/// - `data_fn_id`: called on each DATA via `invoke_tap_fn`.
97/// - `error_fn_id`: called on ERROR via `invoke_tap_error_fn`.
98/// - `complete_fn_id`: called on COMPLETE via `invoke_tap_complete_fn`.
99///
100/// All messages are forwarded unchanged regardless of callback presence.
101#[must_use]
102pub fn tap_observer(
103    core: &Core,
104    binding: &Arc<dyn ProducerBinding>,
105    source: NodeId,
106    data_fn_id: Option<FnId>,
107    error_fn_id: Option<FnId>,
108    complete_fn_id: Option<FnId>,
109) -> NodeId {
110    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
111        let core_s = ctx.core();
112        let binding_s = ctx.core().binding();
113        let em = ctx.emitter();
114        let pid = ctx.node_id();
115        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
116        let core_sink = em.clone();
117
118        let source_sink: Sink = Rc::new(move |msgs| {
119            enum Act {
120                Emit(HandleId),
121                Complete,
122                Error(HandleId),
123            }
124            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
125            for m in msgs {
126                match m.tier() {
127                    3 => {
128                        if let Some(h) = m.payload_handle() {
129                            bb.retain_handle(h);
130                            actions.push(Act::Emit(h));
131                        }
132                    }
133                    5 => {
134                        if let Some(h) = m.payload_handle() {
135                            bb.retain_handle(h);
136                            actions.push(Act::Error(h));
137                        } else {
138                            actions.push(Act::Complete);
139                        }
140                    }
141                    _ => {}
142                }
143            }
144            for a in actions {
145                match a {
146                    Act::Emit(h) => {
147                        if let Some(fid) = data_fn_id {
148                            bb.invoke_tap_fn(fid, h);
149                        }
150                        core_sink.emit(pid, h);
151                    }
152                    Act::Complete => {
153                        if let Some(fid) = complete_fn_id {
154                            bb.invoke_tap_complete_fn(fid);
155                        }
156                        core_sink.complete(pid);
157                    }
158                    Act::Error(h) => {
159                        if let Some(fid) = error_fn_id {
160                            bb.invoke_tap_error_fn(fid, h);
161                        }
162                        core_sink.error(pid, h);
163                    }
164                }
165            }
166        });
167
168        let outcome = ctx.subscribe_to(source, source_sink);
169        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
170            if let Some(fid) = complete_fn_id {
171                binding_s.invoke_tap_complete_fn(fid);
172            }
173            core_s.complete(pid);
174        }
175    });
176
177    let fn_id = binding.register_producer_build(build);
178    core.register_producer(fn_id)
179        .expect("tap_observer: register_producer failed")
180}
181
182// =========================================================================
183// on_first_data(source, fn_id) — one-shot tap
184// =========================================================================
185
186/// One-shot side-effect tap. Calls `invoke_tap_fn(fn_id, handle)` on
187/// the first DATA only, then becomes a pure passthrough. All messages
188/// are forwarded unchanged.
189#[must_use]
190pub fn on_first_data(
191    core: &Core,
192    binding: &Arc<dyn ProducerBinding>,
193    source: NodeId,
194    fn_id: FnId,
195) -> NodeId {
196    struct OnFirstState {
197        fired: bool,
198    }
199
200    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
201        let core_s = ctx.core();
202        let binding_s = ctx.core().binding();
203        let em = ctx.emitter();
204        let pid = ctx.node_id();
205        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
206        let core_sink = em.clone();
207        let state: Rc<RefCell<OnFirstState>> = Rc::new(RefCell::new(OnFirstState { fired: false }));
208
209        let source_sink: Sink = Rc::new(move |msgs| {
210            enum Act {
211                EmitWithTap(HandleId),
212                Emit(HandleId),
213                Complete,
214                Error(HandleId),
215            }
216            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
217            {
218                let mut s = state.borrow_mut();
219                for m in msgs {
220                    match m.tier() {
221                        3 => {
222                            if let Some(h) = m.payload_handle() {
223                                bb.retain_handle(h);
224                                if s.fired {
225                                    actions.push(Act::Emit(h));
226                                } else {
227                                    s.fired = true;
228                                    actions.push(Act::EmitWithTap(h));
229                                }
230                            }
231                        }
232                        5 => {
233                            if let Some(h) = m.payload_handle() {
234                                bb.retain_handle(h);
235                                actions.push(Act::Error(h));
236                            } else {
237                                actions.push(Act::Complete);
238                            }
239                        }
240                        _ => {}
241                    }
242                }
243            }
244            for a in actions {
245                match a {
246                    Act::EmitWithTap(h) => {
247                        bb.invoke_tap_fn(fn_id, h);
248                        core_sink.emit(pid, h);
249                    }
250                    Act::Emit(h) => core_sink.emit(pid, h),
251                    Act::Complete => core_sink.complete(pid),
252                    Act::Error(h) => core_sink.error(pid, h),
253                }
254            }
255        });
256
257        let outcome = ctx.subscribe_to(source, source_sink);
258        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
259            core_s.complete(pid);
260        }
261    });
262
263    let fn_id_reg = binding.register_producer_build(build);
264    core.register_producer(fn_id_reg)
265        .expect("on_first_data: register_producer failed")
266}
267
268// =========================================================================
269// rescue(source, fn_id) — error recovery
270// =========================================================================
271
272/// Error recovery operator. On ERROR, calls
273/// `binding.invoke_rescue_fn(fn_id, error_handle)`:
274///
275/// - `Ok(recovered_handle)` — emit DATA with the recovered value.
276/// - `Err(())` — forward the original ERROR unchanged.
277///
278/// DATA and COMPLETE pass through unchanged.
279#[must_use]
280pub fn rescue(
281    core: &Core,
282    binding: &Arc<dyn ProducerBinding>,
283    source: NodeId,
284    fn_id: FnId,
285) -> NodeId {
286    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
287        let core_s = ctx.core();
288        let binding_s = ctx.core().binding();
289        let em = ctx.emitter();
290        let pid = ctx.node_id();
291        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
292        let core_sink = em.clone();
293
294        let source_sink: Sink = Rc::new(move |msgs| {
295            enum Act {
296                Emit(HandleId),
297                Complete,
298                TryRescue(HandleId),
299            }
300            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
301            for m in msgs {
302                match m.tier() {
303                    3 => {
304                        if let Some(h) = m.payload_handle() {
305                            bb.retain_handle(h);
306                            actions.push(Act::Emit(h));
307                        }
308                    }
309                    5 => {
310                        if let Some(h) = m.payload_handle() {
311                            bb.retain_handle(h);
312                            actions.push(Act::TryRescue(h));
313                        } else {
314                            actions.push(Act::Complete);
315                        }
316                    }
317                    _ => {}
318                }
319            }
320            for a in actions {
321                match a {
322                    Act::Emit(h) => core_sink.emit(pid, h),
323                    Act::Complete => core_sink.complete(pid),
324                    Act::TryRescue(err_h) => {
325                        match bb.invoke_rescue_fn(fn_id, err_h) {
326                            Ok(recovered_h) => {
327                                // Recovery succeeded — release original error,
328                                // emit recovered value as DATA.
329                                bb.release_handle(err_h);
330                                core_sink.emit(pid, recovered_h);
331                            }
332                            Err(()) => {
333                                // Recovery failed — forward original ERROR.
334                                core_sink.error(pid, err_h);
335                            }
336                        }
337                    }
338                }
339            }
340        });
341
342        let outcome = ctx.subscribe_to(source, source_sink);
343        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
344            core_s.complete(pid);
345        }
346    });
347
348    let fn_id_reg = binding.register_producer_build(build);
349    core.register_producer(fn_id_reg)
350        .expect("rescue: register_producer failed")
351}
352
353// =========================================================================
354// valve(source, control, gate_fn_id, cancel) — boolean gate
355// =========================================================================
356
357/// Boolean-gated passthrough. Subscribes to both `source` and `control`.
358///
359/// - **Control DATA**: evaluates `binding.predicate_each(gate_fn_id,
360///   &[handle])` to determine gate state. `true` = open, `false` = closed.
361///   On transition from open to closed, if `cancel` is `Some`, invokes
362///   `cancel.cancel()`.
363/// - **Source DATA**: if gate is open, retains + emits. If closed, drops
364///   the handle (source DATA while gate is closed is silently discarded).
365/// - **Source COMPLETE/ERROR**: forwarded unchanged.
366/// - **Control COMPLETE**: does NOT auto-complete the valve (per TS
367///   `completeWhenDepsComplete: false` equivalent).
368/// - **Control ERROR**: terminates the valve with ERROR.
369#[must_use]
370pub fn valve(
371    core: &Core,
372    binding: &Arc<dyn ProducerBinding>,
373    source: NodeId,
374    control: NodeId,
375    gate_fn_id: FnId,
376    cancel: Option<tokio_util::sync::CancellationToken>,
377) -> NodeId {
378    struct ValveState {
379        open: bool,
380        terminated: bool,
381    }
382
383    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
384        let core_s = ctx.core();
385        let binding_s = ctx.core().binding();
386        let em = ctx.emitter();
387        let pid = ctx.node_id();
388        let state: Rc<RefCell<ValveState>> = Rc::new(RefCell::new(ValveState {
389            open: false,
390            terminated: false,
391        }));
392
393        // --- control sink ---
394        let st_ctrl = state.clone();
395        let bb_ctrl: Arc<dyn BindingBoundary> = binding_s.clone();
396        let core_ctrl = em.clone();
397        let cancel_ctrl = cancel.clone();
398        let control_sink: Sink = Rc::new(move |msgs| {
399            let mut should_cancel = false;
400            let mut error_action: Option<HandleId> = None;
401            {
402                let mut s = st_ctrl.borrow_mut();
403                if s.terminated {
404                    return;
405                }
406                for m in msgs {
407                    match m.tier() {
408                        3 => {
409                            if let Some(h) = m.payload_handle() {
410                                let results = bb_ctrl.predicate_each(gate_fn_id, &[h]);
411                                let new_open = results.first().copied().unwrap_or(false);
412                                let was_open = s.open;
413                                s.open = new_open;
414                                // Transition open -> closed: trigger cancel.
415                                if was_open && !new_open && cancel_ctrl.is_some() {
416                                    should_cancel = true;
417                                }
418                            }
419                        }
420                        5 => {
421                            if let Some(h) = m.payload_handle() {
422                                // Control ERROR terminates valve.
423                                if !s.terminated {
424                                    s.terminated = true;
425                                    bb_ctrl.retain_handle(h);
426                                    error_action = Some(h);
427                                }
428                            }
429                            // Control COMPLETE: do NOT auto-complete.
430                        }
431                        _ => {}
432                    }
433                }
434            }
435            if should_cancel {
436                if let Some(ref ct) = cancel_ctrl {
437                    ct.cancel();
438                }
439            }
440            if let Some(h) = error_action {
441                core_ctrl.error(pid, h);
442            }
443        });
444
445        let ctrl_outcome = ctx.subscribe_to(control, control_sink);
446        if matches!(ctrl_outcome, SubscribeOutcome::Dead { .. }) {
447            // Control is dead — gate state remains as-is (closed by default).
448            // No auto-complete; source can still flow if gate was already opened.
449        }
450
451        // --- source sink ---
452        let st_src = state.clone();
453        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
454        let core_src = em.clone();
455        let source_sink: Sink = Rc::new(move |msgs| {
456            enum Act {
457                Emit(HandleId),
458                Complete,
459                Error(HandleId),
460            }
461            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
462            {
463                let s = st_src.borrow();
464                if s.terminated {
465                    return;
466                }
467                for m in msgs {
468                    match m.tier() {
469                        3 => {
470                            if let Some(h) = m.payload_handle() {
471                                if s.open {
472                                    bb_src.retain_handle(h);
473                                    actions.push(Act::Emit(h));
474                                }
475                                // Closed gate: silently discard.
476                            }
477                        }
478                        5 => {
479                            if let Some(h) = m.payload_handle() {
480                                bb_src.retain_handle(h);
481                                actions.push(Act::Error(h));
482                            } else {
483                                actions.push(Act::Complete);
484                            }
485                        }
486                        _ => {}
487                    }
488                }
489            }
490            for a in actions {
491                match a {
492                    Act::Emit(h) => core_src.emit(pid, h),
493                    Act::Complete => core_src.complete(pid),
494                    Act::Error(h) => core_src.error(pid, h),
495                }
496            }
497        });
498
499        let src_outcome = ctx.subscribe_to(source, source_sink);
500        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
501            let mut s = state.borrow_mut();
502            if !s.terminated {
503                s.terminated = true;
504                drop(s);
505                core_s.complete(pid);
506            }
507        }
508    });
509
510    let fn_id = binding.register_producer_build(build);
511    core.register_producer(fn_id)
512        .expect("valve: register_producer failed")
513}
514
515// =========================================================================
516// settle(source, quiet_waves, max_waves) — convergence detector
517// =========================================================================
518
519/// Wave-count convergence detector.
520///
521/// - On DATA: resets `quiet_count` to 0, increments `wave_count`. Forwards
522///   DATA unchanged. If `max_waves` is set and `wave_count >= max_waves`,
523///   completes.
524/// - On RESOLVED (tier 3, no payload): increments `quiet_count`. If
525///   `quiet_count >= quiet_waves`, completes.
526/// - On COMPLETE/ERROR: forwarded unchanged.
527#[must_use]
528pub fn settle(
529    core: &Core,
530    binding: &Arc<dyn ProducerBinding>,
531    source: NodeId,
532    quiet_waves: u32,
533    max_waves: Option<u32>,
534) -> NodeId {
535    struct SettleState {
536        wave_count: u32,
537        quiet_count: u32,
538        completed: bool,
539    }
540
541    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
542        let core_s = ctx.core();
543        let binding_s = ctx.core().binding();
544        let em = ctx.emitter();
545        let pid = ctx.node_id();
546        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
547        let core_sink = em.clone();
548        let state: Rc<RefCell<SettleState>> = Rc::new(RefCell::new(SettleState {
549            wave_count: 0,
550            quiet_count: 0,
551            completed: false,
552        }));
553
554        let source_sink: Sink = Rc::new(move |msgs| {
555            enum Act {
556                Emit(HandleId),
557                Complete,
558                Error(HandleId),
559                SelfComplete,
560            }
561            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
562            {
563                let mut s = state.borrow_mut();
564                if s.completed {
565                    return;
566                }
567                for m in msgs {
568                    if s.completed {
569                        break;
570                    }
571                    match m.tier() {
572                        3 => {
573                            if let Some(h) = m.payload_handle() {
574                                // DATA: reset quiet, increment waves.
575                                s.quiet_count = 0;
576                                s.wave_count += 1;
577                                bb.retain_handle(h);
578                                actions.push(Act::Emit(h));
579                                // Check max_waves.
580                                if let Some(max) = max_waves {
581                                    if s.wave_count >= max {
582                                        s.completed = true;
583                                        actions.push(Act::SelfComplete);
584                                    }
585                                }
586                            } else {
587                                // RESOLVED (tier 3, no payload): quiet wave.
588                                s.quiet_count += 1;
589                                if s.quiet_count >= quiet_waves {
590                                    s.completed = true;
591                                    actions.push(Act::SelfComplete);
592                                }
593                            }
594                        }
595                        5 => {
596                            if let Some(h) = m.payload_handle() {
597                                s.completed = true;
598                                bb.retain_handle(h);
599                                actions.push(Act::Error(h));
600                            } else {
601                                s.completed = true;
602                                actions.push(Act::Complete);
603                            }
604                        }
605                        _ => {}
606                    }
607                }
608            }
609            for a in actions {
610                match a {
611                    Act::Emit(h) => core_sink.emit(pid, h),
612                    Act::Complete | Act::SelfComplete => core_sink.complete(pid),
613                    Act::Error(h) => core_sink.error(pid, h),
614                }
615            }
616        });
617
618        let outcome = ctx.subscribe_to(source, source_sink);
619        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
620            core_s.complete(pid);
621        }
622    });
623
624    let fn_id = binding.register_producer_build(build);
625    core.register_producer(fn_id)
626        .expect("settle: register_producer failed")
627}
628
629// =========================================================================
630// repeat(source, count) — sequential resubscribe loop
631// =========================================================================
632
633/// Sequential resubscribe loop. Forwards all DATA from `source`. On
634/// source COMPLETE, if `remaining > 0`, resubscribes to `source` and
635/// decrements `remaining`. On ERROR, terminates immediately (no retry).
636///
637/// `count` is the number of ADDITIONAL subscriptions after the initial
638/// one. `count = 0` is identity passthrough. `count = 2` means the
639/// source will be subscribed up to 3 times total.
640#[must_use]
641pub fn repeat(
642    core: &Core,
643    binding: &Arc<dyn ProducerBinding>,
644    source: NodeId,
645    count: u32,
646) -> NodeId {
647    struct RepeatState {
648        remaining: u32,
649        terminated: bool,
650    }
651
652    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
653        let core_s = ctx.core();
654        let binding_s = ctx.core().binding();
655        let em = ctx.emitter();
656        let pid = ctx.node_id();
657        let state: Rc<RefCell<RepeatState>> = Rc::new(RefCell::new(RepeatState {
658            remaining: count,
659            terminated: false,
660        }));
661
662        // We need to store the subscription in producer storage and be able
663        // to replace it on resubscribe. S2b/D231: storage via the new
664        // `ProducerCtx::storage()` accessor (the build closure no longer
665        // holds a `ProducerBinding`). Resubscribe re-enters Core to
666        // `try_subscribe`, which a long-lived sink can only do via
667        // `em.defer` (D234) — see the `Act::Resubscribe` arm.
668        let storage = ctx.storage();
669
670        // Build the sink closure. It needs to reference itself for
671        // resubscription, so we use a shared slot. Single-owner since
672        // D248/D272 (`Sink = Rc<dyn Fn>`); the slot is owner-thread-only.
673        let sink_slot: Rc<RefCell<Option<Sink>>> = Rc::new(RefCell::new(None));
674        let sink_slot_inner = sink_slot.clone();
675        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
676        let core_sink = em.clone();
677        let storage_inner = storage.clone();
678
679        let source_sink: Sink = Rc::new(move |msgs| {
680            enum Act {
681                Emit(HandleId),
682                Error(HandleId),
683                Resubscribe,
684                Complete,
685            }
686            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
687            {
688                let mut s = state.borrow_mut();
689                if s.terminated {
690                    return;
691                }
692                for m in msgs {
693                    if s.terminated {
694                        break;
695                    }
696                    match m.tier() {
697                        3 => {
698                            if let Some(h) = m.payload_handle() {
699                                bb.retain_handle(h);
700                                actions.push(Act::Emit(h));
701                            }
702                        }
703                        5 => {
704                            if let Some(h) = m.payload_handle() {
705                                // ERROR — terminate immediately.
706                                s.terminated = true;
707                                bb.retain_handle(h);
708                                actions.push(Act::Error(h));
709                            } else {
710                                // COMPLETE — resubscribe if remaining > 0.
711                                if s.remaining > 0 {
712                                    s.remaining -= 1;
713                                    actions.push(Act::Resubscribe);
714                                } else {
715                                    s.terminated = true;
716                                    actions.push(Act::Complete);
717                                }
718                            }
719                        }
720                        _ => {}
721                    }
722                }
723            }
724            for a in actions {
725                match a {
726                    Act::Emit(h) => core_sink.emit(pid, h),
727                    Act::Error(h) => core_sink.error(pid, h),
728                    Act::Complete => core_sink.complete(pid),
729                    Act::Resubscribe => {
730                        // Get our own sink from the shared slot.
731                        let maybe_sink = sink_slot_inner.borrow_mut().clone();
732                        if let Some(new_sink) = maybe_sink {
733                            // D234: a long-lived sink can't hold `&Core`;
734                            // route the re-subscribe through `em.defer`
735                            // (owner-side, in-wave, FIFO-ordered). The
736                            // returned `SubscriptionId` is recorded
737                            // (D229 `(source, sub)` pair) inside the
738                            // closure; the dead/violation path completes
739                            // self there too.
740                            let storage_d = storage_inner.clone();
741                            let state_d = state.clone();
742                            let _ = core_sink.defer(move |c| {
743                                if let Ok(sub) = c.try_subscribe(source, new_sink) {
744                                    storage_d
745                                        .lock()
746                                        .entry(pid)
747                                        .or_default()
748                                        .subs
749                                        .push((source, sub));
750                                } else {
751                                    // Source dead / partition
752                                    // violation — terminal complete.
753                                    let mut s = state_d.borrow_mut();
754                                    if !s.terminated {
755                                        s.terminated = true;
756                                        drop(s);
757                                        c.complete(pid);
758                                    }
759                                }
760                            });
761                        }
762                    }
763                }
764            }
765        });
766
767        // Store sink in the shared slot so resubscribe can access it.
768        *sink_slot.borrow_mut() = Some(source_sink.clone());
769
770        let outcome = ctx.subscribe_to(source, source_sink);
771        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
772            // Dead source (non-resubscribable + terminated) will stay
773            // dead — resubscribing won't help. Complete immediately.
774            core_s.complete(pid);
775        }
776    });
777
778    let fn_id = binding.register_producer_build(build);
779    core.register_producer(fn_id)
780        .expect("repeat: register_producer failed")
781}