Skip to main content

graphrefly_operators/
ops_impl.rs

1//! Concrete implementations of the four subscription-managed combinators
2//! (zip / concat / race / takeUntil). Built on the
3//! [`super::producer::ProducerCtx`] substrate.
4
5// Each sink closure runs a small phase-1/phase-2 dance (lock state,
6// collect actions, drop lock, replay actions). The match-then-if
7// structure inside phase 1 is intentional for readability; collapsing
8// to `match { Some(...) if cond => ... }` obscures the lock-discipline.
9#![allow(clippy::collapsible_if, clippy::collapsible_match)]
10// `zip` is genuinely large (multi-arity tuple-pack with terminal-
11// cascade handling). Splitting would obscure the concurrent state-
12// machine logic across helpers without clear benefit.
13#![allow(clippy::too_many_lines)]
14
15use std::cell::RefCell;
16use std::collections::VecDeque;
17use std::rc::Rc;
18use std::sync::Arc;
19
20use graphrefly_core::{Core, HandleId, NodeId, Sink};
21use smallvec::SmallVec;
22
23use super::error::OperatorFactoryError;
24use super::producer::{ProducerBinding, ProducerCtx};
25
26// =====================================================================
27// zip — pair handles N-wise across N sources
28// =====================================================================
29
30/// Per-zip-node state: one FIFO queue per source, plus a flag for each
31/// source's terminal. Lives behind `Rc<RefCell<_>>` captured by the
32/// build + sink closures.
33struct ZipState {
34    queues: Vec<VecDeque<HandleId>>,
35    completed: Vec<bool>,
36    errored: bool,
37    terminated: bool,
38}
39
40impl ZipState {
41    fn new(n: usize) -> Self {
42        Self {
43            queues: (0..n).map(|_| VecDeque::new()).collect(),
44            completed: vec![false; n],
45            errored: false,
46            terminated: false,
47        }
48    }
49}
50
51/// `zip(s1, s2, ..., sN)` — collect one value from each source, emit a
52/// tuple, repeat. Models RxJS / TS `zip`:
53///
54/// - Each upstream DATA pushes into that source's per-source queue.
55/// - When **every** queue has at least one entry, pop one from each,
56///   pack into a tuple via [`graphrefly_core::BindingBoundary::pack_tuple`],
57///   and emit on the producer.
58/// - On any source's COMPLETE: if its queue is empty, terminate the
59///   producer with COMPLETE. Otherwise continue draining; terminate
60///   when this source's queue becomes empty (zip can't produce a
61///   tuple without input from every source).
62/// - On any source's ERROR: terminate the producer with the same
63///   ERROR (first error wins, like merge per Slice C-2 D022).
64///
65/// Empty source list (`n == 0`) emits a single empty-tuple event then
66/// completes. Single source (`n == 1`) is identity-passthrough.
67///
68/// # Refcount discipline
69///
70/// Each upstream DATA handle is `retain_handle`-bumped before being
71/// pushed onto a queue (the inbound message's payload retain belongs
72/// to the wave-end-flush release path; we take our own share for the
73/// queue). On pop, component handles are passed to `pack_tuple` which
74/// must NOT consume or release them — the caller (zip) retains
75/// ownership throughout the call and releases each component handle's
76/// queue share after `pack_tuple` returns. The returned tuple handle
77/// has a pre-bumped retain (binding convention per D020 doc on
78/// [`BindingBoundary::pack_tuple`]).
79/// # Errors
80///
81/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
82/// (R5.7.x — zip requires ≥1 source; vacuous-tuple semantics rejected).
83pub fn zip(
84    core: &Core,
85    binding: &Arc<dyn ProducerBinding>,
86    sources: Vec<NodeId>,
87    pack_fn_id: graphrefly_core::FnId,
88) -> Result<NodeId, OperatorFactoryError> {
89    // R5.7.x — zip requires ≥1 source. Mirrors `combine::combine` (which
90    // raises the same factory-shape invariant) so all bindings can route
91    // through `operator_factory_error_to_napi` / equivalent.
92    if sources.is_empty() {
93        return Err(OperatorFactoryError::EmptySources);
94    }
95    let n = sources.len();
96    // Weak-Arc captures break the BenchBinding → registry → producer_builds
97    // → closure → strong-Arc<dyn ProducerBinding> cycle that would otherwise
98    // pin the entire graph state when the host BenchCore drops with active
99    // producer registrations. See `Core::weak_handle` doc + Slice Y close.
100
101    let build = Box::new(move |ctx: ProducerCtx<'_>| {
102        let producer_id = ctx.node_id();
103        let binding_clone = ctx.core().binding();
104        let em = ctx.emitter();
105        // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
106        let state: Rc<RefCell<ZipState>> = Rc::new(RefCell::new(ZipState::new(n)));
107
108        for (idx, &source) in sources.iter().enumerate() {
109            let state_inner = state.clone();
110            // Sinks live only while the producer is active (cleared via
111            // producer_deactivate on last-subscriber unsubscribe), so they
112            // can safely capture strong refs cloned from the upgraded weaks.
113            let core_inner = em.clone();
114            let binding_inner = binding_clone.clone();
115            let sink: Sink = Rc::new(move |msgs| {
116                // Phase 1 (lock held): mutate queues + collect actions.
117                // Phase 2 (lock released): pack tuples + re-enter Core.
118                enum PostLockAction {
119                    /// Pack popped handles into a tuple, release components, emit.
120                    PackAndEmit(Vec<HandleId>),
121                    Complete,
122                    Error(HandleId),
123                }
124                let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
125                // Handles to release after the lock drops (P2:
126                // drain queues on terminate to avoid handle leaks).
127                let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
128                {
129                    let mut s = state_inner.borrow_mut();
130                    if s.terminated {
131                        return;
132                    }
133                    // Tier-based dispatch (canonical §4.2; see
134                    // `feedback_use_tier_for_signal_routing.md`). Tier 3
135                    // payload_handle.is_some() = DATA; tier 5
136                    // payload_handle.is_some() = ERROR else COMPLETE.
137                    for m in msgs {
138                        match m.tier() {
139                            3 => {
140                                if let Some(h) = m.payload_handle() {
141                                    binding_inner.retain_handle(h);
142                                    s.queues[idx].push_back(h);
143                                    // Collect complete tuples — pack_tuple runs
144                                    // after the lock drops (P5).
145                                    while s.queues.iter().all(|q| !q.is_empty()) {
146                                        let popped: Vec<HandleId> = s
147                                            .queues
148                                            .iter_mut()
149                                            .map(|q| q.pop_front().unwrap())
150                                            .collect();
151                                        post_actions.push(PostLockAction::PackAndEmit(popped));
152                                    }
153                                }
154                                // else: Resolved on a source — no action.
155                            }
156                            5 => {
157                                if let Some(h) = m.payload_handle() {
158                                    // Error
159                                    if !s.errored && !s.terminated {
160                                        s.errored = true;
161                                        s.terminated = true;
162                                        binding_inner.retain_handle(h);
163                                        // P2: release all remaining queued handles.
164                                        for q in &mut s.queues {
165                                            to_release.extend(q.drain(..));
166                                        }
167                                        post_actions.push(PostLockAction::Error(h));
168                                    }
169                                } else {
170                                    // Complete
171                                    s.completed[idx] = true;
172                                    // If this source's queue is empty, no more
173                                    // tuples from it — terminate.
174                                    if s.queues[idx].is_empty() && !s.terminated {
175                                        s.terminated = true;
176                                        // P2: release all remaining queued handles.
177                                        for q in &mut s.queues {
178                                            to_release.extend(q.drain(..));
179                                        }
180                                        post_actions.push(PostLockAction::Complete);
181                                    }
182                                }
183                            }
184                            _ => {} // Tiers 0/1/2/4/6 — no action.
185                        }
186                    }
187                }
188                // Release leaked queue handles outside the lock.
189                for h in to_release {
190                    binding_inner.release_handle(h);
191                }
192                // Phase 2 (lock released): pack tuples + re-enter Core.
193                // P5: pack_tuple runs outside the per-zip lock to avoid
194                // deadlock if the binding's pack_tuple re-enters.
195                for action in post_actions {
196                    match action {
197                        PostLockAction::PackAndEmit(popped) => {
198                            let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
199                            for h in &popped {
200                                binding_inner.release_handle(*h);
201                            }
202                            core_inner.emit(producer_id, tuple_h);
203                        }
204                        PostLockAction::Complete => core_inner.complete(producer_id),
205                        PostLockAction::Error(h) => core_inner.error(producer_id, h),
206                    }
207                }
208            });
209            // F2 /qa: on Dead, synthesize the per-source Complete in
210            // zip's state machine — `s.completed[idx] = true` and (if
211            // queue is empty, which it always is at activation since
212            // DATA hasn't yet flowed) self-Complete the producer.
213            // Pre-F2 a Dead source left zip waiting on a queue that
214            // would never fill → silent wedge.
215            let outcome = ctx.subscribe_to(source, sink);
216            if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
217                let core_dead = em.clone();
218                let binding_dead = binding_clone.clone();
219                let mut should_complete = false;
220                let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
221                {
222                    let mut s = state.borrow_mut();
223                    if !s.terminated {
224                        s.completed[idx] = true;
225                        if s.queues[idx].is_empty() {
226                            s.terminated = true;
227                            for q in &mut s.queues {
228                                to_release.extend(q.drain(..));
229                            }
230                            should_complete = true;
231                        }
232                    }
233                }
234                for h in to_release {
235                    binding_dead.release_handle(h);
236                }
237                if should_complete {
238                    core_dead.complete(producer_id);
239                }
240            }
241        }
242    });
243
244    let fn_id = binding.register_producer_build(build);
245    Ok(core
246        .register_producer(fn_id)
247        .expect("invariant: register_producer has no deps; no error variants reachable"))
248}
249
250// =====================================================================
251// concat — sequentially forward `first` then `second`
252// =====================================================================
253
254struct ConcatState {
255    /// 0 = forwarding `first`; 1 = `first` complete, forwarding `second`.
256    phase: u8,
257    /// Buffered DATA from `second` that arrived during phase 0 (before
258    /// `first` completed). Drained on phase transition.
259    pending: VecDeque<HandleId>,
260    /// Set to true if `second` completed during phase 0. On phase
261    /// transition, after draining `pending`, concat self-completes
262    /// because `second` won't emit Complete again (D041 / D-ops /qa D4).
263    second_completed: bool,
264    terminated: bool,
265}
266
267impl ConcatState {
268    fn new() -> Self {
269        Self {
270            phase: 0,
271            pending: VecDeque::new(),
272            second_completed: false,
273            terminated: false,
274        }
275    }
276}
277
278/// `concat(first, second)` — forward DATA from `first` until it
279/// completes, then drain any DATA `second` emitted during phase 1
280/// (buffered) and continue forwarding `second`. ERROR from either
281/// source terminates the producer with the same ERROR.
282///
283/// Subscribes to BOTH sources at activation time (matches TS impl in
284/// `extra/operators/combine.ts:332-379` so `second.subscribe` doesn't
285/// race after `first` completes). DATA from `second` during phase 0
286/// is buffered, not forwarded.
287#[must_use]
288pub fn concat(
289    core: &Core,
290    binding: &Arc<dyn ProducerBinding>,
291    first: NodeId,
292    second: NodeId,
293) -> NodeId {
294    // Weak captures break the producer-build Arc cycle (see `zip` doc).
295
296    let build = Box::new(move |ctx: ProducerCtx<'_>| {
297        let producer_id = ctx.node_id();
298        let binding_clone = ctx.core().binding();
299        let em = ctx.emitter();
300        let state: Rc<RefCell<ConcatState>> = Rc::new(RefCell::new(ConcatState::new()));
301
302        // Subscribe to second FIRST so phase-0 DATA buffering catches
303        // synchronous initial emissions. Sinks capture strong refs cloned
304        // from the upgraded weaks; sink lifetime tied to producer activation.
305        let state_for_second = state.clone();
306        let core_for_second = em.clone();
307        let binding_for_second = binding_clone.clone();
308        let second_sink: Sink = Rc::new(move |msgs| {
309            enum Action {
310                Emit(HandleId),
311                Complete,
312                Error(HandleId),
313            }
314            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
315            let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
316            {
317                let mut s = state_for_second.borrow_mut();
318                if s.terminated {
319                    return;
320                }
321                // Tier-based dispatch (canonical §4.2).
322                for m in msgs {
323                    match m.tier() {
324                        3 => {
325                            if let Some(h) = m.payload_handle() {
326                                if s.phase == 0 {
327                                    // Buffer for later drain.
328                                    binding_for_second.retain_handle(h);
329                                    s.pending.push_back(h);
330                                } else {
331                                    binding_for_second.retain_handle(h);
332                                    actions.push(Action::Emit(h));
333                                }
334                            }
335                            // else: Resolved on second source — no action.
336                        }
337                        5 => {
338                            if let Some(h) = m.payload_handle() {
339                                // Error
340                                if !s.terminated {
341                                    s.terminated = true;
342                                    binding_for_second.retain_handle(h);
343                                    // P2: release buffered pending handles.
344                                    to_release.extend(s.pending.drain(..));
345                                    actions.push(Action::Error(h));
346                                }
347                            } else {
348                                // Complete
349                                if s.phase == 1 && !s.terminated {
350                                    s.terminated = true;
351                                    actions.push(Action::Complete);
352                                } else if s.phase == 0 {
353                                    // D041 / D4 fix: remember that second
354                                    // completed during phase 0. On phase
355                                    // transition, after draining `pending`,
356                                    // first_sink will self-complete
357                                    // (second's Complete fires once and
358                                    // won't be re-observed).
359                                    s.second_completed = true;
360                                }
361                            }
362                        }
363                        _ => {} // Tiers 0/1/2/4/6 — no action.
364                    }
365                }
366            }
367            for h in to_release {
368                binding_for_second.release_handle(h);
369            }
370            for action in actions {
371                match action {
372                    Action::Emit(h) => core_for_second.emit(producer_id, h),
373                    Action::Complete => core_for_second.complete(producer_id),
374                    Action::Error(h) => core_for_second.error(producer_id, h),
375                }
376            }
377        });
378        // F2 /qa: Dead `second` is observed via `second_completed` flag.
379        // First-Complete drains pending and self-Completes if second
380        // already completed; same logic handles Dead second.
381        let second_outcome = ctx.subscribe_to(second, second_sink);
382        if matches!(
383            second_outcome,
384            crate::producer::SubscribeOutcome::Dead { .. }
385        ) {
386            let mut s = state.borrow_mut();
387            s.second_completed = true;
388            // No additional action — the first-Complete path or first-Dead
389            // path below will trigger producer-Complete.
390        }
391
392        let state_for_first = state.clone();
393        let core_for_first = em.clone();
394        let binding_for_first = binding_clone.clone();
395        let first_sink: Sink = Rc::new(move |msgs| {
396            // first.Complete triggers the phase transition (handled
397            // via `s.phase = 1` + draining pending into `actions`),
398            // and may also self-complete the producer if `second`
399            // already completed during phase 0 (D041 / D4 fix).
400            enum Action {
401                Emit(HandleId),
402                Complete,
403                Error(HandleId),
404            }
405            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
406            let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
407            {
408                let mut s = state_for_first.borrow_mut();
409                if s.terminated {
410                    return;
411                }
412                if s.phase != 0 {
413                    return; // first is done; ignore stale messages.
414                }
415                for m in msgs {
416                    // Slice E /qa: defensive per-iteration `terminated`
417                    // guard. The outer guard at the top of the lock
418                    // block catches a `terminated` set BEFORE this
419                    // batch arrived; this per-iteration check catches
420                    // the case where an earlier message in the SAME
421                    // batch transitioned us terminal (e.g., post-
422                    // Complete the phase moved to 1, but a defensively-
423                    // emitted stale `[Data]` later in the same batch
424                    // would otherwise queue a useless retain that
425                    // `core.emit` would discard on a terminal
426                    // producer).
427                    if s.terminated || s.phase != 0 {
428                        break;
429                    }
430                    // Tier-based dispatch (canonical §4.2).
431                    match m.tier() {
432                        3 => {
433                            if let Some(h) = m.payload_handle() {
434                                binding_for_first.retain_handle(h);
435                                actions.push(Action::Emit(h));
436                            }
437                            // else: Resolved on first source — no action.
438                        }
439                        5 => {
440                            if let Some(h) = m.payload_handle() {
441                                // Error
442                                if !s.terminated {
443                                    s.terminated = true;
444                                    binding_for_first.retain_handle(h);
445                                    // P2: release buffered pending handles.
446                                    to_release.extend(s.pending.drain(..));
447                                    actions.push(Action::Error(h));
448                                }
449                            } else {
450                                // Complete — phase transition: drain pending
451                                // second-data, then continue forwarding from
452                                // second.
453                                s.phase = 1;
454                                // Pending handles already retained at buffer time.
455                                for h in s.pending.drain(..) {
456                                    actions.push(Action::Emit(h));
457                                }
458                                // D041 / D4 fix: if second already completed
459                                // during phase 0, self-complete now (its
460                                // Complete fired once and won't re-fire).
461                                if s.second_completed && !s.terminated {
462                                    s.terminated = true;
463                                    actions.push(Action::Complete);
464                                }
465                            }
466                        }
467                        _ => {} // Tiers 0/1/2/4/6 — no action.
468                    }
469                }
470            }
471            for h in to_release {
472                binding_for_first.release_handle(h);
473            }
474            for action in actions {
475                match action {
476                    Action::Emit(h) => core_for_first.emit(producer_id, h),
477                    Action::Complete => core_for_first.complete(producer_id),
478                    Action::Error(h) => core_for_first.error(producer_id, h),
479                }
480            }
481        });
482        // F2 /qa: Dead `first` triggers the phase transition immediately
483        // (treat as first-Complete). If `second` is also dead (or already
484        // completed in phase 0), self-Complete; else continue forwarding
485        // pending+future from second.
486        let first_outcome = ctx.subscribe_to(first, first_sink);
487        if matches!(
488            first_outcome,
489            crate::producer::SubscribeOutcome::Dead { .. }
490        ) {
491            let core_first_dead = em.clone();
492            let mut should_complete = false;
493            let mut pending_to_emit: Vec<HandleId> = Vec::new();
494            {
495                let mut s = state.borrow_mut();
496                if !s.terminated && s.phase == 0 {
497                    s.phase = 1;
498                    // Drain pending second-DATA buffered during phase 0.
499                    pending_to_emit.extend(s.pending.drain(..));
500                    // If second already completed (or was Dead), self-Complete.
501                    if s.second_completed && !s.terminated {
502                        s.terminated = true;
503                        should_complete = true;
504                    }
505                }
506            }
507            for h in pending_to_emit {
508                core_first_dead.emit(producer_id, h);
509            }
510            if should_complete {
511                core_first_dead.complete(producer_id);
512            }
513        }
514    });
515
516    let fn_id = binding.register_producer_build(build);
517    core.register_producer(fn_id)
518        .expect("invariant: register_producer has no deps; no error variants reachable")
519}
520
521// =====================================================================
522// race — first source to emit DATA wins; losers are ignored
523// =====================================================================
524
/// Mutable bookkeeping for a single `race` producer, shared between
/// the per-source sinks.
struct RaceState {
    /// Position (in the `sources` list) of the source that won the
    /// race; `None` while no source has won yet.
    winner: Option<usize>,
    /// `completed[i]` is set once source `i` has completed. If every
    /// entry is set while there is still no winner, the producer
    /// completes (P4: no-winner all-complete termination).
    completed: Vec<bool>,
    /// Set once the producer has reached a terminal state.
    terminated: bool,
}

