Skip to main content

graphrefly_operators/
control.rs

1//! Control operators — side-effect, gating, error recovery, convergence.
2//!
3//! # Operators
4//!
5//! - [`tap`] / [`tap_observer`] — side-effect passthrough.
6//! - [`on_first_data`] — one-shot side-effect on first DATA.
7//! - [`rescue`] — error recovery via user callback.
8//! - [`valve`] — boolean-gated passthrough with optional cancellation.
9//! - [`settle`] — wave-count convergence detector.
10//! - [`repeat`] — sequential resubscribe loop.
11
12#![allow(clippy::too_many_lines, clippy::items_after_statements)]
13
14use std::sync::{Arc, Weak};
15
16use parking_lot::Mutex;
17
18use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
19use smallvec::SmallVec;
20
21use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
22
23// =========================================================================
24// tap(source, fn_id) — side-effect passthrough
25// =========================================================================
26
27/// Passthrough operator that forwards all DATA unchanged. On each DATA,
28/// calls `binding.invoke_tap_fn(fn_id, handle)` as a side-effect.
29/// Forwards COMPLETE and ERROR unchanged.
30#[must_use]
31pub fn tap(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, fn_id: FnId) -> NodeId {
32    let core_weak = core.weak_handle();
33    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
34
35    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
36        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
37            return;
38        };
39        let pid = ctx.node_id();
40        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
41        let core_sink = core_s.clone();
42
43        let source_sink: Sink = Arc::new(move |msgs| {
44            enum Act {
45                EmitAndTap(HandleId),
46                Complete,
47                Error(HandleId),
48            }
49            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
50            for m in msgs {
51                match m.tier() {
52                    3 => {
53                        if let Some(h) = m.payload_handle() {
54                            bb.retain_handle(h);
55                            actions.push(Act::EmitAndTap(h));
56                        }
57                    }
58                    5 => {
59                        if let Some(h) = m.payload_handle() {
60                            bb.retain_handle(h);
61                            actions.push(Act::Error(h));
62                        } else {
63                            actions.push(Act::Complete);
64                        }
65                    }
66                    _ => {}
67                }
68            }
69            for a in actions {
70                match a {
71                    Act::EmitAndTap(h) => {
72                        bb.invoke_tap_fn(fn_id, h);
73                        core_sink.emit_or_defer(pid, h);
74                    }
75                    Act::Complete => core_sink.complete_or_defer(pid),
76                    Act::Error(h) => core_sink.error_or_defer(pid, h),
77                }
78            }
79        });
80
81        let outcome = ctx.subscribe_to(source, source_sink);
82        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
83            core_s.complete_or_defer(pid);
84        }
85    });
86
87    let fn_id_reg = binding.register_producer_build(build);
88    core.register_producer(fn_id_reg)
89        .expect("tap: register_producer failed")
90}
91
92// =========================================================================
93// tap_observer(source, data_fn, error_fn, complete_fn)
94// =========================================================================
95
96/// Like [`tap`] but with lifecycle observer callbacks. Each callback is
97/// optional (`None` = skip that lifecycle event).
98///
99/// - `data_fn_id`: called on each DATA via `invoke_tap_fn`.
100/// - `error_fn_id`: called on ERROR via `invoke_tap_error_fn`.
101/// - `complete_fn_id`: called on COMPLETE via `invoke_tap_complete_fn`.
102///
103/// All messages are forwarded unchanged regardless of callback presence.
104#[must_use]
105pub fn tap_observer(
106    core: &Core,
107    binding: &Arc<dyn ProducerBinding>,
108    source: NodeId,
109    data_fn_id: Option<FnId>,
110    error_fn_id: Option<FnId>,
111    complete_fn_id: Option<FnId>,
112) -> NodeId {
113    let core_weak = core.weak_handle();
114    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
115
116    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
117        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
118            return;
119        };
120        let pid = ctx.node_id();
121        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
122        let core_sink = core_s.clone();
123
124        let source_sink: Sink = Arc::new(move |msgs| {
125            enum Act {
126                Emit(HandleId),
127                Complete,
128                Error(HandleId),
129            }
130            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
131            for m in msgs {
132                match m.tier() {
133                    3 => {
134                        if let Some(h) = m.payload_handle() {
135                            bb.retain_handle(h);
136                            actions.push(Act::Emit(h));
137                        }
138                    }
139                    5 => {
140                        if let Some(h) = m.payload_handle() {
141                            bb.retain_handle(h);
142                            actions.push(Act::Error(h));
143                        } else {
144                            actions.push(Act::Complete);
145                        }
146                    }
147                    _ => {}
148                }
149            }
150            for a in actions {
151                match a {
152                    Act::Emit(h) => {
153                        if let Some(fid) = data_fn_id {
154                            bb.invoke_tap_fn(fid, h);
155                        }
156                        core_sink.emit_or_defer(pid, h);
157                    }
158                    Act::Complete => {
159                        if let Some(fid) = complete_fn_id {
160                            bb.invoke_tap_complete_fn(fid);
161                        }
162                        core_sink.complete_or_defer(pid);
163                    }
164                    Act::Error(h) => {
165                        if let Some(fid) = error_fn_id {
166                            bb.invoke_tap_error_fn(fid, h);
167                        }
168                        core_sink.error_or_defer(pid, h);
169                    }
170                }
171            }
172        });
173
174        let outcome = ctx.subscribe_to(source, source_sink);
175        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
176            if let Some(fid) = complete_fn_id {
177                binding_s.invoke_tap_complete_fn(fid);
178            }
179            core_s.complete_or_defer(pid);
180        }
181    });
182
183    let fn_id = binding.register_producer_build(build);
184    core.register_producer(fn_id)
185        .expect("tap_observer: register_producer failed")
186}
187
188// =========================================================================
189// on_first_data(source, fn_id) — one-shot tap
190// =========================================================================
191
192/// One-shot side-effect tap. Calls `invoke_tap_fn(fn_id, handle)` on
193/// the first DATA only, then becomes a pure passthrough. All messages
194/// are forwarded unchanged.
195#[must_use]
196pub fn on_first_data(
197    core: &Core,
198    binding: &Arc<dyn ProducerBinding>,
199    source: NodeId,
200    fn_id: FnId,
201) -> NodeId {
202    let core_weak = core.weak_handle();
203    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
204
205    struct OnFirstState {
206        fired: bool,
207    }
208
209    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
210        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
211            return;
212        };
213        let pid = ctx.node_id();
214        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
215        let core_sink = core_s.clone();
216        let state: Arc<Mutex<OnFirstState>> = Arc::new(Mutex::new(OnFirstState { fired: false }));
217
218        let source_sink: Sink = Arc::new(move |msgs| {
219            enum Act {
220                EmitWithTap(HandleId),
221                Emit(HandleId),
222                Complete,
223                Error(HandleId),
224            }
225            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
226            {
227                let mut s = state.lock();
228                for m in msgs {
229                    match m.tier() {
230                        3 => {
231                            if let Some(h) = m.payload_handle() {
232                                bb.retain_handle(h);
233                                if s.fired {
234                                    actions.push(Act::Emit(h));
235                                } else {
236                                    s.fired = true;
237                                    actions.push(Act::EmitWithTap(h));
238                                }
239                            }
240                        }
241                        5 => {
242                            if let Some(h) = m.payload_handle() {
243                                bb.retain_handle(h);
244                                actions.push(Act::Error(h));
245                            } else {
246                                actions.push(Act::Complete);
247                            }
248                        }
249                        _ => {}
250                    }
251                }
252            }
253            for a in actions {
254                match a {
255                    Act::EmitWithTap(h) => {
256                        bb.invoke_tap_fn(fn_id, h);
257                        core_sink.emit_or_defer(pid, h);
258                    }
259                    Act::Emit(h) => core_sink.emit_or_defer(pid, h),
260                    Act::Complete => core_sink.complete_or_defer(pid),
261                    Act::Error(h) => core_sink.error_or_defer(pid, h),
262                }
263            }
264        });
265
266        let outcome = ctx.subscribe_to(source, source_sink);
267        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
268            core_s.complete_or_defer(pid);
269        }
270    });
271
272    let fn_id_reg = binding.register_producer_build(build);
273    core.register_producer(fn_id_reg)
274        .expect("on_first_data: register_producer failed")
275}
276
277// =========================================================================
278// rescue(source, fn_id) — error recovery
279// =========================================================================
280
281/// Error recovery operator. On ERROR, calls
282/// `binding.invoke_rescue_fn(fn_id, error_handle)`:
283///
284/// - `Ok(recovered_handle)` — emit DATA with the recovered value.
285/// - `Err(())` — forward the original ERROR unchanged.
286///
287/// DATA and COMPLETE pass through unchanged.
288#[must_use]
289pub fn rescue(
290    core: &Core,
291    binding: &Arc<dyn ProducerBinding>,
292    source: NodeId,
293    fn_id: FnId,
294) -> NodeId {
295    let core_weak = core.weak_handle();
296    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
297
298    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
299        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
300            return;
301        };
302        let pid = ctx.node_id();
303        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
304        let core_sink = core_s.clone();
305
306        let source_sink: Sink = Arc::new(move |msgs| {
307            enum Act {
308                Emit(HandleId),
309                Complete,
310                TryRescue(HandleId),
311            }
312            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
313            for m in msgs {
314                match m.tier() {
315                    3 => {
316                        if let Some(h) = m.payload_handle() {
317                            bb.retain_handle(h);
318                            actions.push(Act::Emit(h));
319                        }
320                    }
321                    5 => {
322                        if let Some(h) = m.payload_handle() {
323                            bb.retain_handle(h);
324                            actions.push(Act::TryRescue(h));
325                        } else {
326                            actions.push(Act::Complete);
327                        }
328                    }
329                    _ => {}
330                }
331            }
332            for a in actions {
333                match a {
334                    Act::Emit(h) => core_sink.emit_or_defer(pid, h),
335                    Act::Complete => core_sink.complete_or_defer(pid),
336                    Act::TryRescue(err_h) => {
337                        match bb.invoke_rescue_fn(fn_id, err_h) {
338                            Ok(recovered_h) => {
339                                // Recovery succeeded — release original error,
340                                // emit recovered value as DATA.
341                                bb.release_handle(err_h);
342                                core_sink.emit_or_defer(pid, recovered_h);
343                            }
344                            Err(()) => {
345                                // Recovery failed — forward original ERROR.
346                                core_sink.error_or_defer(pid, err_h);
347                            }
348                        }
349                    }
350                }
351            }
352        });
353
354        let outcome = ctx.subscribe_to(source, source_sink);
355        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
356            core_s.complete_or_defer(pid);
357        }
358    });
359
360    let fn_id_reg = binding.register_producer_build(build);
361    core.register_producer(fn_id_reg)
362        .expect("rescue: register_producer failed")
363}
364
365// =========================================================================
366// valve(source, control, gate_fn_id, cancel) — boolean gate
367// =========================================================================
368
369/// Boolean-gated passthrough. Subscribes to both `source` and `control`.
370///
371/// - **Control DATA**: evaluates `binding.predicate_each(gate_fn_id,
372///   &[handle])` to determine gate state. `true` = open, `false` = closed.
373///   On transition from open to closed, if `cancel` is `Some`, invokes
374///   `cancel.cancel()`.
375/// - **Source DATA**: if gate is open, retains + emits. If closed, drops
376///   the handle (source DATA while gate is closed is silently discarded).
377/// - **Source COMPLETE/ERROR**: forwarded unchanged.
378/// - **Control COMPLETE**: does NOT auto-complete the valve (per TS
379///   `completeWhenDepsComplete: false` equivalent).
380/// - **Control ERROR**: terminates the valve with ERROR.
381#[must_use]
382pub fn valve(
383    core: &Core,
384    binding: &Arc<dyn ProducerBinding>,
385    source: NodeId,
386    control: NodeId,
387    gate_fn_id: FnId,
388    cancel: Option<tokio_util::sync::CancellationToken>,
389) -> NodeId {
390    let core_weak = core.weak_handle();
391    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
392
393    struct ValveState {
394        open: bool,
395        terminated: bool,
396    }
397
398    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
399        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
400            return;
401        };
402        let pid = ctx.node_id();
403        let state: Arc<Mutex<ValveState>> = Arc::new(Mutex::new(ValveState {
404            open: false,
405            terminated: false,
406        }));
407
408        // --- control sink ---
409        let st_ctrl = state.clone();
410        let bb_ctrl: Arc<dyn BindingBoundary> = binding_s.clone();
411        let core_ctrl = core_s.clone();
412        let cancel_ctrl = cancel.clone();
413        let control_sink: Sink = Arc::new(move |msgs| {
414            let mut should_cancel = false;
415            let mut error_action: Option<HandleId> = None;
416            {
417                let mut s = st_ctrl.lock();
418                if s.terminated {
419                    return;
420                }
421                for m in msgs {
422                    match m.tier() {
423                        3 => {
424                            if let Some(h) = m.payload_handle() {
425                                let results = bb_ctrl.predicate_each(gate_fn_id, &[h]);
426                                let new_open = results.first().copied().unwrap_or(false);
427                                let was_open = s.open;
428                                s.open = new_open;
429                                // Transition open -> closed: trigger cancel.
430                                if was_open && !new_open && cancel_ctrl.is_some() {
431                                    should_cancel = true;
432                                }
433                            }
434                        }
435                        5 => {
436                            if let Some(h) = m.payload_handle() {
437                                // Control ERROR terminates valve.
438                                if !s.terminated {
439                                    s.terminated = true;
440                                    bb_ctrl.retain_handle(h);
441                                    error_action = Some(h);
442                                }
443                            }
444                            // Control COMPLETE: do NOT auto-complete.
445                        }
446                        _ => {}
447                    }
448                }
449            }
450            if should_cancel {
451                if let Some(ref ct) = cancel_ctrl {
452                    ct.cancel();
453                }
454            }
455            if let Some(h) = error_action {
456                core_ctrl.error_or_defer(pid, h);
457            }
458        });
459
460        let ctrl_outcome = ctx.subscribe_to(control, control_sink);
461        if matches!(ctrl_outcome, SubscribeOutcome::Dead { .. }) {
462            // Control is dead — gate state remains as-is (closed by default).
463            // No auto-complete; source can still flow if gate was already opened.
464        }
465
466        // --- source sink ---
467        let st_src = state.clone();
468        let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
469        let core_src = core_s.clone();
470        let source_sink: Sink = Arc::new(move |msgs| {
471            enum Act {
472                Emit(HandleId),
473                Complete,
474                Error(HandleId),
475            }
476            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
477            {
478                let s = st_src.lock();
479                if s.terminated {
480                    return;
481                }
482                for m in msgs {
483                    match m.tier() {
484                        3 => {
485                            if let Some(h) = m.payload_handle() {
486                                if s.open {
487                                    bb_src.retain_handle(h);
488                                    actions.push(Act::Emit(h));
489                                }
490                                // Closed gate: silently discard.
491                            }
492                        }
493                        5 => {
494                            if let Some(h) = m.payload_handle() {
495                                bb_src.retain_handle(h);
496                                actions.push(Act::Error(h));
497                            } else {
498                                actions.push(Act::Complete);
499                            }
500                        }
501                        _ => {}
502                    }
503                }
504            }
505            for a in actions {
506                match a {
507                    Act::Emit(h) => core_src.emit_or_defer(pid, h),
508                    Act::Complete => core_src.complete_or_defer(pid),
509                    Act::Error(h) => core_src.error_or_defer(pid, h),
510                }
511            }
512        });
513
514        let src_outcome = ctx.subscribe_to(source, source_sink);
515        if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
516            let mut s = state.lock();
517            if !s.terminated {
518                s.terminated = true;
519                drop(s);
520                core_s.complete_or_defer(pid);
521            }
522        }
523    });
524
525    let fn_id = binding.register_producer_build(build);
526    core.register_producer(fn_id)
527        .expect("valve: register_producer failed")
528}
529
530// =========================================================================
531// settle(source, quiet_waves, max_waves) — convergence detector
532// =========================================================================
533
534/// Wave-count convergence detector.
535///
536/// - On DATA: resets `quiet_count` to 0, increments `wave_count`. Forwards
537///   DATA unchanged. If `max_waves` is set and `wave_count >= max_waves`,
538///   completes.
539/// - On RESOLVED (tier 3, no payload): increments `quiet_count`. If
540///   `quiet_count >= quiet_waves`, completes.
541/// - On COMPLETE/ERROR: forwarded unchanged.
542#[must_use]
543pub fn settle(
544    core: &Core,
545    binding: &Arc<dyn ProducerBinding>,
546    source: NodeId,
547    quiet_waves: u32,
548    max_waves: Option<u32>,
549) -> NodeId {
550    let core_weak = core.weak_handle();
551    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
552
553    struct SettleState {
554        wave_count: u32,
555        quiet_count: u32,
556        completed: bool,
557    }
558
559    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
560        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
561            return;
562        };
563        let pid = ctx.node_id();
564        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
565        let core_sink = core_s.clone();
566        let state: Arc<Mutex<SettleState>> = Arc::new(Mutex::new(SettleState {
567            wave_count: 0,
568            quiet_count: 0,
569            completed: false,
570        }));
571
572        let source_sink: Sink = Arc::new(move |msgs| {
573            enum Act {
574                Emit(HandleId),
575                Complete,
576                Error(HandleId),
577                SelfComplete,
578            }
579            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
580            {
581                let mut s = state.lock();
582                if s.completed {
583                    return;
584                }
585                for m in msgs {
586                    if s.completed {
587                        break;
588                    }
589                    match m.tier() {
590                        3 => {
591                            if let Some(h) = m.payload_handle() {
592                                // DATA: reset quiet, increment waves.
593                                s.quiet_count = 0;
594                                s.wave_count += 1;
595                                bb.retain_handle(h);
596                                actions.push(Act::Emit(h));
597                                // Check max_waves.
598                                if let Some(max) = max_waves {
599                                    if s.wave_count >= max {
600                                        s.completed = true;
601                                        actions.push(Act::SelfComplete);
602                                    }
603                                }
604                            } else {
605                                // RESOLVED (tier 3, no payload): quiet wave.
606                                s.quiet_count += 1;
607                                if s.quiet_count >= quiet_waves {
608                                    s.completed = true;
609                                    actions.push(Act::SelfComplete);
610                                }
611                            }
612                        }
613                        5 => {
614                            if let Some(h) = m.payload_handle() {
615                                s.completed = true;
616                                bb.retain_handle(h);
617                                actions.push(Act::Error(h));
618                            } else {
619                                s.completed = true;
620                                actions.push(Act::Complete);
621                            }
622                        }
623                        _ => {}
624                    }
625                }
626            }
627            for a in actions {
628                match a {
629                    Act::Emit(h) => core_sink.emit_or_defer(pid, h),
630                    Act::Complete | Act::SelfComplete => core_sink.complete_or_defer(pid),
631                    Act::Error(h) => core_sink.error_or_defer(pid, h),
632                }
633            }
634        });
635
636        let outcome = ctx.subscribe_to(source, source_sink);
637        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
638            core_s.complete_or_defer(pid);
639        }
640    });
641
642    let fn_id = binding.register_producer_build(build);
643    core.register_producer(fn_id)
644        .expect("settle: register_producer failed")
645}
646
647// =========================================================================
648// repeat(source, count) — sequential resubscribe loop
649// =========================================================================
650
651/// Sequential resubscribe loop. Forwards all DATA from `source`. On
652/// source COMPLETE, if `remaining > 0`, resubscribes to `source` and
653/// decrements `remaining`. On ERROR, terminates immediately (no retry).
654///
655/// `count` is the number of ADDITIONAL subscriptions after the initial
656/// one. `count = 0` is identity passthrough. `count = 2` means the
657/// source will be subscribed up to 3 times total.
658#[must_use]
659pub fn repeat(
660    core: &Core,
661    binding: &Arc<dyn ProducerBinding>,
662    source: NodeId,
663    count: u32,
664) -> NodeId {
665    let core_weak = core.weak_handle();
666    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
667
668    struct RepeatState {
669        remaining: u32,
670        terminated: bool,
671    }
672
673    let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
674        let (Some(core_s), Some(binding_s)) = (core_weak.upgrade(), binding_weak.upgrade()) else {
675            return;
676        };
677        let pid = ctx.node_id();
678        let state: Arc<Mutex<RepeatState>> = Arc::new(Mutex::new(RepeatState {
679            remaining: count,
680            terminated: false,
681        }));
682
683        // We need to store the subscription in producer storage and be able
684        // to replace it on resubscribe. The producer storage already holds
685        // subs via ProducerCtx::subscribe_to. For resubscribe, we use
686        // Core::try_subscribe directly and manage the subscription ourselves
687        // in op_state.
688        let storage = binding_s.producer_storage().clone();
689
690        // Build the sink closure. It needs to reference itself for
691        // resubscription, so we use a shared slot.
692        let sink_slot: Arc<Mutex<Option<Sink>>> = Arc::new(Mutex::new(None));
693        let sink_slot_inner = sink_slot.clone();
694        let bb: Arc<dyn BindingBoundary> = binding_s.clone();
695        let core_sink = core_s.clone();
696        let storage_inner = storage.clone();
697
698        let source_sink: Sink = Arc::new(move |msgs| {
699            enum Act {
700                Emit(HandleId),
701                Error(HandleId),
702                Resubscribe,
703                Complete,
704            }
705            let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
706            {
707                let mut s = state.lock();
708                if s.terminated {
709                    return;
710                }
711                for m in msgs {
712                    if s.terminated {
713                        break;
714                    }
715                    match m.tier() {
716                        3 => {
717                            if let Some(h) = m.payload_handle() {
718                                bb.retain_handle(h);
719                                actions.push(Act::Emit(h));
720                            }
721                        }
722                        5 => {
723                            if let Some(h) = m.payload_handle() {
724                                // ERROR — terminate immediately.
725                                s.terminated = true;
726                                bb.retain_handle(h);
727                                actions.push(Act::Error(h));
728                            } else {
729                                // COMPLETE — resubscribe if remaining > 0.
730                                if s.remaining > 0 {
731                                    s.remaining -= 1;
732                                    actions.push(Act::Resubscribe);
733                                } else {
734                                    s.terminated = true;
735                                    actions.push(Act::Complete);
736                                }
737                            }
738                        }
739                        _ => {}
740                    }
741                }
742            }
743            for a in actions {
744                match a {
745                    Act::Emit(h) => core_sink.emit_or_defer(pid, h),
746                    Act::Error(h) => core_sink.error_or_defer(pid, h),
747                    Act::Complete => core_sink.complete_or_defer(pid),
748                    Act::Resubscribe => {
749                        // Get our own sink from the shared slot.
750                        let maybe_sink = sink_slot_inner.lock().clone();
751                        if let Some(new_sink) = maybe_sink {
752                            if let Ok(sub) = core_sink.try_subscribe(source, new_sink) {
753                                storage_inner.lock().entry(pid).or_default().subs.push(sub);
754                            } else {
755                                // Source is dead or partition violation —
756                                // treat as terminal complete.
757                                let mut s = state.lock();
758                                if !s.terminated {
759                                    s.terminated = true;
760                                    drop(s);
761                                    core_sink.complete_or_defer(pid);
762                                }
763                            }
764                        }
765                    }
766                }
767            }
768        });
769
770        // Store sink in the shared slot so resubscribe can access it.
771        *sink_slot.lock() = Some(source_sink.clone());
772
773        let outcome = ctx.subscribe_to(source, source_sink);
774        if matches!(outcome, SubscribeOutcome::Dead { .. }) {
775            // Dead source (non-resubscribable + terminated) will stay
776            // dead — resubscribing won't help. Complete immediately.
777            core_s.complete_or_defer(pid);
778        }
779    });
780
781    let fn_id = binding.register_producer_build(build);
782    core.register_producer(fn_id)
783        .expect("repeat: register_producer failed")
784}