graphrefly_operators/ops_impl.rs
1//! Concrete implementations of the four subscription-managed combinators
2//! (zip / concat / race / takeUntil). Built on the
3//! [`super::producer::ProducerCtx`] substrate.
4
5// Each sink closure runs a small phase-1/phase-2 dance (lock state,
6// collect actions, drop lock, replay actions). The match-then-if
7// structure inside phase 1 is intentional for readability; collapsing
8// to `match { Some(...) if cond => ... }` obscures the lock-discipline.
9#![allow(clippy::collapsible_if, clippy::collapsible_match)]
10// `zip` is genuinely large (multi-arity tuple-pack with terminal-
11// cascade handling). Splitting would obscure the concurrent state-
12// machine logic across helpers without clear benefit.
13#![allow(clippy::too_many_lines)]
14
15use std::cell::RefCell;
16use std::collections::VecDeque;
17use std::rc::Rc;
18use std::sync::Arc;
19
20use graphrefly_core::{Core, HandleId, NodeId, Sink};
21use smallvec::SmallVec;
22
23use super::error::OperatorFactoryError;
24use super::producer::{ProducerBinding, ProducerCtx};
25
26// =====================================================================
27// zip — pair handles N-wise across N sources
28// =====================================================================
29
30/// Per-zip-node state: one FIFO queue per source, plus a flag for each
31/// source's terminal. Lives behind `Rc<RefCell<_>>` captured by the
32/// build + sink closures.
33struct ZipState {
34 queues: Vec<VecDeque<HandleId>>,
35 completed: Vec<bool>,
36 errored: bool,
37 terminated: bool,
38}
39
40impl ZipState {
41 fn new(n: usize) -> Self {
42 Self {
43 queues: (0..n).map(|_| VecDeque::new()).collect(),
44 completed: vec![false; n],
45 errored: false,
46 terminated: false,
47 }
48 }
49}
50
51/// `zip(s1, s2, ..., sN)` — collect one value from each source, emit a
52/// tuple, repeat. Models RxJS / TS `zip`:
53///
54/// - Each upstream DATA pushes into that source's per-source queue.
55/// - When **every** queue has at least one entry, pop one from each,
56/// pack into a tuple via [`graphrefly_core::BindingBoundary::pack_tuple`],
57/// and emit on the producer.
58/// - On any source's COMPLETE: if its queue is empty, terminate the
59/// producer with COMPLETE. Otherwise continue draining; terminate
60/// when this source's queue becomes empty (zip can't produce a
61/// tuple without input from every source).
62/// - On any source's ERROR: terminate the producer with the same
63/// ERROR (first error wins, like merge per Slice C-2 D022).
64///
65/// Empty source list (`n == 0`) emits a single empty-tuple event then
66/// completes. Single source (`n == 1`) is identity-passthrough.
67///
68/// # Refcount discipline
69///
70/// Each upstream DATA handle is `retain_handle`-bumped before being
71/// pushed onto a queue (the inbound message's payload retain belongs
72/// to the wave-end-flush release path; we take our own share for the
73/// queue). On pop, component handles are passed to `pack_tuple` which
74/// must NOT consume or release them — the caller (zip) retains
75/// ownership throughout the call and releases each component handle's
76/// queue share after `pack_tuple` returns. The returned tuple handle
77/// has a pre-bumped retain (binding convention per D020 doc on
78/// [`BindingBoundary::pack_tuple`]).
79/// # Errors
80///
81/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
82/// (R5.7.x — zip requires ≥1 source; vacuous-tuple semantics rejected).
83pub fn zip(
84 core: &Core,
85 binding: &Arc<dyn ProducerBinding>,
86 sources: Vec<NodeId>,
87 pack_fn_id: graphrefly_core::FnId,
88) -> Result<NodeId, OperatorFactoryError> {
89 // R5.7.x — zip requires ≥1 source. Mirrors `combine::combine` (which
90 // raises the same factory-shape invariant) so all bindings can route
91 // through `operator_factory_error_to_napi` / equivalent.
92 if sources.is_empty() {
93 return Err(OperatorFactoryError::EmptySources);
94 }
95 let n = sources.len();
96 // Weak-Arc captures break the BenchBinding → registry → producer_builds
97 // → closure → strong-Arc<dyn ProducerBinding> cycle that would otherwise
98 // pin the entire graph state when the host BenchCore drops with active
99 // producer registrations. See `Core::weak_handle` doc + Slice Y close.
100
101 let build = Box::new(move |ctx: ProducerCtx<'_>| {
102 let producer_id = ctx.node_id();
103 let binding_clone = ctx.core().binding();
104 let em = ctx.emitter();
105 // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
106 let state: Rc<RefCell<ZipState>> = Rc::new(RefCell::new(ZipState::new(n)));
107
108 for (idx, &source) in sources.iter().enumerate() {
109 let state_inner = state.clone();
110 // Sinks live only while the producer is active (cleared via
111 // producer_deactivate on last-subscriber unsubscribe), so they
112 // can safely capture strong refs cloned from the upgraded weaks.
113 let core_inner = em.clone();
114 let binding_inner = binding_clone.clone();
115 let sink: Sink = Rc::new(move |msgs| {
116 // Phase 1 (lock held): mutate queues + collect actions.
117 // Phase 2 (lock released): pack tuples + re-enter Core.
118 enum PostLockAction {
119 /// Pack popped handles into a tuple, release components, emit.
120 PackAndEmit(Vec<HandleId>),
121 Complete,
122 Error(HandleId),
123 }
124 let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
125 // Handles to release after the lock drops (P2:
126 // drain queues on terminate to avoid handle leaks).
127 let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
128 {
129 let mut s = state_inner.borrow_mut();
130 if s.terminated {
131 return;
132 }
133 // Tier-based dispatch (canonical §4.2; see
134 // `feedback_use_tier_for_signal_routing.md`). Tier 3
135 // payload_handle.is_some() = DATA; tier 5
136 // payload_handle.is_some() = ERROR else COMPLETE.
137 for m in msgs {
138 match m.tier() {
139 3 => {
140 if let Some(h) = m.payload_handle() {
141 binding_inner.retain_handle(h);
142 s.queues[idx].push_back(h);
143 // Collect complete tuples — pack_tuple runs
144 // after the lock drops (P5).
145 while s.queues.iter().all(|q| !q.is_empty()) {
146 let popped: Vec<HandleId> = s
147 .queues
148 .iter_mut()
149 .map(|q| q.pop_front().unwrap())
150 .collect();
151 post_actions.push(PostLockAction::PackAndEmit(popped));
152 }
153 }
154 // else: Resolved on a source — no action.
155 }
156 5 => {
157 if let Some(h) = m.payload_handle() {
158 // Error
159 if !s.errored && !s.terminated {
160 s.errored = true;
161 s.terminated = true;
162 binding_inner.retain_handle(h);
163 // P2: release all remaining queued handles.
164 for q in &mut s.queues {
165 to_release.extend(q.drain(..));
166 }
167 post_actions.push(PostLockAction::Error(h));
168 }
169 } else {
170 // Complete
171 s.completed[idx] = true;
172 // If this source's queue is empty, no more
173 // tuples from it — terminate.
174 if s.queues[idx].is_empty() && !s.terminated {
175 s.terminated = true;
176 // P2: release all remaining queued handles.
177 for q in &mut s.queues {
178 to_release.extend(q.drain(..));
179 }
180 post_actions.push(PostLockAction::Complete);
181 }
182 }
183 }
184 _ => {} // Tiers 0/1/2/4/6 — no action.
185 }
186 }
187 }
188 // Release leaked queue handles outside the lock.
189 for h in to_release {
190 binding_inner.release_handle(h);
191 }
192 // Phase 2 (lock released): pack tuples + re-enter Core.
193 // P5: pack_tuple runs outside the per-zip lock to avoid
194 // deadlock if the binding's pack_tuple re-enters.
195 for action in post_actions {
196 match action {
197 PostLockAction::PackAndEmit(popped) => {
198 let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
199 for h in &popped {
200 binding_inner.release_handle(*h);
201 }
202 core_inner.emit(producer_id, tuple_h);
203 }
204 PostLockAction::Complete => core_inner.complete(producer_id),
205 PostLockAction::Error(h) => core_inner.error(producer_id, h),
206 }
207 }
208 });
209 // F2 /qa: on Dead, synthesize the per-source Complete in
210 // zip's state machine — `s.completed[idx] = true` and (if
211 // queue is empty, which it always is at activation since
212 // DATA hasn't yet flowed) self-Complete the producer.
213 // Pre-F2 a Dead source left zip waiting on a queue that
214 // would never fill → silent wedge.
215 let outcome = ctx.subscribe_to(source, sink);
216 if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
217 let core_dead = em.clone();
218 let binding_dead = binding_clone.clone();
219 let mut should_complete = false;
220 let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
221 {
222 let mut s = state.borrow_mut();
223 if !s.terminated {
224 s.completed[idx] = true;
225 if s.queues[idx].is_empty() {
226 s.terminated = true;
227 for q in &mut s.queues {
228 to_release.extend(q.drain(..));
229 }
230 should_complete = true;
231 }
232 }
233 }
234 for h in to_release {
235 binding_dead.release_handle(h);
236 }
237 if should_complete {
238 core_dead.complete(producer_id);
239 }
240 }
241 }
242 });
243
244 let fn_id = binding.register_producer_build(build);
245 Ok(core
246 .register_producer(fn_id)
247 .expect("invariant: register_producer has no deps; no error variants reachable"))
248}
249
250// =====================================================================
251// concat — sequentially forward `first` then `second`
252// =====================================================================
253
254struct ConcatState {
255 /// 0 = forwarding `first`; 1 = `first` complete, forwarding `second`.
256 phase: u8,
257 /// Buffered DATA from `second` that arrived during phase 0 (before
258 /// `first` completed). Drained on phase transition.
259 pending: VecDeque<HandleId>,
260 /// Set to true if `second` completed during phase 0. On phase
261 /// transition, after draining `pending`, concat self-completes
262 /// because `second` won't emit Complete again (D041 / D-ops /qa D4).
263 second_completed: bool,
264 terminated: bool,
265}
266
267impl ConcatState {
268 fn new() -> Self {
269 Self {
270 phase: 0,
271 pending: VecDeque::new(),
272 second_completed: false,
273 terminated: false,
274 }
275 }
276}
277
278/// `concat(first, second)` — forward DATA from `first` until it
279/// completes, then drain any DATA `second` emitted during phase 1
280/// (buffered) and continue forwarding `second`. ERROR from either
281/// source terminates the producer with the same ERROR.
282///
283/// Subscribes to BOTH sources at activation time (matches TS impl in
284/// `extra/operators/combine.ts:332-379` so `second.subscribe` doesn't
285/// race after `first` completes). DATA from `second` during phase 0
286/// is buffered, not forwarded.
287#[must_use]
288pub fn concat(
289 core: &Core,
290 binding: &Arc<dyn ProducerBinding>,
291 first: NodeId,
292 second: NodeId,
293) -> NodeId {
294 // Weak captures break the producer-build Arc cycle (see `zip` doc).
295
296 let build = Box::new(move |ctx: ProducerCtx<'_>| {
297 let producer_id = ctx.node_id();
298 let binding_clone = ctx.core().binding();
299 let em = ctx.emitter();
300 let state: Rc<RefCell<ConcatState>> = Rc::new(RefCell::new(ConcatState::new()));
301
302 // Subscribe to second FIRST so phase-0 DATA buffering catches
303 // synchronous initial emissions. Sinks capture strong refs cloned
304 // from the upgraded weaks; sink lifetime tied to producer activation.
305 let state_for_second = state.clone();
306 let core_for_second = em.clone();
307 let binding_for_second = binding_clone.clone();
308 let second_sink: Sink = Rc::new(move |msgs| {
309 enum Action {
310 Emit(HandleId),
311 Complete,
312 Error(HandleId),
313 }
314 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
315 let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
316 {
317 let mut s = state_for_second.borrow_mut();
318 if s.terminated {
319 return;
320 }
321 // Tier-based dispatch (canonical §4.2).
322 for m in msgs {
323 match m.tier() {
324 3 => {
325 if let Some(h) = m.payload_handle() {
326 if s.phase == 0 {
327 // Buffer for later drain.
328 binding_for_second.retain_handle(h);
329 s.pending.push_back(h);
330 } else {
331 binding_for_second.retain_handle(h);
332 actions.push(Action::Emit(h));
333 }
334 }
335 // else: Resolved on second source — no action.
336 }
337 5 => {
338 if let Some(h) = m.payload_handle() {
339 // Error
340 if !s.terminated {
341 s.terminated = true;
342 binding_for_second.retain_handle(h);
343 // P2: release buffered pending handles.
344 to_release.extend(s.pending.drain(..));
345 actions.push(Action::Error(h));
346 }
347 } else {
348 // Complete
349 if s.phase == 1 && !s.terminated {
350 s.terminated = true;
351 actions.push(Action::Complete);
352 } else if s.phase == 0 {
353 // D041 / D4 fix: remember that second
354 // completed during phase 0. On phase
355 // transition, after draining `pending`,
356 // first_sink will self-complete
357 // (second's Complete fires once and
358 // won't be re-observed).
359 s.second_completed = true;
360 }
361 }
362 }
363 _ => {} // Tiers 0/1/2/4/6 — no action.
364 }
365 }
366 }
367 for h in to_release {
368 binding_for_second.release_handle(h);
369 }
370 for action in actions {
371 match action {
372 Action::Emit(h) => core_for_second.emit(producer_id, h),
373 Action::Complete => core_for_second.complete(producer_id),
374 Action::Error(h) => core_for_second.error(producer_id, h),
375 }
376 }
377 });
378 // F2 /qa: Dead `second` is observed via `second_completed` flag.
379 // First-Complete drains pending and self-Completes if second
380 // already completed; same logic handles Dead second.
381 let second_outcome = ctx.subscribe_to(second, second_sink);
382 if matches!(
383 second_outcome,
384 crate::producer::SubscribeOutcome::Dead { .. }
385 ) {
386 let mut s = state.borrow_mut();
387 s.second_completed = true;
388 // No additional action — the first-Complete path or first-Dead
389 // path below will trigger producer-Complete.
390 }
391
392 let state_for_first = state.clone();
393 let core_for_first = em.clone();
394 let binding_for_first = binding_clone.clone();
395 let first_sink: Sink = Rc::new(move |msgs| {
396 // first.Complete triggers the phase transition (handled
397 // via `s.phase = 1` + draining pending into `actions`),
398 // and may also self-complete the producer if `second`
399 // already completed during phase 0 (D041 / D4 fix).
400 enum Action {
401 Emit(HandleId),
402 Complete,
403 Error(HandleId),
404 }
405 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
406 let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
407 {
408 let mut s = state_for_first.borrow_mut();
409 if s.terminated {
410 return;
411 }
412 if s.phase != 0 {
413 return; // first is done; ignore stale messages.
414 }
415 for m in msgs {
416 // Slice E /qa: defensive per-iteration `terminated`
417 // guard. The outer guard at the top of the lock
418 // block catches a `terminated` set BEFORE this
419 // batch arrived; this per-iteration check catches
420 // the case where an earlier message in the SAME
421 // batch transitioned us terminal (e.g., post-
422 // Complete the phase moved to 1, but a defensively-
423 // emitted stale `[Data]` later in the same batch
424 // would otherwise queue a useless retain that
425 // `core.emit` would discard on a terminal
426 // producer).
427 if s.terminated || s.phase != 0 {
428 break;
429 }
430 // Tier-based dispatch (canonical §4.2).
431 match m.tier() {
432 3 => {
433 if let Some(h) = m.payload_handle() {
434 binding_for_first.retain_handle(h);
435 actions.push(Action::Emit(h));
436 }
437 // else: Resolved on first source — no action.
438 }
439 5 => {
440 if let Some(h) = m.payload_handle() {
441 // Error
442 if !s.terminated {
443 s.terminated = true;
444 binding_for_first.retain_handle(h);
445 // P2: release buffered pending handles.
446 to_release.extend(s.pending.drain(..));
447 actions.push(Action::Error(h));
448 }
449 } else {
450 // Complete — phase transition: drain pending
451 // second-data, then continue forwarding from
452 // second.
453 s.phase = 1;
454 // Pending handles already retained at buffer time.
455 for h in s.pending.drain(..) {
456 actions.push(Action::Emit(h));
457 }
458 // D041 / D4 fix: if second already completed
459 // during phase 0, self-complete now (its
460 // Complete fired once and won't re-fire).
461 if s.second_completed && !s.terminated {
462 s.terminated = true;
463 actions.push(Action::Complete);
464 }
465 }
466 }
467 _ => {} // Tiers 0/1/2/4/6 — no action.
468 }
469 }
470 }
471 for h in to_release {
472 binding_for_first.release_handle(h);
473 }
474 for action in actions {
475 match action {
476 Action::Emit(h) => core_for_first.emit(producer_id, h),
477 Action::Complete => core_for_first.complete(producer_id),
478 Action::Error(h) => core_for_first.error(producer_id, h),
479 }
480 }
481 });
482 // F2 /qa: Dead `first` triggers the phase transition immediately
483 // (treat as first-Complete). If `second` is also dead (or already
484 // completed in phase 0), self-Complete; else continue forwarding
485 // pending+future from second.
486 let first_outcome = ctx.subscribe_to(first, first_sink);
487 if matches!(
488 first_outcome,
489 crate::producer::SubscribeOutcome::Dead { .. }
490 ) {
491 let core_first_dead = em.clone();
492 let mut should_complete = false;
493 let mut pending_to_emit: Vec<HandleId> = Vec::new();
494 {
495 let mut s = state.borrow_mut();
496 if !s.terminated && s.phase == 0 {
497 s.phase = 1;
498 // Drain pending second-DATA buffered during phase 0.
499 pending_to_emit.extend(s.pending.drain(..));
500 // If second already completed (or was Dead), self-Complete.
501 if s.second_completed && !s.terminated {
502 s.terminated = true;
503 should_complete = true;
504 }
505 }
506 }
507 for h in pending_to_emit {
508 core_first_dead.emit(producer_id, h);
509 }
510 if should_complete {
511 core_first_dead.complete(producer_id);
512 }
513 }
514 });
515
516 let fn_id = binding.register_producer_build(build);
517 core.register_producer(fn_id)
518 .expect("invariant: register_producer has no deps; no error variants reachable")
519}
520
521// =====================================================================
522// race — first source to emit DATA wins; losers are ignored
523// =====================================================================
524
/// Bookkeeping shared by all sinks of one active `race` producer.
struct RaceState {
    /// Position of the source that won the race; `None` until some
    /// source wins.
    winner: Option<usize>,
    /// `completed[i]` is set once source `i` has completed. If every
    /// entry is set while there is still no winner, the producer
    /// completes (P4: no-winner all-complete termination).
    completed: Vec<bool>,
    /// Set once the producer has reached a terminal.
    terminated: bool,
}