impl RaceState {
    /// Initial state for `n` sources: no winner, nothing completed,
    /// not terminated.
    fn new(n: usize) -> Self {
        let completed = vec![false; n];
        Self {
            terminated: false,
            completed,
            winner: None,
        }
    }
}
543
544/// `race(s1, s2, ..., sN)` — subscribes to all sources; the first to
545/// emit DATA wins. Subsequent traffic from the winner is forwarded;
546/// losers' messages are no-ops (per Q4=(b) — losers stay subscribed
547/// but their sink callbacks short-circuit). Saves the dynamic
548/// rewiring cost of explicitly unsubscribing losers.
549///
550/// Empty source list completes immediately. Single source is
551/// identity-passthrough.
552/// # Errors
553///
554/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
555/// (R5.7.x — race requires ≥1 source; no-winner-possible rejected).
556pub fn race(
557    core: &Core,
558    binding: &Arc<dyn ProducerBinding>,
559    sources: Vec<NodeId>,
560) -> Result<NodeId, OperatorFactoryError> {
561    // R5.7.x — race requires ≥1 source. Mirrors `combine::combine`.
562    if sources.is_empty() {
563        return Err(OperatorFactoryError::EmptySources);
564    }
565    let n = sources.len();
566    // Weak captures break the producer-build Arc cycle (see `zip` doc).
567
568    let build = Box::new(move |ctx: ProducerCtx<'_>| {
569        let producer_id = ctx.node_id();
570        let binding_clone = ctx.core().binding();
571        let em = ctx.emitter();
572        // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
573        let state: Rc<RefCell<RaceState>> = Rc::new(RefCell::new(RaceState::new(n)));
574
575        for (idx, &source) in sources.iter().enumerate() {
576            let state_inner = state.clone();
577            let core_inner = em.clone();
578            let binding_inner = binding_clone.clone();
579            let sink: Sink = Rc::new(move |msgs| {
580                enum Action {
581                    Emit(HandleId),
582                    Complete,
583                    Error(HandleId),
584                }
585                let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
586                {
587                    let mut s = state_inner.borrow_mut();
588                    if s.terminated {
589                        return;
590                    }
591                    // Tier-based dispatch (canonical §4.2).
592                    for m in msgs {
593                        match m.tier() {
594                            3 => {
595                                if let Some(h) = m.payload_handle() {
596                                    // P3: check s.winner live each iteration —
597                                    // this source may have just become the winner
598                                    // earlier in this batch.
599                                    if s.winner.is_none() {
600                                        s.winner = Some(idx);
601                                        binding_inner.retain_handle(h);
602                                        actions.push(Action::Emit(h));
603                                    } else if s.winner == Some(idx) {
604                                        binding_inner.retain_handle(h);
605                                        actions.push(Action::Emit(h));
606                                    }
607                                    // else: loser DATA — ignore.
608                                }
609                                // else: Resolved on a source — no action.
610                            }
611                            5 => {
612                                if let Some(h) = m.payload_handle() {
613                                    // Error — from any source pre-winner OR from
614                                    // the winner cascade; loser errors post-winner
615                                    // are ignored.
616                                    if (s.winner.is_none() || s.winner == Some(idx))
617                                        && !s.terminated
618                                    {
619                                        s.terminated = true;
620                                        binding_inner.retain_handle(h);
621                                        actions.push(Action::Error(h));
622                                    }
623                                } else {
624                                    // Complete
625                                    s.completed[idx] = true;
626                                    if s.winner == Some(idx) && !s.terminated {
627                                        s.terminated = true;
628                                        actions.push(Action::Complete);
629                                    } else if s.winner.is_none()
630                                        && s.completed.iter().all(|&c| c)
631                                        && !s.terminated
632                                    {
633                                        // P4: all sources completed without a
634                                        // winner — terminate the producer.
635                                        s.terminated = true;
636                                        actions.push(Action::Complete);
637                                    }
638                                    // else: loser complete — ignore.
639                                }
640                            }
641                            _ => {} // Tiers 0/1/2/4/6 — no action.
642                        }
643                    }
644                }
645                for action in actions {
646                    match action {
647                        Action::Emit(h) => core_inner.emit(producer_id, h),
648                        Action::Complete => core_inner.complete(producer_id),
649                        Action::Error(h) => core_inner.error(producer_id, h),
650                    }
651                }
652            });
653            // F2 /qa: on Dead, treat as `completed[idx] = true`. If
654            // all sources are now completed without a winner, self-
655            // Complete (matches the P4 "all completed no winner"
656            // branch in the per-source sink above).
657            let outcome = ctx.subscribe_to(source, sink);
658            if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
659                let core_dead = em.clone();
660                let mut should_complete = false;
661                {
662                    let mut s = state.borrow_mut();
663                    if !s.terminated && s.winner.is_none() {
664                        s.completed[idx] = true;
665                        if s.completed.iter().all(|&c| c) {
666                            s.terminated = true;
667                            should_complete = true;
668                        }
669                    }
670                }
671                if should_complete {
672                    core_dead.complete(producer_id);
673                }
674            }
675        }
676    });
677
678    let fn_id = binding.register_producer_build(build);
679    Ok(core
680        .register_producer(fn_id)
681        .expect("invariant: register_producer has no deps; no error variants reachable"))
682}
683
684// =====================================================================
685// takeUntil — terminate when notifier emits DATA
686// =====================================================================
687
/// Per-producer state shared by the two `take_until` sinks.
#[derive(Default)]
struct TakeUntilState {
    /// `true` once a terminal (COMPLETE or ERROR) has been scheduled for
    /// the producer; both sinks return early when they observe it.
    terminated: bool,
}

