Skip to main content

graphrefly_operators/
ops_impl.rs

1//! Concrete implementations of the four subscription-managed combinators
2//! (zip / concat / race / takeUntil). Built on the
3//! [`super::producer::ProducerCtx`] substrate.
4
5// Each sink closure runs a small phase-1/phase-2 dance (lock state,
6// collect actions, drop lock, replay actions). The match-then-if
7// structure inside phase 1 is intentional for readability; collapsing
8// to `match { Some(...) if cond => ... }` obscures the lock-discipline.
9#![allow(clippy::collapsible_if, clippy::collapsible_match)]
10// `zip` is genuinely large (multi-arity tuple-pack with terminal-
11// cascade handling). Splitting would obscure the concurrent state-
12// machine logic across helpers without clear benefit.
13#![allow(clippy::too_many_lines)]
14
15use std::collections::VecDeque;
16use std::sync::{Arc, Mutex, Weak};
17
18use graphrefly_core::{Core, HandleId, NodeId, Sink};
19use smallvec::SmallVec;
20
21use super::error::OperatorFactoryError;
22use super::producer::{ProducerBinding, ProducerCtx};
23
24// =====================================================================
25// zip — pair handles N-wise across N sources
26// =====================================================================
27
28/// Per-zip-node state: one FIFO queue per source, plus a flag for each
29/// source's terminal. Lives behind `Arc<Mutex<_>>` captured by the
30/// build + sink closures.
31struct ZipState {
32    queues: Vec<VecDeque<HandleId>>,
33    completed: Vec<bool>,
34    errored: bool,
35    terminated: bool,
36}
37
38impl ZipState {
39    fn new(n: usize) -> Self {
40        Self {
41            queues: (0..n).map(|_| VecDeque::new()).collect(),
42            completed: vec![false; n],
43            errored: false,
44            terminated: false,
45        }
46    }
47}
48
49/// `zip(s1, s2, ..., sN)` — collect one value from each source, emit a
50/// tuple, repeat. Models RxJS / TS `zip`:
51///
52/// - Each upstream DATA pushes into that source's per-source queue.
53/// - When **every** queue has at least one entry, pop one from each,
54///   pack into a tuple via [`graphrefly_core::BindingBoundary::pack_tuple`],
55///   and emit on the producer.
56/// - On any source's COMPLETE: if its queue is empty, terminate the
57///   producer with COMPLETE. Otherwise continue draining; terminate
58///   when this source's queue becomes empty (zip can't produce a
59///   tuple without input from every source).
60/// - On any source's ERROR: terminate the producer with the same
61///   ERROR (first error wins, like merge per Slice C-2 D022).
62///
63/// Empty source list (`n == 0`) emits a single empty-tuple event then
64/// completes. Single source (`n == 1`) is identity-passthrough.
65///
66/// # Refcount discipline
67///
68/// Each upstream DATA handle is `retain_handle`-bumped before being
69/// pushed onto a queue (the inbound message's payload retain belongs
70/// to the wave-end-flush release path; we take our own share for the
71/// queue). On pop, component handles are passed to `pack_tuple` which
72/// must NOT consume or release them — the caller (zip) retains
73/// ownership throughout the call and releases each component handle's
74/// queue share after `pack_tuple` returns. The returned tuple handle
75/// has a pre-bumped retain (binding convention per D020 doc on
76/// [`BindingBoundary::pack_tuple`]).
77/// # Errors
78///
79/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
80/// (R5.7.x — zip requires ≥1 source; vacuous-tuple semantics rejected).
81pub fn zip(
82    core: &Core,
83    binding: &Arc<dyn ProducerBinding>,
84    sources: Vec<NodeId>,
85    pack_fn_id: graphrefly_core::FnId,
86) -> Result<NodeId, OperatorFactoryError> {
87    // R5.7.x — zip requires ≥1 source. Mirrors `combine::combine` (which
88    // raises the same factory-shape invariant) so all bindings can route
89    // through `operator_factory_error_to_napi` / equivalent.
90    if sources.is_empty() {
91        return Err(OperatorFactoryError::EmptySources);
92    }
93    let n = sources.len();
94    // Weak-Arc captures break the BenchBinding → registry → producer_builds
95    // → closure → strong-Arc<dyn ProducerBinding> cycle that would otherwise
96    // pin the entire graph state when the host BenchCore drops with active
97    // producer registrations. See `Core::weak_handle` doc + Slice Y close.
98    let core_weak = core.weak_handle();
99    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
100
101    let build = Box::new(move |ctx: ProducerCtx<'_>| {
102        let producer_id = ctx.node_id();
103        let (Some(core_for_build), Some(binding_clone)) =
104            (core_weak.upgrade(), binding_weak.upgrade())
105        else {
106            // Host Core / binding already dropped — no-op.
107            return;
108        };
109        // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
110        let state: Arc<Mutex<ZipState>> = Arc::new(Mutex::new(ZipState::new(n)));
111
112        for (idx, &source) in sources.iter().enumerate() {
113            let state_inner = state.clone();
114            // Sinks live only while the producer is active (cleared via
115            // producer_deactivate on last-subscriber unsubscribe), so they
116            // can safely capture strong refs cloned from the upgraded weaks.
117            let core_inner = core_for_build.clone();
118            let binding_inner = binding_clone.clone();
119            let sink: Sink = Arc::new(move |msgs| {
120                // Phase 1 (lock held): mutate queues + collect actions.
121                // Phase 2 (lock released): pack tuples + re-enter Core.
122                enum PostLockAction {
123                    /// Pack popped handles into a tuple, release components, emit.
124                    PackAndEmit(Vec<HandleId>),
125                    Complete,
126                    Error(HandleId),
127                }
128                let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
129                // Handles to release after the lock drops (P2:
130                // drain queues on terminate to avoid handle leaks).
131                let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
132                {
133                    let mut s = state_inner.lock().unwrap();
134                    if s.terminated {
135                        return;
136                    }
137                    // Tier-based dispatch (canonical §4.2; see
138                    // `feedback_use_tier_for_signal_routing.md`). Tier 3
139                    // payload_handle.is_some() = DATA; tier 5
140                    // payload_handle.is_some() = ERROR else COMPLETE.
141                    for m in msgs {
142                        match m.tier() {
143                            3 => {
144                                if let Some(h) = m.payload_handle() {
145                                    binding_inner.retain_handle(h);
146                                    s.queues[idx].push_back(h);
147                                    // Collect complete tuples — pack_tuple runs
148                                    // after the lock drops (P5).
149                                    while s.queues.iter().all(|q| !q.is_empty()) {
150                                        let popped: Vec<HandleId> = s
151                                            .queues
152                                            .iter_mut()
153                                            .map(|q| q.pop_front().unwrap())
154                                            .collect();
155                                        post_actions.push(PostLockAction::PackAndEmit(popped));
156                                    }
157                                }
158                                // else: Resolved on a source — no action.
159                            }
160                            5 => {
161                                if let Some(h) = m.payload_handle() {
162                                    // Error
163                                    if !s.errored && !s.terminated {
164                                        s.errored = true;
165                                        s.terminated = true;
166                                        binding_inner.retain_handle(h);
167                                        // P2: release all remaining queued handles.
168                                        for q in &mut s.queues {
169                                            to_release.extend(q.drain(..));
170                                        }
171                                        post_actions.push(PostLockAction::Error(h));
172                                    }
173                                } else {
174                                    // Complete
175                                    s.completed[idx] = true;
176                                    // If this source's queue is empty, no more
177                                    // tuples from it — terminate.
178                                    if s.queues[idx].is_empty() && !s.terminated {
179                                        s.terminated = true;
180                                        // P2: release all remaining queued handles.
181                                        for q in &mut s.queues {
182                                            to_release.extend(q.drain(..));
183                                        }
184                                        post_actions.push(PostLockAction::Complete);
185                                    }
186                                }
187                            }
188                            _ => {} // Tiers 0/1/2/4/6 — no action.
189                        }
190                    }
191                }
192                // Release leaked queue handles outside the lock.
193                for h in to_release {
194                    binding_inner.release_handle(h);
195                }
196                // Phase 2 (lock released): pack tuples + re-enter Core.
197                // P5: pack_tuple runs outside the per-zip lock to avoid
198                // deadlock if the binding's pack_tuple re-enters.
199                for action in post_actions {
200                    match action {
201                        PostLockAction::PackAndEmit(popped) => {
202                            let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
203                            for h in &popped {
204                                binding_inner.release_handle(*h);
205                            }
206                            core_inner.emit_or_defer(producer_id, tuple_h);
207                        }
208                        PostLockAction::Complete => core_inner.complete_or_defer(producer_id),
209                        PostLockAction::Error(h) => core_inner.error_or_defer(producer_id, h),
210                    }
211                }
212            });
213            // F2 /qa: on Dead, synthesize the per-source Complete in
214            // zip's state machine — `s.completed[idx] = true` and (if
215            // queue is empty, which it always is at activation since
216            // DATA hasn't yet flowed) self-Complete the producer.
217            // Pre-F2 a Dead source left zip waiting on a queue that
218            // would never fill → silent wedge.
219            let outcome = ctx.subscribe_to(source, sink);
220            if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
221                let core_dead = core_for_build.clone();
222                let binding_dead = binding_clone.clone();
223                let mut should_complete = false;
224                let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
225                {
226                    let mut s = state.lock().unwrap();
227                    if !s.terminated {
228                        s.completed[idx] = true;
229                        if s.queues[idx].is_empty() {
230                            s.terminated = true;
231                            for q in &mut s.queues {
232                                to_release.extend(q.drain(..));
233                            }
234                            should_complete = true;
235                        }
236                    }
237                }
238                for h in to_release {
239                    binding_dead.release_handle(h);
240                }
241                if should_complete {
242                    core_dead.complete_or_defer(producer_id);
243                }
244            }
245        }
246    });
247
248    let fn_id = binding.register_producer_build(build);
249    Ok(core
250        .register_producer(fn_id)
251        .expect("invariant: register_producer has no deps; no error variants reachable"))
252}
253
254// =====================================================================
255// concat — sequentially forward `first` then `second`
256// =====================================================================
257
258struct ConcatState {
259    /// 0 = forwarding `first`; 1 = `first` complete, forwarding `second`.
260    phase: u8,
261    /// Buffered DATA from `second` that arrived during phase 0 (before
262    /// `first` completed). Drained on phase transition.
263    pending: VecDeque<HandleId>,
264    /// Set to true if `second` completed during phase 0. On phase
265    /// transition, after draining `pending`, concat self-completes
266    /// because `second` won't emit Complete again (D041 / D-ops /qa D4).
267    second_completed: bool,
268    terminated: bool,
269}
270
271impl ConcatState {
272    fn new() -> Self {
273        Self {
274            phase: 0,
275            pending: VecDeque::new(),
276            second_completed: false,
277            terminated: false,
278        }
279    }
280}
281
282/// `concat(first, second)` — forward DATA from `first` until it
283/// completes, then drain any DATA `second` emitted during phase 1
284/// (buffered) and continue forwarding `second`. ERROR from either
285/// source terminates the producer with the same ERROR.
286///
287/// Subscribes to BOTH sources at activation time (matches TS impl in
288/// `extra/operators/combine.ts:332-379` so `second.subscribe` doesn't
289/// race after `first` completes). DATA from `second` during phase 0
290/// is buffered, not forwarded.
291#[must_use]
292pub fn concat(
293    core: &Core,
294    binding: &Arc<dyn ProducerBinding>,
295    first: NodeId,
296    second: NodeId,
297) -> NodeId {
298    // Weak captures break the producer-build Arc cycle (see `zip` doc).
299    let core_weak = core.weak_handle();
300    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
301
302    let build = Box::new(move |ctx: ProducerCtx<'_>| {
303        let producer_id = ctx.node_id();
304        let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
305        else {
306            return;
307        };
308        let state: Arc<Mutex<ConcatState>> = Arc::new(Mutex::new(ConcatState::new()));
309
310        // Subscribe to second FIRST so phase-0 DATA buffering catches
311        // synchronous initial emissions. Sinks capture strong refs cloned
312        // from the upgraded weaks; sink lifetime tied to producer activation.
313        let state_for_second = state.clone();
314        let core_for_second = core_clone.clone();
315        let binding_for_second = binding_clone.clone();
316        let second_sink: Sink = Arc::new(move |msgs| {
317            enum Action {
318                Emit(HandleId),
319                Complete,
320                Error(HandleId),
321            }
322            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
323            let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
324            {
325                let mut s = state_for_second.lock().unwrap();
326                if s.terminated {
327                    return;
328                }
329                // Tier-based dispatch (canonical §4.2).
330                for m in msgs {
331                    match m.tier() {
332                        3 => {
333                            if let Some(h) = m.payload_handle() {
334                                if s.phase == 0 {
335                                    // Buffer for later drain.
336                                    binding_for_second.retain_handle(h);
337                                    s.pending.push_back(h);
338                                } else {
339                                    binding_for_second.retain_handle(h);
340                                    actions.push(Action::Emit(h));
341                                }
342                            }
343                            // else: Resolved on second source — no action.
344                        }
345                        5 => {
346                            if let Some(h) = m.payload_handle() {
347                                // Error
348                                if !s.terminated {
349                                    s.terminated = true;
350                                    binding_for_second.retain_handle(h);
351                                    // P2: release buffered pending handles.
352                                    to_release.extend(s.pending.drain(..));
353                                    actions.push(Action::Error(h));
354                                }
355                            } else {
356                                // Complete
357                                if s.phase == 1 && !s.terminated {
358                                    s.terminated = true;
359                                    actions.push(Action::Complete);
360                                } else if s.phase == 0 {
361                                    // D041 / D4 fix: remember that second
362                                    // completed during phase 0. On phase
363                                    // transition, after draining `pending`,
364                                    // first_sink will self-complete
365                                    // (second's Complete fires once and
366                                    // won't be re-observed).
367                                    s.second_completed = true;
368                                }
369                            }
370                        }
371                        _ => {} // Tiers 0/1/2/4/6 — no action.
372                    }
373                }
374            }
375            for h in to_release {
376                binding_for_second.release_handle(h);
377            }
378            for action in actions {
379                match action {
380                    Action::Emit(h) => core_for_second.emit_or_defer(producer_id, h),
381                    Action::Complete => core_for_second.complete_or_defer(producer_id),
382                    Action::Error(h) => core_for_second.error_or_defer(producer_id, h),
383                }
384            }
385        });
386        // F2 /qa: Dead `second` is observed via `second_completed` flag.
387        // First-Complete drains pending and self-Completes if second
388        // already completed; same logic handles Dead second.
389        let second_outcome = ctx.subscribe_to(second, second_sink);
390        if matches!(
391            second_outcome,
392            crate::producer::SubscribeOutcome::Dead { .. }
393        ) {
394            let mut s = state.lock().unwrap();
395            s.second_completed = true;
396            // No additional action — the first-Complete path or first-Dead
397            // path below will trigger producer-Complete.
398        }
399
400        let state_for_first = state.clone();
401        let core_for_first = core_clone.clone();
402        let binding_for_first = binding_clone.clone();
403        let first_sink: Sink = Arc::new(move |msgs| {
404            // first.Complete triggers the phase transition (handled
405            // via `s.phase = 1` + draining pending into `actions`),
406            // and may also self-complete the producer if `second`
407            // already completed during phase 0 (D041 / D4 fix).
408            enum Action {
409                Emit(HandleId),
410                Complete,
411                Error(HandleId),
412            }
413            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
414            let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
415            {
416                let mut s = state_for_first.lock().unwrap();
417                if s.terminated {
418                    return;
419                }
420                if s.phase != 0 {
421                    return; // first is done; ignore stale messages.
422                }
423                for m in msgs {
424                    // Slice E /qa: defensive per-iteration `terminated`
425                    // guard. The outer guard at the top of the lock
426                    // block catches a `terminated` set BEFORE this
427                    // batch arrived; this per-iteration check catches
428                    // the case where an earlier message in the SAME
429                    // batch transitioned us terminal (e.g., post-
430                    // Complete the phase moved to 1, but a defensively-
431                    // emitted stale `[Data]` later in the same batch
432                    // would otherwise queue a useless retain that
433                    // `core.emit` would discard on a terminal
434                    // producer).
435                    if s.terminated || s.phase != 0 {
436                        break;
437                    }
438                    // Tier-based dispatch (canonical §4.2).
439                    match m.tier() {
440                        3 => {
441                            if let Some(h) = m.payload_handle() {
442                                binding_for_first.retain_handle(h);
443                                actions.push(Action::Emit(h));
444                            }
445                            // else: Resolved on first source — no action.
446                        }
447                        5 => {
448                            if let Some(h) = m.payload_handle() {
449                                // Error
450                                if !s.terminated {
451                                    s.terminated = true;
452                                    binding_for_first.retain_handle(h);
453                                    // P2: release buffered pending handles.
454                                    to_release.extend(s.pending.drain(..));
455                                    actions.push(Action::Error(h));
456                                }
457                            } else {
458                                // Complete — phase transition: drain pending
459                                // second-data, then continue forwarding from
460                                // second.
461                                s.phase = 1;
462                                // Pending handles already retained at buffer time.
463                                for h in s.pending.drain(..) {
464                                    actions.push(Action::Emit(h));
465                                }
466                                // D041 / D4 fix: if second already completed
467                                // during phase 0, self-complete now (its
468                                // Complete fired once and won't re-fire).
469                                if s.second_completed && !s.terminated {
470                                    s.terminated = true;
471                                    actions.push(Action::Complete);
472                                }
473                            }
474                        }
475                        _ => {} // Tiers 0/1/2/4/6 — no action.
476                    }
477                }
478            }
479            for h in to_release {
480                binding_for_first.release_handle(h);
481            }
482            for action in actions {
483                match action {
484                    Action::Emit(h) => core_for_first.emit_or_defer(producer_id, h),
485                    Action::Complete => core_for_first.complete_or_defer(producer_id),
486                    Action::Error(h) => core_for_first.error_or_defer(producer_id, h),
487                }
488            }
489        });
490        // F2 /qa: Dead `first` triggers the phase transition immediately
491        // (treat as first-Complete). If `second` is also dead (or already
492        // completed in phase 0), self-Complete; else continue forwarding
493        // pending+future from second.
494        let first_outcome = ctx.subscribe_to(first, first_sink);
495        if matches!(
496            first_outcome,
497            crate::producer::SubscribeOutcome::Dead { .. }
498        ) {
499            let core_first_dead = core_clone.clone();
500            let mut should_complete = false;
501            let mut pending_to_emit: Vec<HandleId> = Vec::new();
502            {
503                let mut s = state.lock().unwrap();
504                if !s.terminated && s.phase == 0 {
505                    s.phase = 1;
506                    // Drain pending second-DATA buffered during phase 0.
507                    pending_to_emit.extend(s.pending.drain(..));
508                    // If second already completed (or was Dead), self-Complete.
509                    if s.second_completed && !s.terminated {
510                        s.terminated = true;
511                        should_complete = true;
512                    }
513                }
514            }
515            for h in pending_to_emit {
516                core_first_dead.emit_or_defer(producer_id, h);
517            }
518            if should_complete {
519                core_first_dead.complete_or_defer(producer_id);
520            }
521        }
522    });
523
524    let fn_id = binding.register_producer_build(build);
525    core.register_producer(fn_id)
526        .expect("invariant: register_producer has no deps; no error variants reachable")
527}
528
529// =====================================================================
530// race — first source to emit DATA wins; losers are ignored
531// =====================================================================
532
/// Shared bookkeeping for one active `race` producer, guarded by a
/// mutex that every per-source sink locks.
struct RaceState {
    /// Position of the source that won the race; `None` until some
    /// source has delivered DATA.
    winner: Option<usize>,
    /// `completed[i]` is set once source `i` has completed. If every
    /// entry is set while there is still no winner, the producer
    /// completes (P4: no-winner all-complete termination).
    completed: Vec<bool>,
    /// Set once the producer has been driven to a terminal; sinks
    /// short-circuit when they observe it.
    terminated: bool,
}