impl RaceState {
    /// Initial state for `n` sources: no winner, no source completed,
    /// not terminated.
    fn new(n: usize) -> Self {
        let completed = vec![false; n];
        Self {
            winner: None,
            completed,
            terminated: false,
        }
    }
}
543
544/// `race(s1, s2, ..., sN)` — subscribes to all sources; the first to
545/// emit DATA wins. Subsequent traffic from the winner is forwarded;
546/// losers' messages are no-ops (per Q4=(b) — losers stay subscribed
547/// but their sink callbacks short-circuit). Saves the dynamic
548/// rewiring cost of explicitly unsubscribing losers.
549///
550/// Empty source list completes immediately. Single source is
551/// identity-passthrough.
552/// # Errors
553///
554/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
555/// (R5.7.x — race requires ≥1 source; no-winner-possible rejected).
556pub fn race(
557 core: &Core,
558 binding: &Arc<dyn ProducerBinding>,
559 sources: Vec<NodeId>,
560) -> Result<NodeId, OperatorFactoryError> {
561 // R5.7.x — race requires ≥1 source. Mirrors `combine::combine`.
562 if sources.is_empty() {
563 return Err(OperatorFactoryError::EmptySources);
564 }
565 let n = sources.len();
566 // Weak captures break the producer-build Arc cycle (see `zip` doc).
567
568 let build = Box::new(move |ctx: ProducerCtx<'_>| {
569 let producer_id = ctx.node_id();
570 let binding_clone = ctx.core().binding();
571 let em = ctx.emitter();
572 // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
573 let state: Rc<RefCell<RaceState>> = Rc::new(RefCell::new(RaceState::new(n)));
574
575 for (idx, &source) in sources.iter().enumerate() {
576 let state_inner = state.clone();
577 let core_inner = em.clone();
578 let binding_inner = binding_clone.clone();
579 let sink: Sink = Rc::new(move |msgs| {
580 enum Action {
581 Emit(HandleId),
582 Complete,
583 Error(HandleId),
584 }
585 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
586 {
587 let mut s = state_inner.borrow_mut();
588 if s.terminated {
589 return;
590 }
591 // Tier-based dispatch (canonical §4.2).
592 for m in msgs {
593 match m.tier() {
594 3 => {
595 if let Some(h) = m.payload_handle() {
596 // P3: check s.winner live each iteration —
597 // this source may have just become the winner
598 // earlier in this batch.
599 if s.winner.is_none() {
600 s.winner = Some(idx);
601 binding_inner.retain_handle(h);
602 actions.push(Action::Emit(h));
603 } else if s.winner == Some(idx) {
604 binding_inner.retain_handle(h);
605 actions.push(Action::Emit(h));
606 }
607 // else: loser DATA — ignore.
608 }
609 // else: Resolved on a source — no action.
610 }
611 5 => {
612 if let Some(h) = m.payload_handle() {
613 // Error — from any source pre-winner OR from
614 // the winner cascade; loser errors post-winner
615 // are ignored.
616 if (s.winner.is_none() || s.winner == Some(idx))
617 && !s.terminated
618 {
619 s.terminated = true;
620 binding_inner.retain_handle(h);
621 actions.push(Action::Error(h));
622 }
623 } else {
624 // Complete
625 s.completed[idx] = true;
626 if s.winner == Some(idx) && !s.terminated {
627 s.terminated = true;
628 actions.push(Action::Complete);
629 } else if s.winner.is_none()
630 && s.completed.iter().all(|&c| c)
631 && !s.terminated
632 {
633 // P4: all sources completed without a
634 // winner — terminate the producer.
635 s.terminated = true;
636 actions.push(Action::Complete);
637 }
638 // else: loser complete — ignore.
639 }
640 }
641 _ => {} // Tiers 0/1/2/4/6 — no action.
642 }
643 }
644 }
645 for action in actions {
646 match action {
647 Action::Emit(h) => core_inner.emit(producer_id, h),
648 Action::Complete => core_inner.complete(producer_id),
649 Action::Error(h) => core_inner.error(producer_id, h),
650 }
651 }
652 });
653 // F2 /qa: on Dead, treat as `completed[idx] = true`. If
654 // all sources are now completed without a winner, self-
655 // Complete (matches the P4 "all completed no winner"
656 // branch in the per-source sink above).
657 let outcome = ctx.subscribe_to(source, sink);
658 if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
659 let core_dead = em.clone();
660 let mut should_complete = false;
661 {
662 let mut s = state.borrow_mut();
663 if !s.terminated && s.winner.is_none() {
664 s.completed[idx] = true;
665 if s.completed.iter().all(|&c| c) {
666 s.terminated = true;
667 should_complete = true;
668 }
669 }
670 }
671 if should_complete {
672 core_dead.complete(producer_id);
673 }
674 }
675 }
676 });
677
678 let fn_id = binding.register_producer_build(build);
679 Ok(core
680 .register_producer(fn_id)
681 .expect("invariant: register_producer has no deps; no error variants reachable"))
682}
683
684// =====================================================================
685// takeUntil — terminate when notifier emits DATA
686// =====================================================================
687
/// Bookkeeping shared by the source and notifier sinks of one active
/// `take_until` producer.
struct TakeUntilState {
    /// Set once the producer has reached a terminal; both sinks bail
    /// out early when they see it.
    terminated: bool,
}

