Skip to main content

graphrefly_operators/
ops_impl.rs

1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Concrete implementations of the four subscription-managed combinators
11//! (zip / concat / race / takeUntil). Built on the
12//! [`super::producer::ProducerCtx`] substrate.
13
14// Each sink closure runs a small phase-1/phase-2 dance (lock state,
15// collect actions, drop lock, replay actions). The match-then-if
16// structure inside phase 1 is intentional for readability; collapsing
17// to `match { Some(...) if cond => ... }` obscures the lock-discipline.
18#![allow(clippy::collapsible_if, clippy::collapsible_match)]
19// `zip` is genuinely large (multi-arity tuple-pack with terminal-
20// cascade handling). Splitting would obscure the concurrent state-
21// machine logic across helpers without clear benefit.
22#![allow(clippy::too_many_lines)]
23
24use std::collections::VecDeque;
25use std::sync::{Arc, Mutex};
26
27use graphrefly_core::{Core, HandleId, NodeId, Sink};
28use smallvec::SmallVec;
29
30use super::error::OperatorFactoryError;
31use super::producer::{ProducerBinding, ProducerCtx};
32
33// =====================================================================
34// zip — pair handles N-wise across N sources
35// =====================================================================
36
37/// Per-zip-node state: one FIFO queue per source, plus a flag for each
38/// source's terminal. Lives behind `Arc<Mutex<_>>` captured by the
39/// build + sink closures.
40struct ZipState {
41    queues: Vec<VecDeque<HandleId>>,
42    completed: Vec<bool>,
43    errored: bool,
44    terminated: bool,
45}
46
47impl ZipState {
48    fn new(n: usize) -> Self {
49        Self {
50            queues: (0..n).map(|_| VecDeque::new()).collect(),
51            completed: vec![false; n],
52            errored: false,
53            terminated: false,
54        }
55    }
56}
57
58/// `zip(s1, s2, ..., sN)` — collect one value from each source, emit a
59/// tuple, repeat. Models RxJS / TS `zip`:
60///
61/// - Each upstream DATA pushes into that source's per-source queue.
62/// - When **every** queue has at least one entry, pop one from each,
63///   pack into a tuple via [`graphrefly_core::BindingBoundary::pack_tuple`],
64///   and emit on the producer.
65/// - On any source's COMPLETE: if its queue is empty, terminate the
66///   producer with COMPLETE. Otherwise continue draining; terminate
67///   when this source's queue becomes empty (zip can't produce a
68///   tuple without input from every source).
69/// - On any source's ERROR: terminate the producer with the same
70///   ERROR (first error wins, like merge per Slice C-2 D022).
71///
72/// Empty source list (`n == 0`) emits a single empty-tuple event then
73/// completes. Single source (`n == 1`) is identity-passthrough.
74///
75/// # Refcount discipline
76///
77/// Each upstream DATA handle is `retain_handle`-bumped before being
78/// pushed onto a queue (the inbound message's payload retain belongs
79/// to the wave-end-flush release path; we take our own share for the
80/// queue). On pop, component handles are passed to `pack_tuple` which
81/// must NOT consume or release them — the caller (zip) retains
82/// ownership throughout the call and releases each component handle's
83/// queue share after `pack_tuple` returns. The returned tuple handle
84/// has a pre-bumped retain (binding convention per D020 doc on
85/// [`BindingBoundary::pack_tuple`]).
86/// # Errors
87///
88/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
89/// (R5.7.x — zip requires ≥1 source; vacuous-tuple semantics rejected).
90pub fn zip(
91    core: &Core,
92    binding: &Arc<dyn ProducerBinding>,
93    sources: Vec<NodeId>,
94    pack_fn_id: graphrefly_core::FnId,
95) -> Result<NodeId, OperatorFactoryError> {
96    // R5.7.x — zip requires ≥1 source. Mirrors `combine::combine` (which
97    // raises the same factory-shape invariant) so all bindings can route
98    // through `operator_factory_error_to_napi` / equivalent.
99    if sources.is_empty() {
100        return Err(OperatorFactoryError::EmptySources);
101    }
102    let n = sources.len();
103    // Weak-Arc captures break the BenchBinding → registry → producer_builds
104    // → closure → strong-Arc<dyn ProducerBinding> cycle that would otherwise
105    // pin the entire graph state when the host BenchCore drops with active
106    // producer registrations. See `Core::weak_handle` doc + Slice Y close.
107
108    let build = Box::new(move |ctx: ProducerCtx<'_>| {
109        let producer_id = ctx.node_id();
110        let binding_clone = ctx.core().binding();
111        let em = ctx.emitter();
112        // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
113        let state: Arc<Mutex<ZipState>> = Arc::new(Mutex::new(ZipState::new(n)));
114
115        for (idx, &source) in sources.iter().enumerate() {
116            let state_inner = state.clone();
117            // Sinks live only while the producer is active (cleared via
118            // producer_deactivate on last-subscriber unsubscribe), so they
119            // can safely capture strong refs cloned from the upgraded weaks.
120            let core_inner = em.clone();
121            let binding_inner = binding_clone.clone();
122            let sink: Sink = Arc::new(move |msgs| {
123                // Phase 1 (lock held): mutate queues + collect actions.
124                // Phase 2 (lock released): pack tuples + re-enter Core.
125                enum PostLockAction {
126                    /// Pack popped handles into a tuple, release components, emit.
127                    PackAndEmit(Vec<HandleId>),
128                    Complete,
129                    Error(HandleId),
130                }
131                let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
132                // Handles to release after the lock drops (P2:
133                // drain queues on terminate to avoid handle leaks).
134                let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
135                {
136                    let mut s = state_inner.lock().unwrap();
137                    if s.terminated {
138                        return;
139                    }
140                    // Tier-based dispatch (canonical §4.2; see
141                    // `feedback_use_tier_for_signal_routing.md`). Tier 3
142                    // payload_handle.is_some() = DATA; tier 5
143                    // payload_handle.is_some() = ERROR else COMPLETE.
144                    for m in msgs {
145                        match m.tier() {
146                            3 => {
147                                if let Some(h) = m.payload_handle() {
148                                    binding_inner.retain_handle(h);
149                                    s.queues[idx].push_back(h);
150                                    // Collect complete tuples — pack_tuple runs
151                                    // after the lock drops (P5).
152                                    while s.queues.iter().all(|q| !q.is_empty()) {
153                                        let popped: Vec<HandleId> = s
154                                            .queues
155                                            .iter_mut()
156                                            .map(|q| q.pop_front().unwrap())
157                                            .collect();
158                                        post_actions.push(PostLockAction::PackAndEmit(popped));
159                                    }
160                                }
161                                // else: Resolved on a source — no action.
162                            }
163                            5 => {
164                                if let Some(h) = m.payload_handle() {
165                                    // Error
166                                    if !s.errored && !s.terminated {
167                                        s.errored = true;
168                                        s.terminated = true;
169                                        binding_inner.retain_handle(h);
170                                        // P2: release all remaining queued handles.
171                                        for q in &mut s.queues {
172                                            to_release.extend(q.drain(..));
173                                        }
174                                        post_actions.push(PostLockAction::Error(h));
175                                    }
176                                } else {
177                                    // Complete
178                                    s.completed[idx] = true;
179                                    // If this source's queue is empty, no more
180                                    // tuples from it — terminate.
181                                    if s.queues[idx].is_empty() && !s.terminated {
182                                        s.terminated = true;
183                                        // P2: release all remaining queued handles.
184                                        for q in &mut s.queues {
185                                            to_release.extend(q.drain(..));
186                                        }
187                                        post_actions.push(PostLockAction::Complete);
188                                    }
189                                }
190                            }
191                            _ => {} // Tiers 0/1/2/4/6 — no action.
192                        }
193                    }
194                }
195                // Release leaked queue handles outside the lock.
196                for h in to_release {
197                    binding_inner.release_handle(h);
198                }
199                // Phase 2 (lock released): pack tuples + re-enter Core.
200                // P5: pack_tuple runs outside the per-zip lock to avoid
201                // deadlock if the binding's pack_tuple re-enters.
202                for action in post_actions {
203                    match action {
204                        PostLockAction::PackAndEmit(popped) => {
205                            let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
206                            for h in &popped {
207                                binding_inner.release_handle(*h);
208                            }
209                            core_inner.emit_or_defer(producer_id, tuple_h);
210                        }
211                        PostLockAction::Complete => core_inner.complete_or_defer(producer_id),
212                        PostLockAction::Error(h) => core_inner.error_or_defer(producer_id, h),
213                    }
214                }
215            });
216            // F2 /qa: on Dead, synthesize the per-source Complete in
217            // zip's state machine — `s.completed[idx] = true` and (if
218            // queue is empty, which it always is at activation since
219            // DATA hasn't yet flowed) self-Complete the producer.
220            // Pre-F2 a Dead source left zip waiting on a queue that
221            // would never fill → silent wedge.
222            let outcome = ctx.subscribe_to(source, sink);
223            if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
224                let core_dead = em.clone();
225                let binding_dead = binding_clone.clone();
226                let mut should_complete = false;
227                let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
228                {
229                    let mut s = state.lock().unwrap();
230                    if !s.terminated {
231                        s.completed[idx] = true;
232                        if s.queues[idx].is_empty() {
233                            s.terminated = true;
234                            for q in &mut s.queues {
235                                to_release.extend(q.drain(..));
236                            }
237                            should_complete = true;
238                        }
239                    }
240                }
241                for h in to_release {
242                    binding_dead.release_handle(h);
243                }
244                if should_complete {
245                    core_dead.complete_or_defer(producer_id);
246                }
247            }
248        }
249    });
250
251    let fn_id = binding.register_producer_build(build);
252    Ok(core
253        .register_producer(fn_id)
254        .expect("invariant: register_producer has no deps; no error variants reachable"))
255}
256
257// =====================================================================
258// concat — sequentially forward `first` then `second`
259// =====================================================================
260
261struct ConcatState {
262    /// 0 = forwarding `first`; 1 = `first` complete, forwarding `second`.
263    phase: u8,
264    /// Buffered DATA from `second` that arrived during phase 0 (before
265    /// `first` completed). Drained on phase transition.
266    pending: VecDeque<HandleId>,
267    /// Set to true if `second` completed during phase 0. On phase
268    /// transition, after draining `pending`, concat self-completes
269    /// because `second` won't emit Complete again (D041 / D-ops /qa D4).
270    second_completed: bool,
271    terminated: bool,
272}
273
274impl ConcatState {
275    fn new() -> Self {
276        Self {
277            phase: 0,
278            pending: VecDeque::new(),
279            second_completed: false,
280            terminated: false,
281        }
282    }
283}
284
285/// `concat(first, second)` — forward DATA from `first` until it
286/// completes, then drain any DATA `second` emitted during phase 1
287/// (buffered) and continue forwarding `second`. ERROR from either
288/// source terminates the producer with the same ERROR.
289///
290/// Subscribes to BOTH sources at activation time (matches TS impl in
291/// `extra/operators/combine.ts:332-379` so `second.subscribe` doesn't
292/// race after `first` completes). DATA from `second` during phase 0
293/// is buffered, not forwarded.
294#[must_use]
295pub fn concat(
296    core: &Core,
297    binding: &Arc<dyn ProducerBinding>,
298    first: NodeId,
299    second: NodeId,
300) -> NodeId {
301    // Weak captures break the producer-build Arc cycle (see `zip` doc).
302
303    let build = Box::new(move |ctx: ProducerCtx<'_>| {
304        let producer_id = ctx.node_id();
305        let binding_clone = ctx.core().binding();
306        let em = ctx.emitter();
307        let state: Arc<Mutex<ConcatState>> = Arc::new(Mutex::new(ConcatState::new()));
308
309        // Subscribe to second FIRST so phase-0 DATA buffering catches
310        // synchronous initial emissions. Sinks capture strong refs cloned
311        // from the upgraded weaks; sink lifetime tied to producer activation.
312        let state_for_second = state.clone();
313        let core_for_second = em.clone();
314        let binding_for_second = binding_clone.clone();
315        let second_sink: Sink = Arc::new(move |msgs| {
316            enum Action {
317                Emit(HandleId),
318                Complete,
319                Error(HandleId),
320            }
321            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
322            let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
323            {
324                let mut s = state_for_second.lock().unwrap();
325                if s.terminated {
326                    return;
327                }
328                // Tier-based dispatch (canonical §4.2).
329                for m in msgs {
330                    match m.tier() {
331                        3 => {
332                            if let Some(h) = m.payload_handle() {
333                                if s.phase == 0 {
334                                    // Buffer for later drain.
335                                    binding_for_second.retain_handle(h);
336                                    s.pending.push_back(h);
337                                } else {
338                                    binding_for_second.retain_handle(h);
339                                    actions.push(Action::Emit(h));
340                                }
341                            }
342                            // else: Resolved on second source — no action.
343                        }
344                        5 => {
345                            if let Some(h) = m.payload_handle() {
346                                // Error
347                                if !s.terminated {
348                                    s.terminated = true;
349                                    binding_for_second.retain_handle(h);
350                                    // P2: release buffered pending handles.
351                                    to_release.extend(s.pending.drain(..));
352                                    actions.push(Action::Error(h));
353                                }
354                            } else {
355                                // Complete
356                                if s.phase == 1 && !s.terminated {
357                                    s.terminated = true;
358                                    actions.push(Action::Complete);
359                                } else if s.phase == 0 {
360                                    // D041 / D4 fix: remember that second
361                                    // completed during phase 0. On phase
362                                    // transition, after draining `pending`,
363                                    // first_sink will self-complete
364                                    // (second's Complete fires once and
365                                    // won't be re-observed).
366                                    s.second_completed = true;
367                                }
368                            }
369                        }
370                        _ => {} // Tiers 0/1/2/4/6 — no action.
371                    }
372                }
373            }
374            for h in to_release {
375                binding_for_second.release_handle(h);
376            }
377            for action in actions {
378                match action {
379                    Action::Emit(h) => core_for_second.emit_or_defer(producer_id, h),
380                    Action::Complete => core_for_second.complete_or_defer(producer_id),
381                    Action::Error(h) => core_for_second.error_or_defer(producer_id, h),
382                }
383            }
384        });
385        // F2 /qa: Dead `second` is observed via `second_completed` flag.
386        // First-Complete drains pending and self-Completes if second
387        // already completed; same logic handles Dead second.
388        let second_outcome = ctx.subscribe_to(second, second_sink);
389        if matches!(
390            second_outcome,
391            crate::producer::SubscribeOutcome::Dead { .. }
392        ) {
393            let mut s = state.lock().unwrap();
394            s.second_completed = true;
395            // No additional action — the first-Complete path or first-Dead
396            // path below will trigger producer-Complete.
397        }
398
399        let state_for_first = state.clone();
400        let core_for_first = em.clone();
401        let binding_for_first = binding_clone.clone();
402        let first_sink: Sink = Arc::new(move |msgs| {
403            // first.Complete triggers the phase transition (handled
404            // via `s.phase = 1` + draining pending into `actions`),
405            // and may also self-complete the producer if `second`
406            // already completed during phase 0 (D041 / D4 fix).
407            enum Action {
408                Emit(HandleId),
409                Complete,
410                Error(HandleId),
411            }
412            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
413            let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
414            {
415                let mut s = state_for_first.lock().unwrap();
416                if s.terminated {
417                    return;
418                }
419                if s.phase != 0 {
420                    return; // first is done; ignore stale messages.
421                }
422                for m in msgs {
423                    // Slice E /qa: defensive per-iteration `terminated`
424                    // guard. The outer guard at the top of the lock
425                    // block catches a `terminated` set BEFORE this
426                    // batch arrived; this per-iteration check catches
427                    // the case where an earlier message in the SAME
428                    // batch transitioned us terminal (e.g., post-
429                    // Complete the phase moved to 1, but a defensively-
430                    // emitted stale `[Data]` later in the same batch
431                    // would otherwise queue a useless retain that
432                    // `core.emit` would discard on a terminal
433                    // producer).
434                    if s.terminated || s.phase != 0 {
435                        break;
436                    }
437                    // Tier-based dispatch (canonical §4.2).
438                    match m.tier() {
439                        3 => {
440                            if let Some(h) = m.payload_handle() {
441                                binding_for_first.retain_handle(h);
442                                actions.push(Action::Emit(h));
443                            }
444                            // else: Resolved on first source — no action.
445                        }
446                        5 => {
447                            if let Some(h) = m.payload_handle() {
448                                // Error
449                                if !s.terminated {
450                                    s.terminated = true;
451                                    binding_for_first.retain_handle(h);
452                                    // P2: release buffered pending handles.
453                                    to_release.extend(s.pending.drain(..));
454                                    actions.push(Action::Error(h));
455                                }
456                            } else {
457                                // Complete — phase transition: drain pending
458                                // second-data, then continue forwarding from
459                                // second.
460                                s.phase = 1;
461                                // Pending handles already retained at buffer time.
462                                for h in s.pending.drain(..) {
463                                    actions.push(Action::Emit(h));
464                                }
465                                // D041 / D4 fix: if second already completed
466                                // during phase 0, self-complete now (its
467                                // Complete fired once and won't re-fire).
468                                if s.second_completed && !s.terminated {
469                                    s.terminated = true;
470                                    actions.push(Action::Complete);
471                                }
472                            }
473                        }
474                        _ => {} // Tiers 0/1/2/4/6 — no action.
475                    }
476                }
477            }
478            for h in to_release {
479                binding_for_first.release_handle(h);
480            }
481            for action in actions {
482                match action {
483                    Action::Emit(h) => core_for_first.emit_or_defer(producer_id, h),
484                    Action::Complete => core_for_first.complete_or_defer(producer_id),
485                    Action::Error(h) => core_for_first.error_or_defer(producer_id, h),
486                }
487            }
488        });
489        // F2 /qa: Dead `first` triggers the phase transition immediately
490        // (treat as first-Complete). If `second` is also dead (or already
491        // completed in phase 0), self-Complete; else continue forwarding
492        // pending+future from second.
493        let first_outcome = ctx.subscribe_to(first, first_sink);
494        if matches!(
495            first_outcome,
496            crate::producer::SubscribeOutcome::Dead { .. }
497        ) {
498            let core_first_dead = em.clone();
499            let mut should_complete = false;
500            let mut pending_to_emit: Vec<HandleId> = Vec::new();
501            {
502                let mut s = state.lock().unwrap();
503                if !s.terminated && s.phase == 0 {
504                    s.phase = 1;
505                    // Drain pending second-DATA buffered during phase 0.
506                    pending_to_emit.extend(s.pending.drain(..));
507                    // If second already completed (or was Dead), self-Complete.
508                    if s.second_completed && !s.terminated {
509                        s.terminated = true;
510                        should_complete = true;
511                    }
512                }
513            }
514            for h in pending_to_emit {
515                core_first_dead.emit_or_defer(producer_id, h);
516            }
517            if should_complete {
518                core_first_dead.complete_or_defer(producer_id);
519            }
520        }
521    });
522
523    let fn_id = binding.register_producer_build(build);
524    core.register_producer(fn_id)
525        .expect("invariant: register_producer has no deps; no error variants reachable")
526}
527
528// =====================================================================
529// race — first source to emit DATA wins; losers are ignored
530// =====================================================================
531
532struct RaceState {
533    /// Index of the winning source, or `None` if no winner yet.
534    winner: Option<usize>,
535    /// Per-source completed flags. When all complete without a winner,
536    /// the producer completes (P4: no-winner all-complete termination).
537    completed: Vec<bool>,
538    terminated: bool,
539}
540
541impl RaceState {
542    fn new(n: usize) -> Self {
543        Self {
544            winner: None,
545            completed: vec![false; n],
546            terminated: false,
547        }
548    }
549}
550
551/// `race(s1, s2, ..., sN)` — subscribes to all sources; the first to
552/// emit DATA wins. Subsequent traffic from the winner is forwarded;
553/// losers' messages are no-ops (per Q4=(b) — losers stay subscribed
554/// but their sink callbacks short-circuit). Saves the dynamic
555/// rewiring cost of explicitly unsubscribing losers.
556///
557/// Empty source list completes immediately. Single source is
558/// identity-passthrough.
559/// # Errors
560///
561/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
562/// (R5.7.x — race requires ≥1 source; no-winner-possible rejected).
563pub fn race(
564    core: &Core,
565    binding: &Arc<dyn ProducerBinding>,
566    sources: Vec<NodeId>,
567) -> Result<NodeId, OperatorFactoryError> {
568    // R5.7.x — race requires ≥1 source. Mirrors `combine::combine`.
569    if sources.is_empty() {
570        return Err(OperatorFactoryError::EmptySources);
571    }
572    let n = sources.len();
573    // Weak captures break the producer-build Arc cycle (see `zip` doc).
574
575    let build = Box::new(move |ctx: ProducerCtx<'_>| {
576        let producer_id = ctx.node_id();
577        let binding_clone = ctx.core().binding();
578        let em = ctx.emitter();
579        // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
580        let state: Arc<Mutex<RaceState>> = Arc::new(Mutex::new(RaceState::new(n)));
581
582        for (idx, &source) in sources.iter().enumerate() {
583            let state_inner = state.clone();
584            let core_inner = em.clone();
585            let binding_inner = binding_clone.clone();
586            let sink: Sink = Arc::new(move |msgs| {
587                enum Action {
588                    Emit(HandleId),
589                    Complete,
590                    Error(HandleId),
591                }
592                let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
593                {
594                    let mut s = state_inner.lock().unwrap();
595                    if s.terminated {
596                        return;
597                    }
598                    // Tier-based dispatch (canonical §4.2).
599                    for m in msgs {
600                        match m.tier() {
601                            3 => {
602                                if let Some(h) = m.payload_handle() {
603                                    // P3: check s.winner live each iteration —
604                                    // this source may have just become the winner
605                                    // earlier in this batch.
606                                    if s.winner.is_none() {
607                                        s.winner = Some(idx);
608                                        binding_inner.retain_handle(h);
609                                        actions.push(Action::Emit(h));
610                                    } else if s.winner == Some(idx) {
611                                        binding_inner.retain_handle(h);
612                                        actions.push(Action::Emit(h));
613                                    }
614                                    // else: loser DATA — ignore.
615                                }
616                                // else: Resolved on a source — no action.
617                            }
618                            5 => {
619                                if let Some(h) = m.payload_handle() {
620                                    // Error — from any source pre-winner OR from
621                                    // the winner cascade; loser errors post-winner
622                                    // are ignored.
623                                    if (s.winner.is_none() || s.winner == Some(idx))
624                                        && !s.terminated
625                                    {
626                                        s.terminated = true;
627                                        binding_inner.retain_handle(h);
628                                        actions.push(Action::Error(h));
629                                    }
630                                } else {
631                                    // Complete
632                                    s.completed[idx] = true;
633                                    if s.winner == Some(idx) && !s.terminated {
634                                        s.terminated = true;
635                                        actions.push(Action::Complete);
636                                    } else if s.winner.is_none()
637                                        && s.completed.iter().all(|&c| c)
638                                        && !s.terminated
639                                    {
640                                        // P4: all sources completed without a
641                                        // winner — terminate the producer.
642                                        s.terminated = true;
643                                        actions.push(Action::Complete);
644                                    }
645                                    // else: loser complete — ignore.
646                                }
647                            }
648                            _ => {} // Tiers 0/1/2/4/6 — no action.
649                        }
650                    }
651                }
652                for action in actions {
653                    match action {
654                        Action::Emit(h) => core_inner.emit_or_defer(producer_id, h),
655                        Action::Complete => core_inner.complete_or_defer(producer_id),
656                        Action::Error(h) => core_inner.error_or_defer(producer_id, h),
657                    }
658                }
659            });
660            // F2 /qa: on Dead, treat as `completed[idx] = true`. If
661            // all sources are now completed without a winner, self-
662            // Complete (matches the P4 "all completed no winner"
663            // branch in the per-source sink above).
664            let outcome = ctx.subscribe_to(source, sink);
665            if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
666                let core_dead = em.clone();
667                let mut should_complete = false;
668                {
669                    let mut s = state.lock().unwrap();
670                    if !s.terminated && s.winner.is_none() {
671                        s.completed[idx] = true;
672                        if s.completed.iter().all(|&c| c) {
673                            s.terminated = true;
674                            should_complete = true;
675                        }
676                    }
677                }
678                if should_complete {
679                    core_dead.complete_or_defer(producer_id);
680                }
681            }
682        }
683    });
684
685    let fn_id = binding.register_producer_build(build);
686    Ok(core
687        .register_producer(fn_id)
688        .expect("invariant: register_producer has no deps; no error variants reachable"))
689}
690
691// =====================================================================
692// takeUntil — terminate when notifier emits DATA
693// =====================================================================
694
695struct TakeUntilState {
696    terminated: bool,
697}
698
699impl TakeUntilState {
700    fn new() -> Self {
701        Self { terminated: false }
702    }
703}
704
705/// `take_until(source, notifier)` — forward DATA from `source` until
706/// `notifier` emits its first DATA, then terminate the producer with
707/// COMPLETE. Errors from either source cascade. Source COMPLETE
708/// terminates the producer.
709///
710/// Notifier DATA is consumed but never forwarded (zero-FFI on the
711/// notifier path — we don't dereference its payload, just use the
712/// emission as a signal).
713#[must_use]
714pub fn take_until(
715    core: &Core,
716    binding: &Arc<dyn ProducerBinding>,
717    source: NodeId,
718    notifier: NodeId,
719) -> NodeId {
720    // Weak captures break the producer-build Arc cycle (see `zip` doc).
721
722    let build = Box::new(move |ctx: ProducerCtx<'_>| {
723        let producer_id = ctx.node_id();
724        let binding_clone = ctx.core().binding();
725        let em = ctx.emitter();
726        let state: Arc<Mutex<TakeUntilState>> = Arc::new(Mutex::new(TakeUntilState::new()));
727
728        // Source sink: forward DATA, propagate terminals.
729        let state_for_source = state.clone();
730        let core_for_source = em.clone();
731        let binding_for_source = binding_clone.clone();
732        let source_sink: Sink = Arc::new(move |msgs| {
733            enum Action {
734                Emit(HandleId),
735                Complete,
736                Error(HandleId),
737            }
738            let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
739            {
740                let mut s = state_for_source.lock().unwrap();
741                if s.terminated {
742                    return;
743                }
744                // Tier-based dispatch (canonical §4.2).
745                for m in msgs {
746                    match m.tier() {
747                        3 => {
748                            if let Some(h) = m.payload_handle() {
749                                binding_for_source.retain_handle(h);
750                                actions.push(Action::Emit(h));
751                            }
752                            // else: Resolved on source — no action.
753                        }
754                        5 => {
755                            if let Some(h) = m.payload_handle() {
756                                // Error
757                                if !s.terminated {
758                                    s.terminated = true;
759                                    binding_for_source.retain_handle(h);
760                                    actions.push(Action::Error(h));
761                                }
762                            } else {
763                                // Complete
764                                if !s.terminated {
765                                    s.terminated = true;
766                                    actions.push(Action::Complete);
767                                }
768                            }
769                        }
770                        _ => {} // Tiers 0/1/2/4/6 — no action.
771                    }
772                }
773            }
774            for action in actions {
775                match action {
776                    Action::Emit(h) => core_for_source.emit_or_defer(producer_id, h),
777                    Action::Complete => core_for_source.complete_or_defer(producer_id),
778                    Action::Error(h) => core_for_source.error_or_defer(producer_id, h),
779                }
780            }
781        });
782        // F2 /qa: Dead `source` → self-Complete (source is permanently
783        // over, so take_until has nothing to forward and the producer
784        // is finished).
785        let source_outcome = ctx.subscribe_to(source, source_sink);
786        if matches!(
787            source_outcome,
788            crate::producer::SubscribeOutcome::Dead { .. }
789        ) {
790            let core_dead = em.clone();
791            let mut should_complete = false;
792            {
793                let mut s = state.lock().unwrap();
794                if !s.terminated {
795                    s.terminated = true;
796                    should_complete = true;
797                }
798            }
799            if should_complete {
800                core_dead.complete_or_defer(producer_id);
801            }
802        }
803
804        // Notifier sink: any DATA → terminate; ERROR → cascade.
805        let state_for_notifier = state.clone();
806        let core_for_notifier = em.clone();
807        let binding_for_notifier = binding_clone.clone();
808        let notifier_sink: Sink = Arc::new(move |msgs| {
809            enum Action {
810                Complete,
811                Error(HandleId),
812            }
813            let mut action: Option<Action> = None;
814            {
815                let mut s = state_for_notifier.lock().unwrap();
816                if s.terminated {
817                    return;
818                }
819                // Tier-based dispatch (canonical §4.2).
820                for m in msgs {
821                    match m.tier() {
822                        3 => {
823                            // Any DATA on notifier → complete the producer.
824                            // (Resolved alone — no payload — no action.)
825                            // We don't emit notifier DATA downstream, so we
826                            // don't even need to extract the handle.
827                            if m.payload_handle().is_some() && !s.terminated {
828                                s.terminated = true;
829                                action = Some(Action::Complete);
830                                break;
831                            }
832                        }
833                        5 => {
834                            // Error: cascade. Complete (payload_handle.is_none())
835                            // without prior DATA: nothing to do — source
836                            // continues independently.
837                            if let Some(h) = m.payload_handle() {
838                                if !s.terminated {
839                                    s.terminated = true;
840                                    binding_for_notifier.retain_handle(h);
841                                    action = Some(Action::Error(h));
842                                    break;
843                                }
844                            }
845                        }
846                        _ => {} // Tiers 0/1/2/4/6 — no action.
847                    }
848                }
849            }
850            if let Some(a) = action {
851                match a {
852                    Action::Complete => core_for_notifier.complete_or_defer(producer_id),
853                    Action::Error(h) => core_for_notifier.error_or_defer(producer_id, h),
854                }
855            }
856        });
857        // F2 /qa: Dead `notifier` → ignore. Notifier signal can never
858        // fire, so take_until reduces to a passthrough of source. The
859        // source's own Complete/Error will terminate the producer
860        // normally; if source is also Dead, the source-Dead branch
861        // above already self-Completed.
862        let _ = ctx.subscribe_to(notifier, notifier_sink);
863    });
864
865    let fn_id = binding.register_producer_build(build);
866    core.register_producer(fn_id)
867        .expect("invariant: register_producer has no deps; no error variants reachable")
868}