Skip to main content

graphrefly_operators/
control.rs

1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Control operators — side-effect, gating, error recovery, convergence.
11//!
12//! # Operators
13//!
14//! - [`tap`] / [`tap_observer`] — side-effect passthrough.
15//! - [`on_first_data`] — one-shot side-effect on first DATA.
16//! - [`rescue`] — error recovery via user callback.
17//! - [`valve`] — boolean-gated passthrough with optional cancellation.
18//! - [`settle`] — wave-count convergence detector.
19//! - [`repeat`] — sequential resubscribe loop.
20
21#![allow(clippy::too_many_lines, clippy::items_after_statements)]
22
23use std::sync::Arc;
24
25use parking_lot::Mutex;
26
27use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
28use smallvec::SmallVec;
29
30use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
31
32// =========================================================================
33// tap(source, fn_id) — side-effect passthrough
34// =========================================================================
35
36/// Passthrough operator that forwards all DATA unchanged. On each DATA,
37/// calls `binding.invoke_tap_fn(fn_id, handle)` as a side-effect.
38/// Forwards COMPLETE and ERROR unchanged.
39#[must_use]
40pub fn tap(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, fn_id: FnId) -> NodeId {
41    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
42        let core_s = ctx.core();
43        let binding_s = ctx.core().binding();
44        let em = ctx.emitter();
45        let pid = ctx.node_id();
46        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
47        let core_sink = em.clone();
48
49        let source_sink: Sink = Arc::new(move |msgs| {
50            enum Act {
51                EmitAndTap(HandleId),
52                Complete,
53                Error(HandleId),
54            }
55            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
56            for m in msgs {
57                match m.tier() {
58                    3 => {
59                        if let Some(h) = m.payload_handle() {
60                            bb.retain_handle(h);
61                            actions.push(Act::EmitAndTap(h));
62                        }
63                    }
64                    5 => {
65                        if let Some(h) = m.payload_handle() {
66                            bb.retain_handle(h);
67                            actions.push(Act::Error(h));
68                        } else {
69                            actions.push(Act::Complete);
70                        }
71                    }
72                    _ => {}
73                }
74            }
75            for a in actions {
76                match a {
77                    Act::EmitAndTap(h) => {
78                        bb.invoke_tap_fn(fn_id, h);
79                        core_sink.emit_or_defer(pid, h);
80                    }
81                    Act::Complete => core_sink.complete_or_defer(pid),
82                    Act::Error(h) => core_sink.error_or_defer(pid, h),
83                }
84            }
85        });
86
87        let outcome = ctx.subscribe_to(source, source_sink);
88        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
89            core_s.complete_or_defer(pid);
90        }
91    });
92
93    let fn_id_reg = binding.register_producer_build(build);
94    core.register_producer(fn_id_reg)
95        .expect("tap: register_producer failed")
96}
97
98// =========================================================================
99// tap_observer(source, data_fn, error_fn, complete_fn)
100// =========================================================================
101
102/// Like [`tap`] but with lifecycle observer callbacks. Each callback is
103/// optional (`None` = skip that lifecycle event).
104///
105/// - `data_fn_id`: called on each DATA via `invoke_tap_fn`.
106/// - `error_fn_id`: called on ERROR via `invoke_tap_error_fn`.
107/// - `complete_fn_id`: called on COMPLETE via `invoke_tap_complete_fn`.
108///
109/// All messages are forwarded unchanged regardless of callback presence.
110#[must_use]
111pub fn tap_observer(
112    core: &Core,
113    binding: &Arc<dyn ProducerBinding>,
114    source: NodeId,
115    data_fn_id: Option<FnId>,
116    error_fn_id: Option<FnId>,
117    complete_fn_id: Option<FnId>,
118) -> NodeId {
119    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
120        let core_s = ctx.core();
121        let binding_s = ctx.core().binding();
122        let em = ctx.emitter();
123        let pid = ctx.node_id();
124        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
125        let core_sink = em.clone();
126
127        let source_sink: Sink = Arc::new(move |msgs| {
128            enum Act {
129                Emit(HandleId),
130                Complete,
131                Error(HandleId),
132            }
133            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
134            for m in msgs {
135                match m.tier() {
136                    3 => {
137                        if let Some(h) = m.payload_handle() {
138                            bb.retain_handle(h);
139                            actions.push(Act::Emit(h));
140                        }
141                    }
142                    5 => {
143                        if let Some(h) = m.payload_handle() {
144                            bb.retain_handle(h);
145                            actions.push(Act::Error(h));
146                        } else {
147                            actions.push(Act::Complete);
148                        }
149                    }
150                    _ => {}
151                }
152            }
153            for a in actions {
154                match a {
155                    Act::Emit(h) => {
156                        if let Some(fid) = data_fn_id {
157                            bb.invoke_tap_fn(fid, h);
158                        }
159                        core_sink.emit_or_defer(pid, h);
160                    }
161                    Act::Complete => {
162                        if let Some(fid) = complete_fn_id {
163                            bb.invoke_tap_complete_fn(fid);
164                        }
165                        core_sink.complete_or_defer(pid);
166                    }
167                    Act::Error(h) => {
168                        if let Some(fid) = error_fn_id {
169                            bb.invoke_tap_error_fn(fid, h);
170                        }
171                        core_sink.error_or_defer(pid, h);
172                    }
173                }
174            }
175        });
176
177        let outcome = ctx.subscribe_to(source, source_sink);
178        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
179            if let Some(fid) = complete_fn_id {
180                binding_s.invoke_tap_complete_fn(fid);
181            }
182            core_s.complete_or_defer(pid);
183        }
184    });
185
186    let fn_id = binding.register_producer_build(build);
187    core.register_producer(fn_id)
188        .expect("tap_observer: register_producer failed")
189}
190
191// =========================================================================
192// on_first_data(source, fn_id) — one-shot tap
193// =========================================================================
194
195/// One-shot side-effect tap. Calls `invoke_tap_fn(fn_id, handle)` on
196/// the first DATA only, then becomes a pure passthrough. All messages
197/// are forwarded unchanged.
198#[must_use]
199pub fn on_first_data(
200    core: &Core,
201    binding: &Arc<dyn ProducerBinding>,
202    source: NodeId,
203    fn_id: FnId,
204) -> NodeId {
205    struct OnFirstState {
206        fired: bool,
207    }
208
209    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
210        let core_s = ctx.core();
211        let binding_s = ctx.core().binding();
212        let em = ctx.emitter();
213        let pid = ctx.node_id();
214        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
215        let core_sink = em.clone();
216        let state: Arc<Mutex<OnFirstState>> = Arc::new(Mutex::new(OnFirstState { fired: false }));
217
218        let source_sink: Sink = Arc::new(move |msgs| {
219            enum Act {
220                EmitWithTap(HandleId),
221                Emit(HandleId),
222                Complete,
223                Error(HandleId),
224            }
225            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
226            {
227                let mut s = state.lock();
228                for m in msgs {
229                    match m.tier() {
230                        3 => {
231                            if let Some(h) = m.payload_handle() {
232                                bb.retain_handle(h);
233                                if s.fired {
234                                    actions.push(Act::Emit(h));
235                                } else {
236                                    s.fired = true;
237                                    actions.push(Act::EmitWithTap(h));
238                                }
239                            }
240                        }
241                        5 => {
242                            if let Some(h) = m.payload_handle() {
243                                bb.retain_handle(h);
244                                actions.push(Act::Error(h));
245                            } else {
246                                actions.push(Act::Complete);
247                            }
248                        }
249                        _ => {}
250                    }
251                }
252            }
253            for a in actions {
254                match a {
255                    Act::EmitWithTap(h) => {
256                        bb.invoke_tap_fn(fn_id, h);
257                        core_sink.emit_or_defer(pid, h);
258                    }
259                    Act::Emit(h) => core_sink.emit_or_defer(pid, h),
260                    Act::Complete => core_sink.complete_or_defer(pid),
261                    Act::Error(h) => core_sink.error_or_defer(pid, h),
262                }
263            }
264        });
265
266        let outcome = ctx.subscribe_to(source, source_sink);
267        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
268            core_s.complete_or_defer(pid);
269        }
270    });
271
272    let fn_id_reg = binding.register_producer_build(build);
273    core.register_producer(fn_id_reg)
274        .expect("on_first_data: register_producer failed")
275}
276
277// =========================================================================
278// rescue(source, fn_id) — error recovery
279// =========================================================================
280
281/// Error recovery operator. On ERROR, calls
282/// `binding.invoke_rescue_fn(fn_id, error_handle)`:
283///
284/// - `Ok(recovered_handle)` — emit DATA with the recovered value.
285/// - `Err(())` — forward the original ERROR unchanged.
286///
287/// DATA and COMPLETE pass through unchanged.
288#[must_use]
289pub fn rescue(
290    core: &Core,
291    binding: &Arc<dyn ProducerBinding>,
292    source: NodeId,
293    fn_id: FnId,
294) -> NodeId {
295    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
296        let core_s = ctx.core();
297        let binding_s = ctx.core().binding();
298        let em = ctx.emitter();
299        let pid = ctx.node_id();
300        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
301        let core_sink = em.clone();
302
303        let source_sink: Sink = Arc::new(move |msgs| {
304            enum Act {
305                Emit(HandleId),
306                Complete,
307                TryRescue(HandleId),
308            }
309            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
310            for m in msgs {
311                match m.tier() {
312                    3 => {
313                        if let Some(h) = m.payload_handle() {
314                            bb.retain_handle(h);
315                            actions.push(Act::Emit(h));
316                        }
317                    }
318                    5 => {
319                        if let Some(h) = m.payload_handle() {
320                            bb.retain_handle(h);
321                            actions.push(Act::TryRescue(h));
322                        } else {
323                            actions.push(Act::Complete);
324                        }
325                    }
326                    _ => {}
327                }
328            }
329            for a in actions {
330                match a {
331                    Act::Emit(h) => core_sink.emit_or_defer(pid, h),
332                    Act::Complete => core_sink.complete_or_defer(pid),
333                    Act::TryRescue(err_h) => {
334                        match bb.invoke_rescue_fn(fn_id, err_h) {
335                            Ok(recovered_h) => {
336                                // Recovery succeeded — release original error,
337                                // emit recovered value as DATA.
338                                bb.release_handle(err_h);
339                                core_sink.emit_or_defer(pid, recovered_h);
340                            }
341                            Err(()) => {
342                                // Recovery failed — forward original ERROR.
343                                core_sink.error_or_defer(pid, err_h);
344                            }
345                        }
346                    }
347                }
348            }
349        });
350
351        let outcome = ctx.subscribe_to(source, source_sink);
352        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
353            core_s.complete_or_defer(pid);
354        }
355    });
356
357    let fn_id_reg = binding.register_producer_build(build);
358    core.register_producer(fn_id_reg)
359        .expect("rescue: register_producer failed")
360}
361
362// =========================================================================
363// valve(source, control, gate_fn_id, cancel) — boolean gate
364// =========================================================================
365
366/// Boolean-gated passthrough. Subscribes to both `source` and `control`.
367///
368/// - **Control DATA**: evaluates `binding.predicate_each(gate_fn_id,
369///   &[handle])` to determine gate state. `true` = open, `false` = closed.
370///   On transition from open to closed, if `cancel` is `Some`, invokes
371///   `cancel.cancel()`.
372/// - **Source DATA**: if gate is open, retains + emits. If closed, drops
373///   the handle (source DATA while gate is closed is silently discarded).
374/// - **Source COMPLETE/ERROR**: forwarded unchanged.
375/// - **Control COMPLETE**: does NOT auto-complete the valve (per TS
376///   `completeWhenDepsComplete: false` equivalent).
377/// - **Control ERROR**: terminates the valve with ERROR.
378#[must_use]
379pub fn valve(
380    core: &Core,
381    binding: &Arc<dyn ProducerBinding>,
382    source: NodeId,
383    control: NodeId,
384    gate_fn_id: FnId,
385    cancel: Option<tokio_util::sync::CancellationToken>,
386) -> NodeId {
387    struct ValveState {
388        open: bool,
389        terminated: bool,
390    }
391
392    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
393        let core_s = ctx.core();
394        let binding_s = ctx.core().binding();
395        let em = ctx.emitter();
396        let pid = ctx.node_id();
397        let state: Arc<Mutex<ValveState>> = Arc::new(Mutex::new(ValveState {
398            open: false,
399            terminated: false,
400        }));
401
402        // --- control sink ---
403        let st_ctrl = state.clone();
404        let bb_ctrl: Arc<dyn BindingBoundary> = binding_s.clone();
405        let core_ctrl = em.clone();
406        let cancel_ctrl = cancel.clone();
407        let control_sink: Sink = Arc::new(move |msgs| {
408            let mut should_cancel = false;
409            let mut error_action: Option<HandleId> = None;
410            {
411                let mut s = st_ctrl.lock();
412                if s.terminated {
413                    return;
414                }
415                for m in msgs {
416                    match m.tier() {
417                        3 => {
418                            if let Some(h) = m.payload_handle() {
419                                let results = bb_ctrl.predicate_each(gate_fn_id, &[h]);
420                                let new_open = results.first().copied().unwrap_or(false);
421                                let was_open = s.open;
422                                s.open = new_open;
423                                // Transition open -> closed: trigger cancel.
424                                if was_open && !new_open && cancel_ctrl.is_some() {
425                                    should_cancel = true;
426                                }
427                            }
428                        }
429                        5 => {
430                            if let Some(h) = m.payload_handle() {
431                                // Control ERROR terminates valve.
432                                if !s.terminated {
433                                    s.terminated = true;
434                                    bb_ctrl.retain_handle(h);
435                                    error_action = Some(h);
436                                }
437                            }
438                            // Control COMPLETE: do NOT auto-complete.
439                        }
440                        _ => {}
441                    }
442                }
443            }
444            if should_cancel {
445                if let Some(ref ct) = cancel_ctrl {
446                    ct.cancel();
447                }
448            }
449            if let Some(h) = error_action {
450                core_ctrl.error_or_defer(pid, h);
451            }
452        });
453
454        let ctrl_outcome = ctx.subscribe_to(control, control_sink);
455        if matches!(ctrl_outcome, SubscribeOutcome::Dead { .. }) {
456            // Control is dead — gate state remains as-is (closed by default).
457            // No auto-complete; source can still flow if gate was already opened.
458        }
459
460        // --- source sink ---
461        let st_src = state.clone();
462        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
463        let core_src = em.clone();
464        let source_sink: Sink = Arc::new(move |msgs| {
465            enum Act {
466                Emit(HandleId),
467                Complete,
468                Error(HandleId),
469            }
470            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
471            {
472                let s = st_src.lock();
473                if s.terminated {
474                    return;
475                }
476                for m in msgs {
477                    match m.tier() {
478                        3 => {
479                            if let Some(h) = m.payload_handle() {
480                                if s.open {
481                                    bb_src.retain_handle(h);
482                                    actions.push(Act::Emit(h));
483                                }
484                                // Closed gate: silently discard.
485                            }
486                        }
487                        5 => {
488                            if let Some(h) = m.payload_handle() {
489                                bb_src.retain_handle(h);
490                                actions.push(Act::Error(h));
491                            } else {
492                                actions.push(Act::Complete);
493                            }
494                        }
495                        _ => {}
496                    }
497                }
498            }
499            for a in actions {
500                match a {
501                    Act::Emit(h) => core_src.emit_or_defer(pid, h),
502                    Act::Complete => core_src.complete_or_defer(pid),
503                    Act::Error(h) => core_src.error_or_defer(pid, h),
504                }
505            }
506        });
507
508        let src_outcome = ctx.subscribe_to(source, source_sink);
509        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
510            let mut s = state.lock();
511            if !s.terminated {
512                s.terminated = true;
513                drop(s);
514                core_s.complete_or_defer(pid);
515            }
516        }
517    });
518
519    let fn_id = binding.register_producer_build(build);
520    core.register_producer(fn_id)
521        .expect("valve: register_producer failed")
522}
523
524// =========================================================================
525// settle(source, quiet_waves, max_waves) — convergence detector
526// =========================================================================
527
528/// Wave-count convergence detector.
529///
530/// - On DATA: resets `quiet_count` to 0, increments `wave_count`. Forwards
531///   DATA unchanged. If `max_waves` is set and `wave_count >= max_waves`,
532///   completes.
533/// - On RESOLVED (tier 3, no payload): increments `quiet_count`. If
534///   `quiet_count >= quiet_waves`, completes.
535/// - On COMPLETE/ERROR: forwarded unchanged.
536#[must_use]
537pub fn settle(
538    core: &Core,
539    binding: &Arc<dyn ProducerBinding>,
540    source: NodeId,
541    quiet_waves: u32,
542    max_waves: Option<u32>,
543) -> NodeId {
544    struct SettleState {
545        wave_count: u32,
546        quiet_count: u32,
547        completed: bool,
548    }
549
550    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
551        let core_s = ctx.core();
552        let binding_s = ctx.core().binding();
553        let em = ctx.emitter();
554        let pid = ctx.node_id();
555        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
556        let core_sink = em.clone();
557        let state: Arc<Mutex<SettleState>> = Arc::new(Mutex::new(SettleState {
558            wave_count: 0,
559            quiet_count: 0,
560            completed: false,
561        }));
562
563        let source_sink: Sink = Arc::new(move |msgs| {
564            enum Act {
565                Emit(HandleId),
566                Complete,
567                Error(HandleId),
568                SelfComplete,
569            }
570            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
571            {
572                let mut s = state.lock();
573                if s.completed {
574                    return;
575                }
576                for m in msgs {
577                    if s.completed {
578                        break;
579                    }
580                    match m.tier() {
581                        3 => {
582                            if let Some(h) = m.payload_handle() {
583                                // DATA: reset quiet, increment waves.
584                                s.quiet_count = 0;
585                                s.wave_count += 1;
586                                bb.retain_handle(h);
587                                actions.push(Act::Emit(h));
588                                // Check max_waves.
589                                if let Some(max) = max_waves {
590                                    if s.wave_count >= max {
591                                        s.completed = true;
592                                        actions.push(Act::SelfComplete);
593                                    }
594                                }
595                            } else {
596                                // RESOLVED (tier 3, no payload): quiet wave.
597                                s.quiet_count += 1;
598                                if s.quiet_count >= quiet_waves {
599                                    s.completed = true;
600                                    actions.push(Act::SelfComplete);
601                                }
602                            }
603                        }
604                        5 => {
605                            if let Some(h) = m.payload_handle() {
606                                s.completed = true;
607                                bb.retain_handle(h);
608                                actions.push(Act::Error(h));
609                            } else {
610                                s.completed = true;
611                                actions.push(Act::Complete);
612                            }
613                        }
614                        _ => {}
615                    }
616                }
617            }
618            for a in actions {
619                match a {
620                    Act::Emit(h) => core_sink.emit_or_defer(pid, h),
621                    Act::Complete | Act::SelfComplete => core_sink.complete_or_defer(pid),
622                    Act::Error(h) => core_sink.error_or_defer(pid, h),
623                }
624            }
625        });
626
627        let outcome = ctx.subscribe_to(source, source_sink);
628        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
629            core_s.complete_or_defer(pid);
630        }
631    });
632
633    let fn_id = binding.register_producer_build(build);
634    core.register_producer(fn_id)
635        .expect("settle: register_producer failed")
636}
637
638// =========================================================================
639// repeat(source, count) — sequential resubscribe loop
640// =========================================================================
641
642/// Sequential resubscribe loop. Forwards all DATA from `source`. On
643/// source COMPLETE, if `remaining > 0`, resubscribes to `source` and
644/// decrements `remaining`. On ERROR, terminates immediately (no retry).
645///
646/// `count` is the number of ADDITIONAL subscriptions after the initial
647/// one. `count = 0` is identity passthrough. `count = 2` means the
648/// source will be subscribed up to 3 times total.
649#[must_use]
650pub fn repeat(
651    core: &Core,
652    binding: &Arc<dyn ProducerBinding>,
653    source: NodeId,
654    count: u32,
655) -> NodeId {
656    struct RepeatState {
657        remaining: u32,
658        terminated: bool,
659    }
660
661    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
662        let core_s = ctx.core();
663        let binding_s = ctx.core().binding();
664        let em = ctx.emitter();
665        let pid = ctx.node_id();
666        let state: Arc<Mutex<RepeatState>> = Arc::new(Mutex::new(RepeatState {
667            remaining: count,
668            terminated: false,
669        }));
670
671        // We need to store the subscription in producer storage and be able
672        // to replace it on resubscribe. S2b/D231: storage via the new
673        // `ProducerCtx::storage()` accessor (the build closure no longer
674        // holds a `ProducerBinding`). Resubscribe re-enters Core to
675        // `try_subscribe`, which a long-lived sink can only do via
676        // `em.defer` (D234) — see the `Act::Resubscribe` arm.
677        let storage = ctx.storage();
678
679        // Build the sink closure. It needs to reference itself for
680        // resubscription, so we use a shared slot.
681        let sink_slot: Arc<Mutex<Option<Sink>>> = Arc::new(Mutex::new(None));
682        let sink_slot_inner = sink_slot.clone();
683        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
684        let core_sink = em.clone();
685        let storage_inner = storage.clone();
686
687        let source_sink: Sink = Arc::new(move |msgs| {
688            enum Act {
689                Emit(HandleId),
690                Error(HandleId),
691                Resubscribe,
692                Complete,
693            }
694            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
695            {
696                let mut s = state.lock();
697                if s.terminated {
698                    return;
699                }
700                for m in msgs {
701                    if s.terminated {
702                        break;
703                    }
704                    match m.tier() {
705                        3 => {
706                            if let Some(h) = m.payload_handle() {
707                                bb.retain_handle(h);
708                                actions.push(Act::Emit(h));
709                            }
710                        }
711                        5 => {
712                            if let Some(h) = m.payload_handle() {
713                                // ERROR — terminate immediately.
714                                s.terminated = true;
715                                bb.retain_handle(h);
716                                actions.push(Act::Error(h));
717                            } else {
718                                // COMPLETE — resubscribe if remaining > 0.
719                                if s.remaining > 0 {
720                                    s.remaining -= 1;
721                                    actions.push(Act::Resubscribe);
722                                } else {
723                                    s.terminated = true;
724                                    actions.push(Act::Complete);
725                                }
726                            }
727                        }
728                        _ => {}
729                    }
730                }
731            }
732            for a in actions {
733                match a {
734                    Act::Emit(h) => core_sink.emit_or_defer(pid, h),
735                    Act::Error(h) => core_sink.error_or_defer(pid, h),
736                    Act::Complete => core_sink.complete_or_defer(pid),
737                    Act::Resubscribe => {
738                        // Get our own sink from the shared slot.
739                        let maybe_sink = sink_slot_inner.lock().clone();
740                        if let Some(new_sink) = maybe_sink {
741                            // D234: a long-lived sink can't hold `&Core`;
742                            // route the re-subscribe through `em.defer`
743                            // (owner-side, in-wave, FIFO-ordered). The
744                            // returned `SubscriptionId` is recorded
745                            // (D229 `(source, sub)` pair) inside the
746                            // closure; the dead/violation path completes
747                            // self there too.
748                            let storage_d = storage_inner.clone();
749                            let state_d = state.clone();
750                            let _ = core_sink.defer(move |c| {
751                                if let Ok(sub) = c.try_subscribe(source, new_sink) {
752                                    storage_d
753                                        .lock()
754                                        .entry(pid)
755                                        .or_default()
756                                        .subs
757                                        .push((source, sub));
758                                } else {
759                                    // Source dead / partition
760                                    // violation — terminal complete.
761                                    let mut s = state_d.lock();
762                                    if !s.terminated {
763                                        s.terminated = true;
764                                        drop(s);
765                                        c.complete(pid);
766                                    }
767                                }
768                            });
769                        }
770                    }
771                }
772            }
773        });
774
775        // Store sink in the shared slot so resubscribe can access it.
776        *sink_slot.lock() = Some(source_sink.clone());
777
778        let outcome = ctx.subscribe_to(source, source_sink);
779        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
780            // Dead source (non-resubscribable + terminated) will stay
781            // dead — resubscribing won't help. Complete immediately.
782            core_s.complete_or_defer(pid);
783        }
784    });
785
786    let fn_id = binding.register_producer_build(build);
787    core.register_producer(fn_id)
788        .expect("repeat: register_producer failed")
789}