impl TakeUntilState {
    /// Initial state: not terminated.
    fn new() -> Self {
        let terminated = false;
        Self { terminated }
    }
}
697
698/// `take_until(source, notifier)` — forward DATA from `source` until
699/// `notifier` emits its first DATA, then terminate the producer with
700/// COMPLETE. Errors from either source cascade. Source COMPLETE
701/// terminates the producer.
702///
703/// Notifier DATA is consumed but never forwarded (zero-FFI on the
704/// notifier path — we don't dereference its payload, just use the
705/// emission as a signal).
706#[must_use]
707pub fn take_until(
708 core: &Core,
709 binding: &Arc<dyn ProducerBinding>,
710 source: NodeId,
711 notifier: NodeId,
712) -> NodeId {
713 // Weak captures break the producer-build Arc cycle (see `zip` doc).
714
715 let build = Box::new(move |ctx: ProducerCtx<'_>| {
716 let producer_id = ctx.node_id();
717 let binding_clone = ctx.core().binding();
718 let em = ctx.emitter();
719 let state: Rc<RefCell<TakeUntilState>> = Rc::new(RefCell::new(TakeUntilState::new()));
720
721 // Source sink: forward DATA, propagate terminals.
722 let state_for_source = state.clone();
723 let core_for_source = em.clone();
724 let binding_for_source = binding_clone.clone();
725 let source_sink: Sink = Rc::new(move |msgs| {
726 enum Action {
727 Emit(HandleId),
728 Complete,
729 Error(HandleId),
730 }
731 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
732 {
733 let mut s = state_for_source.borrow_mut();
734 if s.terminated {
735 return;
736 }
737 // Tier-based dispatch (canonical §4.2).
738 for m in msgs {
739 match m.tier() {
740 3 => {
741 if let Some(h) = m.payload_handle() {
742 binding_for_source.retain_handle(h);
743 actions.push(Action::Emit(h));
744 }
745 // else: Resolved on source — no action.
746 }
747 5 => {
748 if let Some(h) = m.payload_handle() {
749 // Error
750 if !s.terminated {
751 s.terminated = true;
752 binding_for_source.retain_handle(h);
753 actions.push(Action::Error(h));
754 }
755 } else {
756 // Complete
757 if !s.terminated {
758 s.terminated = true;
759 actions.push(Action::Complete);
760 }
761 }
762 }
763 _ => {} // Tiers 0/1/2/4/6 — no action.
764 }
765 }
766 }
767 for action in actions {
768 match action {
769 Action::Emit(h) => core_for_source.emit(producer_id, h),
770 Action::Complete => core_for_source.complete(producer_id),
771 Action::Error(h) => core_for_source.error(producer_id, h),
772 }
773 }
774 });
775 // F2 /qa: Dead `source` → self-Complete (source is permanently
776 // over, so take_until has nothing to forward and the producer
777 // is finished).
778 let source_outcome = ctx.subscribe_to(source, source_sink);
779 if matches!(
780 source_outcome,
781 crate::producer::SubscribeOutcome::Dead { .. }
782 ) {
783 let core_dead = em.clone();
784 let mut should_complete = false;
785 {
786 let mut s = state.borrow_mut();
787 if !s.terminated {
788 s.terminated = true;
789 should_complete = true;
790 }
791 }
792 if should_complete {
793 core_dead.complete(producer_id);
794 }
795 }
796
797 // Notifier sink: any DATA → terminate; ERROR → cascade.
798 let state_for_notifier = state.clone();
799 let core_for_notifier = em.clone();
800 let binding_for_notifier = binding_clone.clone();
801 let notifier_sink: Sink = Rc::new(move |msgs| {
802 enum Action {
803 Complete,
804 Error(HandleId),
805 }
806 let mut action: Option<Action> = None;
807 {
808 let mut s = state_for_notifier.borrow_mut();
809 if s.terminated {
810 return;
811 }
812 // Tier-based dispatch (canonical §4.2).
813 for m in msgs {
814 match m.tier() {
815 3 => {
816 // Any DATA on notifier → complete the producer.
817 // (Resolved alone — no payload — no action.)
818 // We don't emit notifier DATA downstream, so we
819 // don't even need to extract the handle.
820 if m.payload_handle().is_some() && !s.terminated {
821 s.terminated = true;
822 action = Some(Action::Complete);
823 break;
824 }
825 }
826 5 => {
827 // Error: cascade. Complete (payload_handle.is_none())
828 // without prior DATA: nothing to do — source
829 // continues independently.
830 if let Some(h) = m.payload_handle() {
831 if !s.terminated {
832 s.terminated = true;
833 binding_for_notifier.retain_handle(h);
834 action = Some(Action::Error(h));
835 break;
836 }
837 }
838 }
839 _ => {} // Tiers 0/1/2/4/6 — no action.
840 }
841 }
842 }
843 if let Some(a) = action {
844 match a {
845 Action::Complete => core_for_notifier.complete(producer_id),
846 Action::Error(h) => core_for_notifier.error(producer_id, h),
847 }
848 }
849 });
850 // F2 /qa: Dead `notifier` → ignore. Notifier signal can never
851 // fire, so take_until reduces to a passthrough of source. The
852 // source's own Complete/Error will terminate the producer
853 // normally; if source is also Dead, the source-Dead branch
854 // above already self-Completed.
855 let _ = ctx.subscribe_to(notifier, notifier_sink);
856 });
857
858 let fn_id = binding.register_producer_build(build);
859 core.register_producer(fn_id)
860 .expect("invariant: register_producer has no deps; no error variants reachable")
861}