impl RaceState {
    /// Initial state for `n` sources: no winner, no source completed,
    /// producer not terminated.
    fn new(n: usize) -> Self {
        let completed = vec![false; n];
        Self {
            winner: None,
            completed,
            terminated: false,
        }
    }
}
551
552/// `race(s1, s2, ..., sN)` — subscribes to all sources; the first to
553/// emit DATA wins. Subsequent traffic from the winner is forwarded;
554/// losers' messages are no-ops (per Q4=(b) — losers stay subscribed
555/// but their sink callbacks short-circuit). Saves the dynamic
556/// rewiring cost of explicitly unsubscribing losers.
557///
558/// Empty source list completes immediately. Single source is
559/// identity-passthrough.
560/// # Errors
561///
562/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
563/// (R5.7.x — race requires ≥1 source; no-winner-possible rejected).
564pub fn race(
565    core: &Core,
566    binding: &Arc<dyn ProducerBinding>,
567    sources: Vec<NodeId>,
568) -> Result<NodeId, OperatorFactoryError> {
569    // R5.7.x — race requires ≥1 source. Mirrors `combine::combine`.
570    if sources.is_empty() {
571        return Err(OperatorFactoryError::EmptySources);
572    }
573    let n = sources.len();
574    // Weak captures break the producer-build Arc cycle (see `zip` doc).
575    let core_weak = core.weak_handle();
576    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
577
578    let build = Box::new(move |ctx: ProducerCtx<'_>| {
579        let producer_id = ctx.node_id();
580        let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
581        else {
582            return;
583        };
584        // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
585        let state: Arc<Mutex<RaceState>> = Arc::new(Mutex::new(RaceState::new(n)));
586
587        for (idx, &source) in sources.iter().enumerate() {
588            let state_inner = state.clone();
589            let core_inner = core_clone.clone();
590            let binding_inner = binding_clone.clone();
591            let sink: Sink = Arc::new(move |msgs| {
592                enum Action {
593                    Emit(HandleId),
594                    Complete,
595                    Error(HandleId),
596                }
597                let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
598                {
599                    let mut s = state_inner.lock().unwrap();
600                    if s.terminated {
601                        return;
602                    }
603                    // Tier-based dispatch (canonical §4.2).
604                    for m in msgs {
605                        match m.tier() {
606                            3 => {
607                                if let Some(h) = m.payload_handle() {
608                                    // P3: check s.winner live each iteration —
609                                    // this source may have just become the winner
610                                    // earlier in this batch.
611                                    if s.winner.is_none() {
612                                        s.winner = Some(idx);
613                                        binding_inner.retain_handle(h);
614                                        actions.push(Action::Emit(h));
615                                    } else if s.winner == Some(idx) {
616                                        binding_inner.retain_handle(h);
617                                        actions.push(Action::Emit(h));
618                                    }
619                                    // else: loser DATA — ignore.
620                                }
621                                // else: Resolved on a source — no action.
622                            }
623                            5 => {
624                                if let Some(h) = m.payload_handle() {
625                                    // Error — from any source pre-winner OR from
626                                    // the winner cascade; loser errors post-winner
627                                    // are ignored.
628                                    if (s.winner.is_none() || s.winner == Some(idx))
629                                        && !s.terminated
630                                    {
631                                        s.terminated = true;
632                                        binding_inner.retain_handle(h);
633                                        actions.push(Action::Error(h));
634                                    }
635                                } else {
636                                    // Complete
637                                    s.completed[idx] = true;
638                                    if s.winner == Some(idx) && !s.terminated {
639                                        s.terminated = true;
640                                        actions.push(Action::Complete);
641                                    } else if s.winner.is_none()
642                                        && s.completed.iter().all(|&c| c)
643                                        && !s.terminated
644                                    {
645                                        // P4: all sources completed without a
646                                        // winner — terminate the producer.
647                                        s.terminated = true;
648                                        actions.push(Action::Complete);
649                                    }
650                                    // else: loser complete — ignore.
651                                }
652                            }
653                            _ => {} // Tiers 0/1/2/4/6 — no action.
654                        }
655                    }
656                }
657                for action in actions {
658                    match action {
659                        Action::Emit(h) => core_inner.emit_or_defer(producer_id, h),
660                        Action::Complete => core_inner.complete_or_defer(producer_id),
661                        Action::Error(h) => core_inner.error_or_defer(producer_id, h),
662                    }
663                }
664            });
665            // F2 /qa: on Dead, treat as `completed[idx] = true`. If
666            // all sources are now completed without a winner, self-
667            // Complete (matches the P4 "all completed no winner"
668            // branch in the per-source sink above).
669            let outcome = ctx.subscribe_to(source, sink);
670            if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
671                let core_dead = core_clone.clone();
672                let mut should_complete = false;
673                {
674                    let mut s = state.lock().unwrap();
675                    if !s.terminated && s.winner.is_none() {
676                        s.completed[idx] = true;
677                        if s.completed.iter().all(|&c| c) {
678                            s.terminated = true;
679                            should_complete = true;
680                        }
681                    }
682                }
683                if should_complete {
684                    core_dead.complete_or_defer(producer_id);
685                }
686            }
687        }
688    });
689
690    let fn_id = binding.register_producer_build(build);
691    Ok(core
692        .register_producer(fn_id)
693        .expect("invariant: register_producer has no deps; no error variants reachable"))
694}
695
696// =====================================================================
697// takeUntil — terminate when notifier emits DATA
698// =====================================================================
699
/// Shared per-producer state for `take_until`.
///
/// One instance is created per build of the producer and is shared,
/// behind `Arc<Mutex<_>>`, between the source sink and the notifier
/// sink.
struct TakeUntilState {
    /// `true` once the producer has been handed its terminal (COMPLETE
    /// or ERROR). Both sinks check this flag before acting.
    terminated: bool,
}

