graphrefly_operators/control.rs
1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Control operators — side-effect, gating, error recovery, convergence.
11//!
12//! # Operators
13//!
14//! - [`tap`] / [`tap_observer`] — side-effect passthrough.
15//! - [`on_first_data`] — one-shot side-effect on first DATA.
16//! - [`rescue`] — error recovery via user callback.
17//! - [`valve`] — boolean-gated passthrough with optional cancellation.
18//! - [`settle`] — wave-count convergence detector.
19//! - [`repeat`] — sequential resubscribe loop.
20
21#![allow(clippy::too_many_lines, clippy::items_after_statements)]
22
23use std::sync::Arc;
24
25use parking_lot::Mutex;
26
27use graphrefly_core::{BindingBoundary, Core, FnId, HandleId, NodeId, Sink};
28use smallvec::SmallVec;
29
30use crate::producer::{ProducerBinding, ProducerBuildFn, ProducerCtx, SubscribeOutcome};
31
32// =========================================================================
33// tap(source, fn_id) — side-effect passthrough
34// =========================================================================
35
36/// Passthrough operator that forwards all DATA unchanged. On each DATA,
37/// calls `binding.invoke_tap_fn(fn_id, handle)` as a side-effect.
38/// Forwards COMPLETE and ERROR unchanged.
39#[must_use]
40pub fn tap(core: &Core, binding: &Arc<dyn ProducerBinding>, source: NodeId, fn_id: FnId) -> NodeId {
41 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
42 let core_s = ctx.core();
43 let binding_s = ctx.core().binding();
44 let em = ctx.emitter();
45 let pid = ctx.node_id();
46 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
47 let core_sink = em.clone();
48
49 let source_sink: Sink = Arc::new(move |msgs| {
50 enum Act {
51 EmitAndTap(HandleId),
52 Complete,
53 Error(HandleId),
54 }
55 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
56 for m in msgs {
57 match m.tier() {
58 3 => {
59 if let Some(h) = m.payload_handle() {
60 bb.retain_handle(h);
61 actions.push(Act::EmitAndTap(h));
62 }
63 }
64 5 => {
65 if let Some(h) = m.payload_handle() {
66 bb.retain_handle(h);
67 actions.push(Act::Error(h));
68 } else {
69 actions.push(Act::Complete);
70 }
71 }
72 _ => {}
73 }
74 }
75 for a in actions {
76 match a {
77 Act::EmitAndTap(h) => {
78 bb.invoke_tap_fn(fn_id, h);
79 core_sink.emit_or_defer(pid, h);
80 }
81 Act::Complete => core_sink.complete_or_defer(pid),
82 Act::Error(h) => core_sink.error_or_defer(pid, h),
83 }
84 }
85 });
86
87 let outcome = ctx.subscribe_to(source, source_sink);
88 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
89 core_s.complete_or_defer(pid);
90 }
91 });
92
93 let fn_id_reg = binding.register_producer_build(build);
94 core.register_producer(fn_id_reg)
95 .expect("tap: register_producer failed")
96}
97
98// =========================================================================
99// tap_observer(source, data_fn, error_fn, complete_fn)
100// =========================================================================
101
102/// Like [`tap`] but with lifecycle observer callbacks. Each callback is
103/// optional (`None` = skip that lifecycle event).
104///
105/// - `data_fn_id`: called on each DATA via `invoke_tap_fn`.
106/// - `error_fn_id`: called on ERROR via `invoke_tap_error_fn`.
107/// - `complete_fn_id`: called on COMPLETE via `invoke_tap_complete_fn`.
108///
109/// All messages are forwarded unchanged regardless of callback presence.
110#[must_use]
111pub fn tap_observer(
112 core: &Core,
113 binding: &Arc<dyn ProducerBinding>,
114 source: NodeId,
115 data_fn_id: Option<FnId>,
116 error_fn_id: Option<FnId>,
117 complete_fn_id: Option<FnId>,
118) -> NodeId {
119 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
120 let core_s = ctx.core();
121 let binding_s = ctx.core().binding();
122 let em = ctx.emitter();
123 let pid = ctx.node_id();
124 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
125 let core_sink = em.clone();
126
127 let source_sink: Sink = Arc::new(move |msgs| {
128 enum Act {
129 Emit(HandleId),
130 Complete,
131 Error(HandleId),
132 }
133 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
134 for m in msgs {
135 match m.tier() {
136 3 => {
137 if let Some(h) = m.payload_handle() {
138 bb.retain_handle(h);
139 actions.push(Act::Emit(h));
140 }
141 }
142 5 => {
143 if let Some(h) = m.payload_handle() {
144 bb.retain_handle(h);
145 actions.push(Act::Error(h));
146 } else {
147 actions.push(Act::Complete);
148 }
149 }
150 _ => {}
151 }
152 }
153 for a in actions {
154 match a {
155 Act::Emit(h) => {
156 if let Some(fid) = data_fn_id {
157 bb.invoke_tap_fn(fid, h);
158 }
159 core_sink.emit_or_defer(pid, h);
160 }
161 Act::Complete => {
162 if let Some(fid) = complete_fn_id {
163 bb.invoke_tap_complete_fn(fid);
164 }
165 core_sink.complete_or_defer(pid);
166 }
167 Act::Error(h) => {
168 if let Some(fid) = error_fn_id {
169 bb.invoke_tap_error_fn(fid, h);
170 }
171 core_sink.error_or_defer(pid, h);
172 }
173 }
174 }
175 });
176
177 let outcome = ctx.subscribe_to(source, source_sink);
178 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
179 if let Some(fid) = complete_fn_id {
180 binding_s.invoke_tap_complete_fn(fid);
181 }
182 core_s.complete_or_defer(pid);
183 }
184 });
185
186 let fn_id = binding.register_producer_build(build);
187 core.register_producer(fn_id)
188 .expect("tap_observer: register_producer failed")
189}
190
191// =========================================================================
192// on_first_data(source, fn_id) — one-shot tap
193// =========================================================================
194
195/// One-shot side-effect tap. Calls `invoke_tap_fn(fn_id, handle)` on
196/// the first DATA only, then becomes a pure passthrough. All messages
197/// are forwarded unchanged.
198#[must_use]
199pub fn on_first_data(
200 core: &Core,
201 binding: &Arc<dyn ProducerBinding>,
202 source: NodeId,
203 fn_id: FnId,
204) -> NodeId {
205 struct OnFirstState {
206 fired: bool,
207 }
208
209 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
210 let core_s = ctx.core();
211 let binding_s = ctx.core().binding();
212 let em = ctx.emitter();
213 let pid = ctx.node_id();
214 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
215 let core_sink = em.clone();
216 let state: Arc<Mutex<OnFirstState>> = Arc::new(Mutex::new(OnFirstState { fired: false }));
217
218 let source_sink: Sink = Arc::new(move |msgs| {
219 enum Act {
220 EmitWithTap(HandleId),
221 Emit(HandleId),
222 Complete,
223 Error(HandleId),
224 }
225 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
226 {
227 let mut s = state.lock();
228 for m in msgs {
229 match m.tier() {
230 3 => {
231 if let Some(h) = m.payload_handle() {
232 bb.retain_handle(h);
233 if s.fired {
234 actions.push(Act::Emit(h));
235 } else {
236 s.fired = true;
237 actions.push(Act::EmitWithTap(h));
238 }
239 }
240 }
241 5 => {
242 if let Some(h) = m.payload_handle() {
243 bb.retain_handle(h);
244 actions.push(Act::Error(h));
245 } else {
246 actions.push(Act::Complete);
247 }
248 }
249 _ => {}
250 }
251 }
252 }
253 for a in actions {
254 match a {
255 Act::EmitWithTap(h) => {
256 bb.invoke_tap_fn(fn_id, h);
257 core_sink.emit_or_defer(pid, h);
258 }
259 Act::Emit(h) => core_sink.emit_or_defer(pid, h),
260 Act::Complete => core_sink.complete_or_defer(pid),
261 Act::Error(h) => core_sink.error_or_defer(pid, h),
262 }
263 }
264 });
265
266 let outcome = ctx.subscribe_to(source, source_sink);
267 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
268 core_s.complete_or_defer(pid);
269 }
270 });
271
272 let fn_id_reg = binding.register_producer_build(build);
273 core.register_producer(fn_id_reg)
274 .expect("on_first_data: register_producer failed")
275}
276
277// =========================================================================
278// rescue(source, fn_id) — error recovery
279// =========================================================================
280
281/// Error recovery operator. On ERROR, calls
282/// `binding.invoke_rescue_fn(fn_id, error_handle)`:
283///
284/// - `Ok(recovered_handle)` — emit DATA with the recovered value.
285/// - `Err(())` — forward the original ERROR unchanged.
286///
287/// DATA and COMPLETE pass through unchanged.
288#[must_use]
289pub fn rescue(
290 core: &Core,
291 binding: &Arc<dyn ProducerBinding>,
292 source: NodeId,
293 fn_id: FnId,
294) -> NodeId {
295 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
296 let core_s = ctx.core();
297 let binding_s = ctx.core().binding();
298 let em = ctx.emitter();
299 let pid = ctx.node_id();
300 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
301 let core_sink = em.clone();
302
303 let source_sink: Sink = Arc::new(move |msgs| {
304 enum Act {
305 Emit(HandleId),
306 Complete,
307 TryRescue(HandleId),
308 }
309 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
310 for m in msgs {
311 match m.tier() {
312 3 => {
313 if let Some(h) = m.payload_handle() {
314 bb.retain_handle(h);
315 actions.push(Act::Emit(h));
316 }
317 }
318 5 => {
319 if let Some(h) = m.payload_handle() {
320 bb.retain_handle(h);
321 actions.push(Act::TryRescue(h));
322 } else {
323 actions.push(Act::Complete);
324 }
325 }
326 _ => {}
327 }
328 }
329 for a in actions {
330 match a {
331 Act::Emit(h) => core_sink.emit_or_defer(pid, h),
332 Act::Complete => core_sink.complete_or_defer(pid),
333 Act::TryRescue(err_h) => {
334 match bb.invoke_rescue_fn(fn_id, err_h) {
335 Ok(recovered_h) => {
336 // Recovery succeeded — release original error,
337 // emit recovered value as DATA.
338 bb.release_handle(err_h);
339 core_sink.emit_or_defer(pid, recovered_h);
340 }
341 Err(()) => {
342 // Recovery failed — forward original ERROR.
343 core_sink.error_or_defer(pid, err_h);
344 }
345 }
346 }
347 }
348 }
349 });
350
351 let outcome = ctx.subscribe_to(source, source_sink);
352 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
353 core_s.complete_or_defer(pid);
354 }
355 });
356
357 let fn_id_reg = binding.register_producer_build(build);
358 core.register_producer(fn_id_reg)
359 .expect("rescue: register_producer failed")
360}
361
362// =========================================================================
363// valve(source, control, gate_fn_id, cancel) — boolean gate
364// =========================================================================
365
366/// Boolean-gated passthrough. Subscribes to both `source` and `control`.
367///
368/// - **Control DATA**: evaluates `binding.predicate_each(gate_fn_id,
369/// &[handle])` to determine gate state. `true` = open, `false` = closed.
370/// On transition from open to closed, if `cancel` is `Some`, invokes
371/// `cancel.cancel()`.
372/// - **Source DATA**: if gate is open, retains + emits. If closed, drops
373/// the handle (source DATA while gate is closed is silently discarded).
374/// - **Source COMPLETE/ERROR**: forwarded unchanged.
375/// - **Control COMPLETE**: does NOT auto-complete the valve (per TS
376/// `completeWhenDepsComplete: false` equivalent).
377/// - **Control ERROR**: terminates the valve with ERROR.
378#[must_use]
379pub fn valve(
380 core: &Core,
381 binding: &Arc<dyn ProducerBinding>,
382 source: NodeId,
383 control: NodeId,
384 gate_fn_id: FnId,
385 cancel: Option<tokio_util::sync::CancellationToken>,
386) -> NodeId {
387 struct ValveState {
388 open: bool,
389 terminated: bool,
390 }
391
392 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
393 let core_s = ctx.core();
394 let binding_s = ctx.core().binding();
395 let em = ctx.emitter();
396 let pid = ctx.node_id();
397 let state: Arc<Mutex<ValveState>> = Arc::new(Mutex::new(ValveState {
398 open: false,
399 terminated: false,
400 }));
401
402 // --- control sink ---
403 let st_ctrl = state.clone();
404 let bb_ctrl: Arc<dyn BindingBoundary> = binding_s.clone();
405 let core_ctrl = em.clone();
406 let cancel_ctrl = cancel.clone();
407 let control_sink: Sink = Arc::new(move |msgs| {
408 let mut should_cancel = false;
409 let mut error_action: Option<HandleId> = None;
410 {
411 let mut s = st_ctrl.lock();
412 if s.terminated {
413 return;
414 }
415 for m in msgs {
416 match m.tier() {
417 3 => {
418 if let Some(h) = m.payload_handle() {
419 let results = bb_ctrl.predicate_each(gate_fn_id, &[h]);
420 let new_open = results.first().copied().unwrap_or(false);
421 let was_open = s.open;
422 s.open = new_open;
423 // Transition open -> closed: trigger cancel.
424 if was_open && !new_open && cancel_ctrl.is_some() {
425 should_cancel = true;
426 }
427 }
428 }
429 5 => {
430 if let Some(h) = m.payload_handle() {
431 // Control ERROR terminates valve.
432 if !s.terminated {
433 s.terminated = true;
434 bb_ctrl.retain_handle(h);
435 error_action = Some(h);
436 }
437 }
438 // Control COMPLETE: do NOT auto-complete.
439 }
440 _ => {}
441 }
442 }
443 }
444 if should_cancel {
445 if let Some(ref ct) = cancel_ctrl {
446 ct.cancel();
447 }
448 }
449 if let Some(h) = error_action {
450 core_ctrl.error_or_defer(pid, h);
451 }
452 });
453
454 let ctrl_outcome = ctx.subscribe_to(control, control_sink);
455 if matches!(ctrl_outcome, SubscribeOutcome::Dead { .. }) {
456 // Control is dead — gate state remains as-is (closed by default).
457 // No auto-complete; source can still flow if gate was already opened.
458 }
459
460 // --- source sink ---
461 let st_src = state.clone();
462 let bb_src: Arc<dyn BindingBoundary> = binding_s.clone();
463 let core_src = em.clone();
464 let source_sink: Sink = Arc::new(move |msgs| {
465 enum Act {
466 Emit(HandleId),
467 Complete,
468 Error(HandleId),
469 }
470 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
471 {
472 let s = st_src.lock();
473 if s.terminated {
474 return;
475 }
476 for m in msgs {
477 match m.tier() {
478 3 => {
479 if let Some(h) = m.payload_handle() {
480 if s.open {
481 bb_src.retain_handle(h);
482 actions.push(Act::Emit(h));
483 }
484 // Closed gate: silently discard.
485 }
486 }
487 5 => {
488 if let Some(h) = m.payload_handle() {
489 bb_src.retain_handle(h);
490 actions.push(Act::Error(h));
491 } else {
492 actions.push(Act::Complete);
493 }
494 }
495 _ => {}
496 }
497 }
498 }
499 for a in actions {
500 match a {
501 Act::Emit(h) => core_src.emit_or_defer(pid, h),
502 Act::Complete => core_src.complete_or_defer(pid),
503 Act::Error(h) => core_src.error_or_defer(pid, h),
504 }
505 }
506 });
507
508 let src_outcome = ctx.subscribe_to(source, source_sink);
509 if matches!(src_outcome, SubscribeOutcome::Dead { .. }) {
510 let mut s = state.lock();
511 if !s.terminated {
512 s.terminated = true;
513 drop(s);
514 core_s.complete_or_defer(pid);
515 }
516 }
517 });
518
519 let fn_id = binding.register_producer_build(build);
520 core.register_producer(fn_id)
521 .expect("valve: register_producer failed")
522}
523
524// =========================================================================
525// settle(source, quiet_waves, max_waves) — convergence detector
526// =========================================================================
527
528/// Wave-count convergence detector.
529///
530/// - On DATA: resets `quiet_count` to 0, increments `wave_count`. Forwards
531/// DATA unchanged. If `max_waves` is set and `wave_count >= max_waves`,
532/// completes.
533/// - On RESOLVED (tier 3, no payload): increments `quiet_count`. If
534/// `quiet_count >= quiet_waves`, completes.
535/// - On COMPLETE/ERROR: forwarded unchanged.
536#[must_use]
537pub fn settle(
538 core: &Core,
539 binding: &Arc<dyn ProducerBinding>,
540 source: NodeId,
541 quiet_waves: u32,
542 max_waves: Option<u32>,
543) -> NodeId {
544 struct SettleState {
545 wave_count: u32,
546 quiet_count: u32,
547 completed: bool,
548 }
549
550 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
551 let core_s = ctx.core();
552 let binding_s = ctx.core().binding();
553 let em = ctx.emitter();
554 let pid = ctx.node_id();
555 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
556 let core_sink = em.clone();
557 let state: Arc<Mutex<SettleState>> = Arc::new(Mutex::new(SettleState {
558 wave_count: 0,
559 quiet_count: 0,
560 completed: false,
561 }));
562
563 let source_sink: Sink = Arc::new(move |msgs| {
564 enum Act {
565 Emit(HandleId),
566 Complete,
567 Error(HandleId),
568 SelfComplete,
569 }
570 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
571 {
572 let mut s = state.lock();
573 if s.completed {
574 return;
575 }
576 for m in msgs {
577 if s.completed {
578 break;
579 }
580 match m.tier() {
581 3 => {
582 if let Some(h) = m.payload_handle() {
583 // DATA: reset quiet, increment waves.
584 s.quiet_count = 0;
585 s.wave_count += 1;
586 bb.retain_handle(h);
587 actions.push(Act::Emit(h));
588 // Check max_waves.
589 if let Some(max) = max_waves {
590 if s.wave_count >= max {
591 s.completed = true;
592 actions.push(Act::SelfComplete);
593 }
594 }
595 } else {
596 // RESOLVED (tier 3, no payload): quiet wave.
597 s.quiet_count += 1;
598 if s.quiet_count >= quiet_waves {
599 s.completed = true;
600 actions.push(Act::SelfComplete);
601 }
602 }
603 }
604 5 => {
605 if let Some(h) = m.payload_handle() {
606 s.completed = true;
607 bb.retain_handle(h);
608 actions.push(Act::Error(h));
609 } else {
610 s.completed = true;
611 actions.push(Act::Complete);
612 }
613 }
614 _ => {}
615 }
616 }
617 }
618 for a in actions {
619 match a {
620 Act::Emit(h) => core_sink.emit_or_defer(pid, h),
621 Act::Complete | Act::SelfComplete => core_sink.complete_or_defer(pid),
622 Act::Error(h) => core_sink.error_or_defer(pid, h),
623 }
624 }
625 });
626
627 let outcome = ctx.subscribe_to(source, source_sink);
628 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
629 core_s.complete_or_defer(pid);
630 }
631 });
632
633 let fn_id = binding.register_producer_build(build);
634 core.register_producer(fn_id)
635 .expect("settle: register_producer failed")
636}
637
638// =========================================================================
639// repeat(source, count) — sequential resubscribe loop
640// =========================================================================
641
642/// Sequential resubscribe loop. Forwards all DATA from `source`. On
643/// source COMPLETE, if `remaining > 0`, resubscribes to `source` and
644/// decrements `remaining`. On ERROR, terminates immediately (no retry).
645///
646/// `count` is the number of ADDITIONAL subscriptions after the initial
647/// one. `count = 0` is identity passthrough. `count = 2` means the
648/// source will be subscribed up to 3 times total.
649#[must_use]
650pub fn repeat(
651 core: &Core,
652 binding: &Arc<dyn ProducerBinding>,
653 source: NodeId,
654 count: u32,
655) -> NodeId {
656 struct RepeatState {
657 remaining: u32,
658 terminated: bool,
659 }
660
661 let build: ProducerBuildFn = Box::new(move |ctx: ProducerCtx<'_>| {
662 let core_s = ctx.core();
663 let binding_s = ctx.core().binding();
664 let em = ctx.emitter();
665 let pid = ctx.node_id();
666 let state: Arc<Mutex<RepeatState>> = Arc::new(Mutex::new(RepeatState {
667 remaining: count,
668 terminated: false,
669 }));
670
671 // We need to store the subscription in producer storage and be able
672 // to replace it on resubscribe. S2b/D231: storage via the new
673 // `ProducerCtx::storage()` accessor (the build closure no longer
674 // holds a `ProducerBinding`). Resubscribe re-enters Core to
675 // `try_subscribe`, which a long-lived sink can only do via
676 // `em.defer` (D234) — see the `Act::Resubscribe` arm.
677 let storage = ctx.storage();
678
679 // Build the sink closure. It needs to reference itself for
680 // resubscription, so we use a shared slot.
681 let sink_slot: Arc<Mutex<Option<Sink>>> = Arc::new(Mutex::new(None));
682 let sink_slot_inner = sink_slot.clone();
683 let bb: Arc<dyn BindingBoundary> = binding_s.clone();
684 let core_sink = em.clone();
685 let storage_inner = storage.clone();
686
687 let source_sink: Sink = Arc::new(move |msgs| {
688 enum Act {
689 Emit(HandleId),
690 Error(HandleId),
691 Resubscribe,
692 Complete,
693 }
694 let mut actions: SmallVec<[Act; 4]> = SmallVec::new();
695 {
696 let mut s = state.lock();
697 if s.terminated {
698 return;
699 }
700 for m in msgs {
701 if s.terminated {
702 break;
703 }
704 match m.tier() {
705 3 => {
706 if let Some(h) = m.payload_handle() {
707 bb.retain_handle(h);
708 actions.push(Act::Emit(h));
709 }
710 }
711 5 => {
712 if let Some(h) = m.payload_handle() {
713 // ERROR — terminate immediately.
714 s.terminated = true;
715 bb.retain_handle(h);
716 actions.push(Act::Error(h));
717 } else {
718 // COMPLETE — resubscribe if remaining > 0.
719 if s.remaining > 0 {
720 s.remaining -= 1;
721 actions.push(Act::Resubscribe);
722 } else {
723 s.terminated = true;
724 actions.push(Act::Complete);
725 }
726 }
727 }
728 _ => {}
729 }
730 }
731 }
732 for a in actions {
733 match a {
734 Act::Emit(h) => core_sink.emit_or_defer(pid, h),
735 Act::Error(h) => core_sink.error_or_defer(pid, h),
736 Act::Complete => core_sink.complete_or_defer(pid),
737 Act::Resubscribe => {
738 // Get our own sink from the shared slot.
739 let maybe_sink = sink_slot_inner.lock().clone();
740 if let Some(new_sink) = maybe_sink {
741 // D234: a long-lived sink can't hold `&Core`;
742 // route the re-subscribe through `em.defer`
743 // (owner-side, in-wave, FIFO-ordered). The
744 // returned `SubscriptionId` is recorded
745 // (D229 `(source, sub)` pair) inside the
746 // closure; the dead/violation path completes
747 // self there too.
748 let storage_d = storage_inner.clone();
749 let state_d = state.clone();
750 let _ = core_sink.defer(move |c| {
751 if let Ok(sub) = c.try_subscribe(source, new_sink) {
752 storage_d
753 .lock()
754 .entry(pid)
755 .or_default()
756 .subs
757 .push((source, sub));
758 } else {
759 // Source dead / partition
760 // violation — terminal complete.
761 let mut s = state_d.lock();
762 if !s.terminated {
763 s.terminated = true;
764 drop(s);
765 c.complete(pid);
766 }
767 }
768 });
769 }
770 }
771 }
772 }
773 });
774
775 // Store sink in the shared slot so resubscribe can access it.
776 *sink_slot.lock() = Some(source_sink.clone());
777
778 let outcome = ctx.subscribe_to(source, source_sink);
779 if matches!(outcome, SubscribeOutcome::Dead { .. }) {
780 // Dead source (non-resubscribable + terminated) will stay
781 // dead — resubscribing won't help. Complete immediately.
782 core_s.complete_or_defer(pid);
783 }
784 });
785
786 let fn_id = binding.register_producer_build(build);
787 core.register_producer(fn_id)
788 .expect("repeat: register_producer failed")
789}