graphrefly_operators/control.rs
1//! Control operators — side-effect, gating, error recovery, convergence.
2//!
3//! # Operators
4//!
5//! - [`tap`] / [`tap_observer`] — side-effect passthrough.
6//! - [`on_first_data`] — one-shot side-effect on first DATA.
7//! - [`rescue`] — error recovery via user callback.
8//! - [`valve`] — boolean-gated passthrough with optional cancellation.
9//! - [`settle`] — wave-count convergence detector.
10//! - [`repeat`] — sequential resubscribe loop.
11
12#![allow(clippy::too_many_lines, clippy::items_after_statements)]
13
14use std::cell::RefCell;
15use std::rc::Rc;
16use std::sync::Arc;
17
18use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
19use smallvec::SmallVec;
20
21use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
22
23// =========================================================================
24// tap(source, fn_id) — side-effect passthrough
25// =========================================================================
26
27/// Passthrough operator that forwards all DATA unchanged. On each DATA,
28/// calls `binding.invoke_tap_fn(fn_id, handle)` as a side-effect.
29/// Forwards COMPLETE and ERROR unchanged.
30#[must_use]
31pub fn tap(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, fn_id: FnId) -> NodeId {
32 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
33 let core_s = ctx.core();
34 let binding_s = ctx.core().binding();
35 let em = ctx.emitter();
36 let pid = ctx.node_id();
37 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
38 let core_sink = em.clone();
39
40 let source_sink: Sink = Rc::new(move |msgs| {
41 enum Act {
42 EmitAndTap(HandleId),
43 Complete,
44 Error(HandleId),
45 }
46 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
47 for m in msgs {
48 match m.tier() {
49 3 => {
50 if let Some(h) = m.payload_handle() {
51 bb.retain_handle(h);
52 actions.push(Act::EmitAndTap(h));
53 }
54 }
55 5 => {
56 if let Some(h) = m.payload_handle() {
57 bb.retain_handle(h);
58 actions.push(Act::Error(h));
59 } else {
60 actions.push(Act::Complete);
61 }
62 }
63 _ => {}
64 }
65 }
66 for a in actions {
67 match a {
68 Act::EmitAndTap(h) => {
69 bb.invoke_tap_fn(fn_id, h);
70 core_sink.emit(pid, h);
71 }
72 Act::Complete => core_sink.complete(pid),
73 Act::Error(h) => core_sink.error(pid, h),
74 }
75 }
76 });
77
78 let outcome = ctx.subscribe_to(source, source_sink);
79 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
80 core_s.complete(pid);
81 }
82 });
83
84 let fn_id_reg = binding.register_producer_build(build);
85 core.register_producer(fn_id_reg)
86 .expect("tap: register_producer failed")
87}
88
89// =========================================================================
90// tap_observer(source, data_fn, error_fn, complete_fn)
91// =========================================================================
92
93/// Like [`tap`] but with lifecycle observer callbacks. Each callback is
94/// optional (`None` = skip that lifecycle event).
95///
96/// - `data_fn_id`: called on each DATA via `invoke_tap_fn`.
97/// - `error_fn_id`: called on ERROR via `invoke_tap_error_fn`.
98/// - `complete_fn_id`: called on COMPLETE via `invoke_tap_complete_fn`.
99///
100/// All messages are forwarded unchanged regardless of callback presence.
101#[must_use]
102pub fn tap_observer(
103 core: &Core,
104 binding: &Arc<dyn ProducerBinding>,
105 source: NodeId,
106 data_fn_id: Option<FnId>,
107 error_fn_id: Option<FnId>,
108 complete_fn_id: Option<FnId>,
109) -> NodeId {
110 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
111 let core_s = ctx.core();
112 let binding_s = ctx.core().binding();
113 let em = ctx.emitter();
114 let pid = ctx.node_id();
115 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
116 let core_sink = em.clone();
117
118 let source_sink: Sink = Rc::new(move |msgs| {
119 enum Act {
120 Emit(HandleId),
121 Complete,
122 Error(HandleId),
123 }
124 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
125 for m in msgs {
126 match m.tier() {
127 3 => {
128 if let Some(h) = m.payload_handle() {
129 bb.retain_handle(h);
130 actions.push(Act::Emit(h));
131 }
132 }
133 5 => {
134 if let Some(h) = m.payload_handle() {
135 bb.retain_handle(h);
136 actions.push(Act::Error(h));
137 } else {
138 actions.push(Act::Complete);
139 }
140 }
141 _ => {}
142 }
143 }
144 for a in actions {
145 match a {
146 Act::Emit(h) => {
147 if let Some(fid) = data_fn_id {
148 bb.invoke_tap_fn(fid, h);
149 }
150 core_sink.emit(pid, h);
151 }
152 Act::Complete => {
153 if let Some(fid) = complete_fn_id {
154 bb.invoke_tap_complete_fn(fid);
155 }
156 core_sink.complete(pid);
157 }
158 Act::Error(h) => {
159 if let Some(fid) = error_fn_id {
160 bb.invoke_tap_error_fn(fid, h);
161 }
162 core_sink.error(pid, h);
163 }
164 }
165 }
166 });
167
168 let outcome = ctx.subscribe_to(source, source_sink);
169 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
170 if let Some(fid) = complete_fn_id {
171 binding_s.invoke_tap_complete_fn(fid);
172 }
173 core_s.complete(pid);
174 }
175 });
176
177 let fn_id = binding.register_producer_build(build);
178 core.register_producer(fn_id)
179 .expect("tap_observer: register_producer failed")
180}
181
182// =========================================================================
183// on_first_data(source, fn_id) — one-shot tap
184// =========================================================================
185
186/// One-shot side-effect tap. Calls `invoke_tap_fn(fn_id, handle)` on
187/// the first DATA only, then becomes a pure passthrough. All messages
188/// are forwarded unchanged.
189#[must_use]
190pub fn on_first_data(
191 core: &Core,
192 binding: &Arc<dyn ProducerBinding>,
193 source: NodeId,
194 fn_id: FnId,
195) -> NodeId {
196 struct OnFirstState {
197 fired: bool,
198 }
199
200 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
201 let core_s = ctx.core();
202 let binding_s = ctx.core().binding();
203 let em = ctx.emitter();
204 let pid = ctx.node_id();
205 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
206 let core_sink = em.clone();
207 let state: Rc<RefCell<OnFirstState>> = Rc::new(RefCell::new(OnFirstState { fired: false }));
208
209 let source_sink: Sink = Rc::new(move |msgs| {
210 enum Act {
211 EmitWithTap(HandleId),
212 Emit(HandleId),
213 Complete,
214 Error(HandleId),
215 }
216 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
217 {
218 let mut s = state.borrow_mut();
219 for m in msgs {
220 match m.tier() {
221 3 => {
222 if let Some(h) = m.payload_handle() {
223 bb.retain_handle(h);
224 if s.fired {
225 actions.push(Act::Emit(h));
226 } else {
227 s.fired = true;
228 actions.push(Act::EmitWithTap(h));
229 }
230 }
231 }
232 5 => {
233 if let Some(h) = m.payload_handle() {
234 bb.retain_handle(h);
235 actions.push(Act::Error(h));
236 } else {
237 actions.push(Act::Complete);
238 }
239 }
240 _ => {}
241 }
242 }
243 }
244 for a in actions {
245 match a {
246 Act::EmitWithTap(h) => {
247 bb.invoke_tap_fn(fn_id, h);
248 core_sink.emit(pid, h);
249 }
250 Act::Emit(h) => core_sink.emit(pid, h),
251 Act::Complete => core_sink.complete(pid),
252 Act::Error(h) => core_sink.error(pid, h),
253 }
254 }
255 });
256
257 let outcome = ctx.subscribe_to(source, source_sink);
258 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
259 core_s.complete(pid);
260 }
261 });
262
263 let fn_id_reg = binding.register_producer_build(build);
264 core.register_producer(fn_id_reg)
265 .expect("on_first_data: register_producer failed")
266}
267
268// =========================================================================
269// rescue(source, fn_id) — error recovery
270// =========================================================================
271
272/// Error recovery operator. On ERROR, calls
273/// `binding.invoke_rescue_fn(fn_id, error_handle)`:
274///
275/// - `Ok(recovered_handle)` — emit DATA with the recovered value.
276/// - `Err(())` — forward the original ERROR unchanged.
277///
278/// DATA and COMPLETE pass through unchanged.
279#[must_use]
280pub fn rescue(
281 core: &Core,
282 binding: &Arc<dyn ProducerBinding>,
283 source: NodeId,
284 fn_id: FnId,
285) -> NodeId {
286 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
287 let core_s = ctx.core();
288 let binding_s = ctx.core().binding();
289 let em = ctx.emitter();
290 let pid = ctx.node_id();
291 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
292 let core_sink = em.clone();
293
294 let source_sink: Sink = Rc::new(move |msgs| {
295 enum Act {
296 Emit(HandleId),
297 Complete,
298 TryRescue(HandleId),
299 }
300 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
301 for m in msgs {
302 match m.tier() {
303 3 => {
304 if let Some(h) = m.payload_handle() {
305 bb.retain_handle(h);
306 actions.push(Act::Emit(h));
307 }
308 }
309 5 => {
310 if let Some(h) = m.payload_handle() {
311 bb.retain_handle(h);
312 actions.push(Act::TryRescue(h));
313 } else {
314 actions.push(Act::Complete);
315 }
316 }
317 _ => {}
318 }
319 }
320 for a in actions {
321 match a {
322 Act::Emit(h) => core_sink.emit(pid, h),
323 Act::Complete => core_sink.complete(pid),
324 Act::TryRescue(err_h) => {
325 match bb.invoke_rescue_fn(fn_id, err_h) {
326 Ok(recovered_h) => {
327 // Recovery succeeded — release original error,
328 // emit recovered value as DATA.
329 bb.release_handle(err_h);
330 core_sink.emit(pid, recovered_h);
331 }
332 Err(()) => {
333 // Recovery failed — forward original ERROR.
334 core_sink.error(pid, err_h);
335 }
336 }
337 }
338 }
339 }
340 });
341
342 let outcome = ctx.subscribe_to(source, source_sink);
343 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
344 core_s.complete(pid);
345 }
346 });
347
348 let fn_id_reg = binding.register_producer_build(build);
349 core.register_producer(fn_id_reg)
350 .expect("rescue: register_producer failed")
351}
352
353// =========================================================================
354// valve(source, control, gate_fn_id, cancel) — boolean gate
355// =========================================================================
356
357/// Boolean-gated passthrough. Subscribes to both `source` and `control`.
358///
359/// - **Control DATA**: evaluates `binding.predicate_each(gate_fn_id,
360/// &[handle])` to determine gate state. `true` = open, `false` = closed.
361/// On transition from open to closed, if `cancel` is `Some`, invokes
362/// `cancel.cancel()`.
363/// - **Source DATA**: if gate is open, retains + emits. If closed, drops
364/// the handle (source DATA while gate is closed is silently discarded).
365/// - **Source COMPLETE/ERROR**: forwarded unchanged.
366/// - **Control COMPLETE**: does NOT auto-complete the valve (per TS
367/// `completeWhenDepsComplete: false` equivalent).
368/// - **Control ERROR**: terminates the valve with ERROR.
369#[must_use]
370pub fn valve(
371 core: &Core,
372 binding: &Arc<dyn ProducerBinding>,
373 source: NodeId,
374 control: NodeId,
375 gate_fn_id: FnId,
376 cancel: Option<tokio_util::sync::CancellationToken>,
377) -> NodeId {
378 struct ValveState {
379 open: bool,
380 terminated: bool,
381 }
382
383 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
384 let core_s = ctx.core();
385 let binding_s = ctx.core().binding();
386 let em = ctx.emitter();
387 let pid = ctx.node_id();
388 let state: Rc<RefCell<ValveState>> = Rc::new(RefCell::new(ValveState {
389 open: false,
390 terminated: false,
391 }));
392
393 // --- control sink ---
394 let st_ctrl = state.clone();
395 let bb_ctrl: Arc<dyn BindingBoundary> = binding_s.clone();
396 let core_ctrl = em.clone();
397 let cancel_ctrl = cancel.clone();
398 let control_sink: Sink = Rc::new(move |msgs| {
399 let mut should_cancel = false;
400 let mut error_action: Option<HandleId> = None;
401 {
402 let mut s = st_ctrl.borrow_mut();
403 if s.terminated {
404 return;
405 }
406 for m in msgs {
407 match m.tier() {
408 3 => {
409 if let Some(h) = m.payload_handle() {
410 let results = bb_ctrl.predicate_each(gate_fn_id, &[h]);
411 let new_open = results.first().copied().unwrap_or(false);
412 let was_open = s.open;
413 s.open = new_open;
414 // Transition open -> closed: trigger cancel.
415 if was_open && !new_open && cancel_ctrl.is_some() {
416 should_cancel = true;
417 }
418 }
419 }
420 5 => {
421 if let Some(h) = m.payload_handle() {
422 // Control ERROR terminates valve.
423 if !s.terminated {
424 s.terminated = true;
425 bb_ctrl.retain_handle(h);
426 error_action = Some(h);
427 }
428 }
429 // Control COMPLETE: do NOT auto-complete.
430 }
431 _ => {}
432 }
433 }
434 }
435 if should_cancel {
436 if let Some(ref ct) = cancel_ctrl {
437 ct.cancel();
438 }
439 }
440 if let Some(h) = error_action {
441 core_ctrl.error(pid, h);
442 }
443 });
444
445 let ctrl_outcome = ctx.subscribe_to(control, control_sink);
446 if matches!(ctrl_outcome, SubscribeOutcome::Dead { .. }) {
447 // Control is dead — gate state remains as-is (closed by default).
448 // No auto-complete; source can still flow if gate was already opened.
449 }
450
451 // --- source sink ---
452 let st_src = state.clone();
453 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
454 let core_src = em.clone();
455 let source_sink: Sink = Rc::new(move |msgs| {
456 enum Act {
457 Emit(HandleId),
458 Complete,
459 Error(HandleId),
460 }
461 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
462 {
463 let s = st_src.borrow();
464 if s.terminated {
465 return;
466 }
467 for m in msgs {
468 match m.tier() {
469 3 => {
470 if let Some(h) = m.payload_handle() {
471 if s.open {
472 bb_src.retain_handle(h);
473 actions.push(Act::Emit(h));
474 }
475 // Closed gate: silently discard.
476 }
477 }
478 5 => {
479 if let Some(h) = m.payload_handle() {
480 bb_src.retain_handle(h);
481 actions.push(Act::Error(h));
482 } else {
483 actions.push(Act::Complete);
484 }
485 }
486 _ => {}
487 }
488 }
489 }
490 for a in actions {
491 match a {
492 Act::Emit(h) => core_src.emit(pid, h),
493 Act::Complete => core_src.complete(pid),
494 Act::Error(h) => core_src.error(pid, h),
495 }
496 }
497 });
498
499 let src_outcome = ctx.subscribe_to(source, source_sink);
500 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
501 let mut s = state.borrow_mut();
502 if !s.terminated {
503 s.terminated = true;
504 drop(s);
505 core_s.complete(pid);
506 }
507 }
508 });
509
510 let fn_id = binding.register_producer_build(build);
511 core.register_producer(fn_id)
512 .expect("valve: register_producer failed")
513}
514
515// =========================================================================
516// settle(source, quiet_waves, max_waves) — convergence detector
517// =========================================================================
518
519/// Wave-count convergence detector.
520///
521/// - On DATA: resets `quiet_count` to 0, increments `wave_count`. Forwards
522/// DATA unchanged. If `max_waves` is set and `wave_count >= max_waves`,
523/// completes.
524/// - On RESOLVED (tier 3, no payload): increments `quiet_count`. If
525/// `quiet_count >= quiet_waves`, completes.
526/// - On COMPLETE/ERROR: forwarded unchanged.
527#[must_use]
528pub fn settle(
529 core: &Core,
530 binding: &Arc<dyn ProducerBinding>,
531 source: NodeId,
532 quiet_waves: u32,
533 max_waves: Option<u32>,
534) -> NodeId {
535 struct SettleState {
536 wave_count: u32,
537 quiet_count: u32,
538 completed: bool,
539 }
540
541 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
542 let core_s = ctx.core();
543 let binding_s = ctx.core().binding();
544 let em = ctx.emitter();
545 let pid = ctx.node_id();
546 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
547 let core_sink = em.clone();
548 let state: Rc<RefCell<SettleState>> = Rc::new(RefCell::new(SettleState {
549 wave_count: 0,
550 quiet_count: 0,
551 completed: false,
552 }));
553
554 let source_sink: Sink = Rc::new(move |msgs| {
555 enum Act {
556 Emit(HandleId),
557 Complete,
558 Error(HandleId),
559 SelfComplete,
560 }
561 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
562 {
563 let mut s = state.borrow_mut();
564 if s.completed {
565 return;
566 }
567 for m in msgs {
568 if s.completed {
569 break;
570 }
571 match m.tier() {
572 3 => {
573 if let Some(h) = m.payload_handle() {
574 // DATA: reset quiet, increment waves.
575 s.quiet_count = 0;
576 s.wave_count += 1;
577 bb.retain_handle(h);
578 actions.push(Act::Emit(h));
579 // Check max_waves.
580 if let Some(max) = max_waves {
581 if s.wave_count >= max {
582 s.completed = true;
583 actions.push(Act::SelfComplete);
584 }
585 }
586 } else {
587 // RESOLVED (tier 3, no payload): quiet wave.
588 s.quiet_count += 1;
589 if s.quiet_count >= quiet_waves {
590 s.completed = true;
591 actions.push(Act::SelfComplete);
592 }
593 }
594 }
595 5 => {
596 if let Some(h) = m.payload_handle() {
597 s.completed = true;
598 bb.retain_handle(h);
599 actions.push(Act::Error(h));
600 } else {
601 s.completed = true;
602 actions.push(Act::Complete);
603 }
604 }
605 _ => {}
606 }
607 }
608 }
609 for a in actions {
610 match a {
611 Act::Emit(h) => core_sink.emit(pid, h),
612 Act::Complete | Act::SelfComplete => core_sink.complete(pid),
613 Act::Error(h) => core_sink.error(pid, h),
614 }
615 }
616 });
617
618 let outcome = ctx.subscribe_to(source, source_sink);
619 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
620 core_s.complete(pid);
621 }
622 });
623
624 let fn_id = binding.register_producer_build(build);
625 core.register_producer(fn_id)
626 .expect("settle: register_producer failed")
627}
628
629// =========================================================================
630// repeat(source, count) — sequential resubscribe loop
631// =========================================================================
632
633/// Sequential resubscribe loop. Forwards all DATA from `source`. On
634/// source COMPLETE, if `remaining > 0`, resubscribes to `source` and
635/// decrements `remaining`. On ERROR, terminates immediately (no retry).
636///
637/// `count` is the number of ADDITIONAL subscriptions after the initial
638/// one. `count = 0` is identity passthrough. `count = 2` means the
639/// source will be subscribed up to 3 times total.
640#[must_use]
641pub fn repeat(
642 core: &Core,
643 binding: &Arc<dyn ProducerBinding>,
644 source: NodeId,
645 count: u32,
646) -> NodeId {
647 struct RepeatState {
648 remaining: u32,
649 terminated: bool,
650 }
651
652 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
653 let core_s = ctx.core();
654 let binding_s = ctx.core().binding();
655 let em = ctx.emitter();
656 let pid = ctx.node_id();
657 let state: Rc<RefCell<RepeatState>> = Rc::new(RefCell::new(RepeatState {
658 remaining: count,
659 terminated: false,
660 }));
661
662 // We need to store the subscription in producer storage and be able
663 // to replace it on resubscribe. S2b/D231: storage via the new
664 // `ProducerCtx::storage()` accessor (the build closure no longer
665 // holds a `ProducerBinding`). Resubscribe re-enters Core to
666 // `try_subscribe`, which a long-lived sink can only do via
667 // `em.defer` (D234) — see the `Act::Resubscribe` arm.
668 let storage = ctx.storage();
669
670 // Build the sink closure. It needs to reference itself for
671 // resubscription, so we use a shared slot. Single-owner since
672 // D248/D272 (`Sink = Rc<dyn Fn>`); the slot is owner-thread-only.
673 let sink_slot: Rc<RefCell<Option<Sink>>> = Rc::new(RefCell::new(None));
674 let sink_slot_inner = sink_slot.clone();
675 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
676 let core_sink = em.clone();
677 let storage_inner = storage.clone();
678
679 let source_sink: Sink = Rc::new(move |msgs| {
680 enum Act {
681 Emit(HandleId),
682 Error(HandleId),
683 Resubscribe,
684 Complete,
685 }
686 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
687 {
688 let mut s = state.borrow_mut();
689 if s.terminated {
690 return;
691 }
692 for m in msgs {
693 if s.terminated {
694 break;
695 }
696 match m.tier() {
697 3 => {
698 if let Some(h) = m.payload_handle() {
699 bb.retain_handle(h);
700 actions.push(Act::Emit(h));
701 }
702 }
703 5 => {
704 if let Some(h) = m.payload_handle() {
705 // ERROR — terminate immediately.
706 s.terminated = true;
707 bb.retain_handle(h);
708 actions.push(Act::Error(h));
709 } else {
710 // COMPLETE — resubscribe if remaining > 0.
711 if s.remaining > 0 {
712 s.remaining -= 1;
713 actions.push(Act::Resubscribe);
714 } else {
715 s.terminated = true;
716 actions.push(Act::Complete);
717 }
718 }
719 }
720 _ => {}
721 }
722 }
723 }
724 for a in actions {
725 match a {
726 Act::Emit(h) => core_sink.emit(pid, h),
727 Act::Error(h) => core_sink.error(pid, h),
728 Act::Complete => core_sink.complete(pid),
729 Act::Resubscribe => {
730 // Get our own sink from the shared slot.
731 let maybe_sink = sink_slot_inner.borrow_mut().clone();
732 if let Some(new_sink) = maybe_sink {
733 // D234: a long-lived sink can't hold `&Core`;
734 // route the re-subscribe through `em.defer`
735 // (owner-side, in-wave, FIFO-ordered). The
736 // returned `SubscriptionId` is recorded
737 // (D229 `(source, sub)` pair) inside the
738 // closure; the dead/violation path completes
739 // self there too.
740 let storage_d = storage_inner.clone();
741 let state_d = state.clone();
742 let _ = core_sink.defer(move |c| {
743 if let Ok(sub) = c.try_subscribe(source, new_sink) {
744 storage_d
745 .lock()
746 .entry(pid)
747 .or_default()
748 .subs
749 .push((source, sub));
750 } else {
751 // Source dead / partition
752 // violation — terminal complete.
753 let mut s = state_d.borrow_mut();
754 if !s.terminated {
755 s.terminated = true;
756 drop(s);
757 c.complete(pid);
758 }
759 }
760 });
761 }
762 }
763 }
764 }
765 });
766
767 // Store sink in the shared slot so resubscribe can access it.
768 *sink_slot.borrow_mut() = Some(source_sink.clone());
769
770 let outcome = ctx.subscribe_to(source, source_sink);
771 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
772 // Dead source (non-resubscribable + terminated) will stay
773 // dead — resubscribing won't help. Complete immediately.
774 core_s.complete(pid);
775 }
776 });
777
778 let fn_id = binding.register_producer_build(build);
779 core.register_producer(fn_id)
780 .expect("repeat: register_producer failed")
781}