impl TakeUntilState {
    /// Builds the initial state: nothing has terminated yet.
    fn new() -> Self {
        let terminated = false;
        Self { terminated }
    }
}
709
710/// `take_until(source, notifier)` — forward DATA from `source` until
711/// `notifier` emits its first DATA, then terminate the producer with
712/// COMPLETE. Errors from either source cascade. Source COMPLETE
713/// terminates the producer.
714///
715/// Notifier DATA is consumed but never forwarded (zero-FFI on the
716/// notifier path — we don't dereference its payload, just use the
717/// emission as a signal).
718#[must_use]
719pub fn take_until(
720    core: &Core,
721    binding: &Arc<dyn ProducerBinding>,
722    source: NodeId,
723    notifier: NodeId,
724) -> NodeId {
725    // Weak captures break the producer-build Arc cycle (see `zip` doc).
726    let core_weak = core.weak_handle();
727    let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
728
729    let build = Box::new(move |ctx: ProducerCtx<'_>| {
730        let producer_id = ctx.node_id();
731        let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
732        else {
733            return;
734        };
735        let state: Arc<Mutex<TakeUntilState>> = Arc::new(Mutex::new(TakeUntilState::new()));
736
737        // Source sink: forward DATA, propagate terminals.
738        let state_for_source = state.clone();
739        let core_for_source = core_clone.clone();
740        let binding_for_source = binding_clone.clone();
741        let source_sink: Sink = Arc::new(move |msgs| {
742            enum Action {
743                Emit(HandleId),
744                Complete,
745                Error(HandleId),
746            }
747            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
748            {
749                let mut s = state_for_source.lock().unwrap();
750                if s.terminated {
751                    return;
752                }
753                // Tier-based dispatch (canonical §4.2).
754                for m in msgs {
755                    match m.tier() {
756                        3 => {
757                            if let Some(h) = m.payload_handle() {
758                                binding_for_source.retain_handle(h);
759                                actions.push(Action::Emit(h));
760                            }
761                            // else: Resolved on source — no action.
762                        }
763                        5 => {
764                            if let Some(h) = m.payload_handle() {
765                                // Error
766                                if !s.terminated {
767                                    s.terminated = true;
768                                    binding_for_source.retain_handle(h);
769                                    actions.push(Action::Error(h));
770                                }
771                            } else {
772                                // Complete
773                                if !s.terminated {
774                                    s.terminated = true;
775                                    actions.push(Action::Complete);
776                                }
777                            }
778                        }
779                        _ => {} // Tiers 0/1/2/4/6 — no action.
780                    }
781                }
782            }
783            for action in actions {
784                match action {
785                    Action::Emit(h) => core_for_source.emit_or_defer(producer_id, h),
786                    Action::Complete => core_for_source.complete_or_defer(producer_id),
787                    Action::Error(h) => core_for_source.error_or_defer(producer_id, h),
788                }
789            }
790        });
791        // F2 /qa: Dead `source` → self-Complete (source is permanently
792        // over, so take_until has nothing to forward and the producer
793        // is finished).
794        let source_outcome = ctx.subscribe_to(source, source_sink);
795        if matches!(
796            source_outcome,
797            crate::producer::SubscribeOutcome::Dead { .. }
798        ) {
799            let core_dead = core_clone.clone();
800            let mut should_complete = false;
801            {
802                let mut s = state.lock().unwrap();
803                if !s.terminated {
804                    s.terminated = true;
805                    should_complete = true;
806                }
807            }
808            if should_complete {
809                core_dead.complete_or_defer(producer_id);
810            }
811        }
812
813        // Notifier sink: any DATA → terminate; ERROR → cascade.
814        let state_for_notifier = state.clone();
815        let core_for_notifier = core_clone.clone();
816        let binding_for_notifier = binding_clone.clone();
817        let notifier_sink: Sink = Arc::new(move |msgs| {
818            enum Action {
819                Complete,
820                Error(HandleId),
821            }
822            let mut action: Option<Action> = None;
823            {
824                let mut s = state_for_notifier.lock().unwrap();
825                if s.terminated {
826                    return;
827                }
828                // Tier-based dispatch (canonical §4.2).
829                for m in msgs {
830                    match m.tier() {
831                        3 => {
832                            // Any DATA on notifier → complete the producer.
833                            // (Resolved alone — no payload — no action.)
834                            // We don't emit notifier DATA downstream, so we
835                            // don't even need to extract the handle.
836                            if m.payload_handle().is_some() && !s.terminated {
837                                s.terminated = true;
838                                action = Some(Action::Complete);
839                                break;
840                            }
841                        }
842                        5 => {
843                            // Error: cascade. Complete (payload_handle.is_none())
844                            // without prior DATA: nothing to do — source
845                            // continues independently.
846                            if let Some(h) = m.payload_handle() {
847                                if !s.terminated {
848                                    s.terminated = true;
849                                    binding_for_notifier.retain_handle(h);
850                                    action = Some(Action::Error(h));
851                                    break;
852                                }
853                            }
854                        }
855                        _ => {} // Tiers 0/1/2/4/6 — no action.
856                    }
857                }
858            }
859            if let Some(a) = action {
860                match a {
861                    Action::Complete => core_for_notifier.complete_or_defer(producer_id),
862                    Action::Error(h) => core_for_notifier.error_or_defer(producer_id, h),
863                }
864            }
865        });
866        // F2 /qa: Dead `notifier` → ignore. Notifier signal can never
867        // fire, so take_until reduces to a passthrough of source. The
868        // source's own Complete/Error will terminate the producer
869        // normally; if source is also Dead, the source-Dead branch
870        // above already self-Completed.
871        let _ = ctx.subscribe_to(notifier, notifier_sink);
872    });
873
874    let fn_id = binding.register_producer_build(build);
875    core.register_producer(fn_id)
876        .expect("invariant: register_producer has no deps; no error variants reachable")
877}