impl TakeUntilState {
    /// Builds the initial state, in which no terminal has been scheduled.
    fn new() -> Self {
        Self::default()
    }
}
697
698/// `take_until(source, notifier)` — forward DATA from `source` until
699/// `notifier` emits its first DATA, then terminate the producer with
700/// COMPLETE. Errors from either source cascade. Source COMPLETE
701/// terminates the producer.
702///
703/// Notifier DATA is consumed but never forwarded (zero-FFI on the
704/// notifier path — we don't dereference its payload, just use the
705/// emission as a signal).
706#[must_use]
707pub fn take_until(
708    core: &Core,
709    binding: &Arc<dyn ProducerBinding>,
710    source: NodeId,
711    notifier: NodeId,
712) -> NodeId {
713    // Weak captures break the producer-build Arc cycle (see `zip` doc).
714
715    let build = Box::new(move |ctx: ProducerCtx<'_>| {
716        let producer_id = ctx.node_id();
717        let binding_clone = ctx.core().binding();
718        let em = ctx.emitter();
719        let state: Rc<RefCell<TakeUntilState>> = Rc::new(RefCell::new(TakeUntilState::new()));
720
721        // Source sink: forward DATA, propagate terminals.
722        let state_for_source = state.clone();
723        let core_for_source = em.clone();
724        let binding_for_source = binding_clone.clone();
725        let source_sink: Sink = Rc::new(move |msgs| {
726            enum Action {
727                Emit(HandleId),
728                Complete,
729                Error(HandleId),
730            }
731            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
732            {
733                let mut s = state_for_source.borrow_mut();
734                if s.terminated {
735                    return;
736                }
737                // Tier-based dispatch (canonical §4.2).
738                for m in msgs {
739                    match m.tier() {
740                        3 => {
741                            if let Some(h) = m.payload_handle() {
742                                binding_for_source.retain_handle(h);
743                                actions.push(Action::Emit(h));
744                            }
745                            // else: Resolved on source — no action.
746                        }
747                        5 => {
748                            if let Some(h) = m.payload_handle() {
749                                // Error
750                                if !s.terminated {
751                                    s.terminated = true;
752                                    binding_for_source.retain_handle(h);
753                                    actions.push(Action::Error(h));
754                                }
755                            } else {
756                                // Complete
757                                if !s.terminated {
758                                    s.terminated = true;
759                                    actions.push(Action::Complete);
760                                }
761                            }
762                        }
763                        _ => {} // Tiers 0/1/2/4/6 — no action.
764                    }
765                }
766            }
767            for action in actions {
768                match action {
769                    Action::Emit(h) => core_for_source.emit(producer_id, h),
770                    Action::Complete => core_for_source.complete(producer_id),
771                    Action::Error(h) => core_for_source.error(producer_id, h),
772                }
773            }
774        });
775        // F2 /qa: Dead `source` → self-Complete (source is permanently
776        // over, so take_until has nothing to forward and the producer
777        // is finished).
778        let source_outcome = ctx.subscribe_to(source, source_sink);
779        if matches!(
780            source_outcome,
781            crate::producer::SubscribeOutcome::Dead { .. }
782        ) {
783            let core_dead = em.clone();
784            let mut should_complete = false;
785            {
786                let mut s = state.borrow_mut();
787                if !s.terminated {
788                    s.terminated = true;
789                    should_complete = true;
790                }
791            }
792            if should_complete {
793                core_dead.complete(producer_id);
794            }
795        }
796
797        // Notifier sink: any DATA → terminate; ERROR → cascade.
798        let state_for_notifier = state.clone();
799        let core_for_notifier = em.clone();
800        let binding_for_notifier = binding_clone.clone();
801        let notifier_sink: Sink = Rc::new(move |msgs| {
802            enum Action {
803                Complete,
804                Error(HandleId),
805            }
806            let mut action: Option<Action> = None;
807            {
808                let mut s = state_for_notifier.borrow_mut();
809                if s.terminated {
810                    return;
811                }
812                // Tier-based dispatch (canonical §4.2).
813                for m in msgs {
814                    match m.tier() {
815                        3 => {
816                            // Any DATA on notifier → complete the producer.
817                            // (Resolved alone — no payload — no action.)
818                            // We don't emit notifier DATA downstream, so we
819                            // don't even need to extract the handle.
820                            if m.payload_handle().is_some() && !s.terminated {
821                                s.terminated = true;
822                                action = Some(Action::Complete);
823                                break;
824                            }
825                        }
826                        5 => {
827                            // Error: cascade. Complete (payload_handle.is_none())
828                            // without prior DATA: nothing to do — source
829                            // continues independently.
830                            if let Some(h) = m.payload_handle() {
831                                if !s.terminated {
832                                    s.terminated = true;
833                                    binding_for_notifier.retain_handle(h);
834                                    action = Some(Action::Error(h));
835                                    break;
836                                }
837                            }
838                        }
839                        _ => {} // Tiers 0/1/2/4/6 — no action.
840                    }
841                }
842            }
843            if let Some(a) = action {
844                match a {
845                    Action::Complete => core_for_notifier.complete(producer_id),
846                    Action::Error(h) => core_for_notifier.error(producer_id, h),
847                }
848            }
849        });
850        // F2 /qa: Dead `notifier` → ignore. Notifier signal can never
851        // fire, so take_until reduces to a passthrough of source. The
852        // source's own Complete/Error will terminate the producer
853        // normally; if source is also Dead, the source-Dead branch
854        // above already self-Completed.
855        let _ = ctx.subscribe_to(notifier, notifier_sink);
856    });
857
858    let fn_id = binding.register_producer_build(build);
859    core.register_producer(fn_id)
860        .expect("invariant: register_producer has no deps; no error variants reachable")
861}