graphrefly_operators/ops_impl.rs
1//! Concrete implementations of the four subscription-managed combinators
2//! (zip / concat / race / takeUntil). Built on the
3//! [`super::producer::ProducerCtx`] substrate.
4
5// Each sink closure runs a small phase-1/phase-2 dance (lock state,
6// collect actions, drop lock, replay actions). The match-then-if
7// structure inside phase 1 is intentional for readability; collapsing
8// to `match { Some(...) if cond => ... }` obscures the lock-discipline.
9#![allow(clippy::collapsible_if, clippy::collapsible_match)]
10// `zip` is genuinely large (multi-arity tuple-pack with terminal-
11// cascade handling). Splitting would obscure the concurrent state-
12// machine logic across helpers without clear benefit.
13#![allow(clippy::too_many_lines)]
14
15use std::collections::VecDeque;
16use std::sync::{Arc, Mutex, Weak};
17
18use graphrefly_core::{Core, HandleId, NodeId, Sink};
19use smallvec::SmallVec;
20
21use super::error::OperatorFactoryError;
22use super::producer::{ProducerBinding, ProducerCtx};
23
24// =====================================================================
25// zip — pair handles N-wise across N sources
26// =====================================================================
27
28/// Per-zip-node state: one FIFO queue per source, plus a flag for each
29/// source's terminal. Lives behind `Arc<Mutex<_>>` captured by the
30/// build + sink closures.
31struct ZipState {
32 queues: Vec<VecDeque<HandleId>>,
33 completed: Vec<bool>,
34 errored: bool,
35 terminated: bool,
36}
37
38impl ZipState {
39 fn new(n: usize) -> Self {
40 Self {
41 queues: (0..n).map(|_| VecDeque::new()).collect(),
42 completed: vec![false; n],
43 errored: false,
44 terminated: false,
45 }
46 }
47}
48
49/// `zip(s1, s2, ..., sN)` — collect one value from each source, emit a
50/// tuple, repeat. Models RxJS / TS `zip`:
51///
52/// - Each upstream DATA pushes into that source's per-source queue.
53/// - When **every** queue has at least one entry, pop one from each,
54/// pack into a tuple via [`graphrefly_core::BindingBoundary::pack_tuple`],
55/// and emit on the producer.
56/// - On any source's COMPLETE: if its queue is empty, terminate the
57/// producer with COMPLETE. Otherwise continue draining; terminate
58/// when this source's queue becomes empty (zip can't produce a
59/// tuple without input from every source).
60/// - On any source's ERROR: terminate the producer with the same
61/// ERROR (first error wins, like merge per Slice C-2 D022).
62///
63/// Empty source list (`n == 0`) emits a single empty-tuple event then
64/// completes. Single source (`n == 1`) is identity-passthrough.
65///
66/// # Refcount discipline
67///
68/// Each upstream DATA handle is `retain_handle`-bumped before being
69/// pushed onto a queue (the inbound message's payload retain belongs
70/// to the wave-end-flush release path; we take our own share for the
71/// queue). On pop, component handles are passed to `pack_tuple` which
72/// must NOT consume or release them — the caller (zip) retains
73/// ownership throughout the call and releases each component handle's
74/// queue share after `pack_tuple` returns. The returned tuple handle
75/// has a pre-bumped retain (binding convention per D020 doc on
76/// [`BindingBoundary::pack_tuple`]).
77/// # Errors
78///
79/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
80/// (R5.7.x — zip requires ≥1 source; vacuous-tuple semantics rejected).
81pub fn zip(
82 core: &Core,
83 binding: &Arc<dyn ProducerBinding>,
84 sources: Vec<NodeId>,
85 pack_fn_id: graphrefly_core::FnId,
86) -> Result<NodeId, OperatorFactoryError> {
87 // R5.7.x — zip requires ≥1 source. Mirrors `combine::combine` (which
88 // raises the same factory-shape invariant) so all bindings can route
89 // through `operator_factory_error_to_napi` / equivalent.
90 if sources.is_empty() {
91 return Err(OperatorFactoryError::EmptySources);
92 }
93 let n = sources.len();
94 // Weak-Arc captures break the BenchBinding → registry → producer_builds
95 // → closure → strong-Arc<dyn ProducerBinding> cycle that would otherwise
96 // pin the entire graph state when the host BenchCore drops with active
97 // producer registrations. See `Core::weak_handle` doc + Slice Y close.
98 let core_weak = core.weak_handle();
99 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
100
101 let build = Box::new(move |ctx: ProducerCtx<'_>| {
102 let producer_id = ctx.node_id();
103 let (Some(core_for_build), Some(binding_clone)) =
104 (core_weak.upgrade(), binding_weak.upgrade())
105 else {
106 // Host Core / binding already dropped — no-op.
107 return;
108 };
109 // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
110 let state: Arc<Mutex<ZipState>> = Arc::new(Mutex::new(ZipState::new(n)));
111
112 for (idx, &source) in sources.iter().enumerate() {
113 let state_inner = state.clone();
114 // Sinks live only while the producer is active (cleared via
115 // producer_deactivate on last-subscriber unsubscribe), so they
116 // can safely capture strong refs cloned from the upgraded weaks.
117 let core_inner = core_for_build.clone();
118 let binding_inner = binding_clone.clone();
119 let sink: Sink = Arc::new(move |msgs| {
120 // Phase 1 (lock held): mutate queues + collect actions.
121 // Phase 2 (lock released): pack tuples + re-enter Core.
122 enum PostLockAction {
123 /// Pack popped handles into a tuple, release components, emit.
124 PackAndEmit(Vec<HandleId>),
125 Complete,
126 Error(HandleId),
127 }
128 let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
129 // Handles to release after the lock drops (P2:
130 // drain queues on terminate to avoid handle leaks).
131 let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
132 {
133 let mut s = state_inner.lock().unwrap();
134 if s.terminated {
135 return;
136 }
137 // Tier-based dispatch (canonical §4.2; see
138 // `feedback_use_tier_for_signal_routing.md`). Tier 3
139 // payload_handle.is_some() = DATA; tier 5
140 // payload_handle.is_some() = ERROR else COMPLETE.
141 for m in msgs {
142 match m.tier() {
143 3 => {
144 if let Some(h) = m.payload_handle() {
145 binding_inner.retain_handle(h);
146 s.queues[idx].push_back(h);
147 // Collect complete tuples — pack_tuple runs
148 // after the lock drops (P5).
149 while s.queues.iter().all(|q| !q.is_empty()) {
150 let popped: Vec<HandleId> = s
151 .queues
152 .iter_mut()
153 .map(|q| q.pop_front().unwrap())
154 .collect();
155 post_actions.push(PostLockAction::PackAndEmit(popped));
156 }
157 }
158 // else: Resolved on a source — no action.
159 }
160 5 => {
161 if let Some(h) = m.payload_handle() {
162 // Error
163 if !s.errored && !s.terminated {
164 s.errored = true;
165 s.terminated = true;
166 binding_inner.retain_handle(h);
167 // P2: release all remaining queued handles.
168 for q in &mut s.queues {
169 to_release.extend(q.drain(..));
170 }
171 post_actions.push(PostLockAction::Error(h));
172 }
173 } else {
174 // Complete
175 s.completed[idx] = true;
176 // If this source's queue is empty, no more
177 // tuples from it — terminate.
178 if s.queues[idx].is_empty() && !s.terminated {
179 s.terminated = true;
180 // P2: release all remaining queued handles.
181 for q in &mut s.queues {
182 to_release.extend(q.drain(..));
183 }
184 post_actions.push(PostLockAction::Complete);
185 }
186 }
187 }
188 _ => {} // Tiers 0/1/2/4/6 — no action.
189 }
190 }
191 }
192 // Release leaked queue handles outside the lock.
193 for h in to_release {
194 binding_inner.release_handle(h);
195 }
196 // Phase 2 (lock released): pack tuples + re-enter Core.
197 // P5: pack_tuple runs outside the per-zip lock to avoid
198 // deadlock if the binding's pack_tuple re-enters.
199 for action in post_actions {
200 match action {
201 PostLockAction::PackAndEmit(popped) => {
202 let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
203 for h in &popped {
204 binding_inner.release_handle(*h);
205 }
206 core_inner.emit_or_defer(producer_id, tuple_h);
207 }
208 PostLockAction::Complete => core_inner.complete_or_defer(producer_id),
209 PostLockAction::Error(h) => core_inner.error_or_defer(producer_id, h),
210 }
211 }
212 });
213 // F2 /qa: on Dead, synthesize the per-source Complete in
214 // zip's state machine — `s.completed[idx] = true` and (if
215 // queue is empty, which it always is at activation since
216 // DATA hasn't yet flowed) self-Complete the producer.
217 // Pre-F2 a Dead source left zip waiting on a queue that
218 // would never fill → silent wedge.
219 let outcome = ctx.subscribe_to(source, sink);
220 if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
221 let core_dead = core_for_build.clone();
222 let binding_dead = binding_clone.clone();
223 let mut should_complete = false;
224 let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
225 {
226 let mut s = state.lock().unwrap();
227 if !s.terminated {
228 s.completed[idx] = true;
229 if s.queues[idx].is_empty() {
230 s.terminated = true;
231 for q in &mut s.queues {
232 to_release.extend(q.drain(..));
233 }
234 should_complete = true;
235 }
236 }
237 }
238 for h in to_release {
239 binding_dead.release_handle(h);
240 }
241 if should_complete {
242 core_dead.complete_or_defer(producer_id);
243 }
244 }
245 }
246 });
247
248 let fn_id = binding.register_producer_build(build);
249 Ok(core
250 .register_producer(fn_id)
251 .expect("invariant: register_producer has no deps; no error variants reachable"))
252}
253
254// =====================================================================
255// concat — sequentially forward `first` then `second`
256// =====================================================================
257
258struct ConcatState {
259 /// 0 = forwarding `first`; 1 = `first` complete, forwarding `second`.
260 phase: u8,
261 /// Buffered DATA from `second` that arrived during phase 0 (before
262 /// `first` completed). Drained on phase transition.
263 pending: VecDeque<HandleId>,
264 /// Set to true if `second` completed during phase 0. On phase
265 /// transition, after draining `pending`, concat self-completes
266 /// because `second` won't emit Complete again (D041 / D-ops /qa D4).
267 second_completed: bool,
268 terminated: bool,
269}
270
271impl ConcatState {
272 fn new() -> Self {
273 Self {
274 phase: 0,
275 pending: VecDeque::new(),
276 second_completed: false,
277 terminated: false,
278 }
279 }
280}
281
282/// `concat(first, second)` — forward DATA from `first` until it
283/// completes, then drain any DATA `second` emitted during phase 1
284/// (buffered) and continue forwarding `second`. ERROR from either
285/// source terminates the producer with the same ERROR.
286///
287/// Subscribes to BOTH sources at activation time (matches TS impl in
288/// `extra/operators/combine.ts:332-379` so `second.subscribe` doesn't
289/// race after `first` completes). DATA from `second` during phase 0
290/// is buffered, not forwarded.
291#[must_use]
292pub fn concat(
293 core: &Core,
294 binding: &Arc<dyn ProducerBinding>,
295 first: NodeId,
296 second: NodeId,
297) -> NodeId {
298 // Weak captures break the producer-build Arc cycle (see `zip` doc).
299 let core_weak = core.weak_handle();
300 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
301
302 let build = Box::new(move |ctx: ProducerCtx<'_>| {
303 let producer_id = ctx.node_id();
304 let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
305 else {
306 return;
307 };
308 let state: Arc<Mutex<ConcatState>> = Arc::new(Mutex::new(ConcatState::new()));
309
310 // Subscribe to second FIRST so phase-0 DATA buffering catches
311 // synchronous initial emissions. Sinks capture strong refs cloned
312 // from the upgraded weaks; sink lifetime tied to producer activation.
313 let state_for_second = state.clone();
314 let core_for_second = core_clone.clone();
315 let binding_for_second = binding_clone.clone();
316 let second_sink: Sink = Arc::new(move |msgs| {
317 enum Action {
318 Emit(HandleId),
319 Complete,
320 Error(HandleId),
321 }
322 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
323 let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
324 {
325 let mut s = state_for_second.lock().unwrap();
326 if s.terminated {
327 return;
328 }
329 // Tier-based dispatch (canonical §4.2).
330 for m in msgs {
331 match m.tier() {
332 3 => {
333 if let Some(h) = m.payload_handle() {
334 if s.phase == 0 {
335 // Buffer for later drain.
336 binding_for_second.retain_handle(h);
337 s.pending.push_back(h);
338 } else {
339 binding_for_second.retain_handle(h);
340 actions.push(Action::Emit(h));
341 }
342 }
343 // else: Resolved on second source — no action.
344 }
345 5 => {
346 if let Some(h) = m.payload_handle() {
347 // Error
348 if !s.terminated {
349 s.terminated = true;
350 binding_for_second.retain_handle(h);
351 // P2: release buffered pending handles.
352 to_release.extend(s.pending.drain(..));
353 actions.push(Action::Error(h));
354 }
355 } else {
356 // Complete
357 if s.phase == 1 && !s.terminated {
358 s.terminated = true;
359 actions.push(Action::Complete);
360 } else if s.phase == 0 {
361 // D041 / D4 fix: remember that second
362 // completed during phase 0. On phase
363 // transition, after draining `pending`,
364 // first_sink will self-complete
365 // (second's Complete fires once and
366 // won't be re-observed).
367 s.second_completed = true;
368 }
369 }
370 }
371 _ => {} // Tiers 0/1/2/4/6 — no action.
372 }
373 }
374 }
375 for h in to_release {
376 binding_for_second.release_handle(h);
377 }
378 for action in actions {
379 match action {
380 Action::Emit(h) => core_for_second.emit_or_defer(producer_id, h),
381 Action::Complete => core_for_second.complete_or_defer(producer_id),
382 Action::Error(h) => core_for_second.error_or_defer(producer_id, h),
383 }
384 }
385 });
386 // F2 /qa: Dead `second` is observed via `second_completed` flag.
387 // First-Complete drains pending and self-Completes if second
388 // already completed; same logic handles Dead second.
389 let second_outcome = ctx.subscribe_to(second, second_sink);
390 if matches!(
391 second_outcome,
392 crate::producer::SubscribeOutcome::Dead { .. }
393 ) {
394 let mut s = state.lock().unwrap();
395 s.second_completed = true;
396 // No additional action — the first-Complete path or first-Dead
397 // path below will trigger producer-Complete.
398 }
399
400 let state_for_first = state.clone();
401 let core_for_first = core_clone.clone();
402 let binding_for_first = binding_clone.clone();
403 let first_sink: Sink = Arc::new(move |msgs| {
404 // first.Complete triggers the phase transition (handled
405 // via `s.phase = 1` + draining pending into `actions`),
406 // and may also self-complete the producer if `second`
407 // already completed during phase 0 (D041 / D4 fix).
408 enum Action {
409 Emit(HandleId),
410 Complete,
411 Error(HandleId),
412 }
413 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
414 let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
415 {
416 let mut s = state_for_first.lock().unwrap();
417 if s.terminated {
418 return;
419 }
420 if s.phase != 0 {
421 return; // first is done; ignore stale messages.
422 }
423 for m in msgs {
424 // Slice E /qa: defensive per-iteration `terminated`
425 // guard. The outer guard at the top of the lock
426 // block catches a `terminated` set BEFORE this
427 // batch arrived; this per-iteration check catches
428 // the case where an earlier message in the SAME
429 // batch transitioned us terminal (e.g., post-
430 // Complete the phase moved to 1, but a defensively-
431 // emitted stale `[Data]` later in the same batch
432 // would otherwise queue a useless retain that
433 // `core.emit` would discard on a terminal
434 // producer).
435 if s.terminated || s.phase != 0 {
436 break;
437 }
438 // Tier-based dispatch (canonical §4.2).
439 match m.tier() {
440 3 => {
441 if let Some(h) = m.payload_handle() {
442 binding_for_first.retain_handle(h);
443 actions.push(Action::Emit(h));
444 }
445 // else: Resolved on first source — no action.
446 }
447 5 => {
448 if let Some(h) = m.payload_handle() {
449 // Error
450 if !s.terminated {
451 s.terminated = true;
452 binding_for_first.retain_handle(h);
453 // P2: release buffered pending handles.
454 to_release.extend(s.pending.drain(..));
455 actions.push(Action::Error(h));
456 }
457 } else {
458 // Complete — phase transition: drain pending
459 // second-data, then continue forwarding from
460 // second.
461 s.phase = 1;
462 // Pending handles already retained at buffer time.
463 for h in s.pending.drain(..) {
464 actions.push(Action::Emit(h));
465 }
466 // D041 / D4 fix: if second already completed
467 // during phase 0, self-complete now (its
468 // Complete fired once and won't re-fire).
469 if s.second_completed && !s.terminated {
470 s.terminated = true;
471 actions.push(Action::Complete);
472 }
473 }
474 }
475 _ => {} // Tiers 0/1/2/4/6 — no action.
476 }
477 }
478 }
479 for h in to_release {
480 binding_for_first.release_handle(h);
481 }
482 for action in actions {
483 match action {
484 Action::Emit(h) => core_for_first.emit_or_defer(producer_id, h),
485 Action::Complete => core_for_first.complete_or_defer(producer_id),
486 Action::Error(h) => core_for_first.error_or_defer(producer_id, h),
487 }
488 }
489 });
490 // F2 /qa: Dead `first` triggers the phase transition immediately
491 // (treat as first-Complete). If `second` is also dead (or already
492 // completed in phase 0), self-Complete; else continue forwarding
493 // pending+future from second.
494 let first_outcome = ctx.subscribe_to(first, first_sink);
495 if matches!(
496 first_outcome,
497 crate::producer::SubscribeOutcome::Dead { .. }
498 ) {
499 let core_first_dead = core_clone.clone();
500 let mut should_complete = false;
501 let mut pending_to_emit: Vec<HandleId> = Vec::new();
502 {
503 let mut s = state.lock().unwrap();
504 if !s.terminated && s.phase == 0 {
505 s.phase = 1;
506 // Drain pending second-DATA buffered during phase 0.
507 pending_to_emit.extend(s.pending.drain(..));
508 // If second already completed (or was Dead), self-Complete.
509 if s.second_completed && !s.terminated {
510 s.terminated = true;
511 should_complete = true;
512 }
513 }
514 }
515 for h in pending_to_emit {
516 core_first_dead.emit_or_defer(producer_id, h);
517 }
518 if should_complete {
519 core_first_dead.complete_or_defer(producer_id);
520 }
521 }
522 });
523
524 let fn_id = binding.register_producer_build(build);
525 core.register_producer(fn_id)
526 .expect("invariant: register_producer has no deps; no error variants reachable")
527}
528
529// =====================================================================
530// race — first source to emit DATA wins; losers are ignored
531// =====================================================================
532
/// Shared bookkeeping for one `race` producer; every source sink locks it.
struct RaceState {
    /// Index of the winning source, or `None` if no winner yet.
    winner: Option<usize>,
    /// Per-source completed flags. When all complete without a winner,
    /// the producer completes (P4: no-winner all-complete termination).
    completed: Vec<bool>,
    /// Set once the producer has reached a terminal; sinks return early
    /// when they observe it.
    terminated: bool,
}

impl RaceState {
    /// Initial state for `n` sources: no winner, `n` cleared completion
    /// flags, no terminal recorded.
    fn new(n: usize) -> Self {
        let completed: Vec<bool> = std::iter::repeat(false).take(n).collect();
        Self {
            winner: None,
            completed,
            terminated: false,
        }
    }
}
551
552/// `race(s1, s2, ..., sN)` — subscribes to all sources; the first to
553/// emit DATA wins. Subsequent traffic from the winner is forwarded;
554/// losers' messages are no-ops (per Q4=(b) — losers stay subscribed
555/// but their sink callbacks short-circuit). Saves the dynamic
556/// rewiring cost of explicitly unsubscribing losers.
557///
558/// Empty source list completes immediately. Single source is
559/// identity-passthrough.
560/// # Errors
561///
562/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
563/// (R5.7.x — race requires ≥1 source; no-winner-possible rejected).
564pub fn race(
565 core: &Core,
566 binding: &Arc<dyn ProducerBinding>,
567 sources: Vec<NodeId>,
568) -> Result<NodeId, OperatorFactoryError> {
569 // R5.7.x — race requires ≥1 source. Mirrors `combine::combine`.
570 if sources.is_empty() {
571 return Err(OperatorFactoryError::EmptySources);
572 }
573 let n = sources.len();
574 // Weak captures break the producer-build Arc cycle (see `zip` doc).
575 let core_weak = core.weak_handle();
576 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
577
578 let build = Box::new(move |ctx: ProducerCtx<'_>| {
579 let producer_id = ctx.node_id();
580 let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
581 else {
582 return;
583 };
584 // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
585 let state: Arc<Mutex<RaceState>> = Arc::new(Mutex::new(RaceState::new(n)));
586
587 for (idx, &source) in sources.iter().enumerate() {
588 let state_inner = state.clone();
589 let core_inner = core_clone.clone();
590 let binding_inner = binding_clone.clone();
591 let sink: Sink = Arc::new(move |msgs| {
592 enum Action {
593 Emit(HandleId),
594 Complete,
595 Error(HandleId),
596 }
597 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
598 {
599 let mut s = state_inner.lock().unwrap();
600 if s.terminated {
601 return;
602 }
603 // Tier-based dispatch (canonical §4.2).
604 for m in msgs {
605 match m.tier() {
606 3 => {
607 if let Some(h) = m.payload_handle() {
608 // P3: check s.winner live each iteration —
609 // this source may have just become the winner
610 // earlier in this batch.
611 if s.winner.is_none() {
612 s.winner = Some(idx);
613 binding_inner.retain_handle(h);
614 actions.push(Action::Emit(h));
615 } else if s.winner == Some(idx) {
616 binding_inner.retain_handle(h);
617 actions.push(Action::Emit(h));
618 }
619 // else: loser DATA — ignore.
620 }
621 // else: Resolved on a source — no action.
622 }
623 5 => {
624 if let Some(h) = m.payload_handle() {
625 // Error — from any source pre-winner OR from
626 // the winner cascade; loser errors post-winner
627 // are ignored.
628 if (s.winner.is_none() || s.winner == Some(idx))
629 && !s.terminated
630 {
631 s.terminated = true;
632 binding_inner.retain_handle(h);
633 actions.push(Action::Error(h));
634 }
635 } else {
636 // Complete
637 s.completed[idx] = true;
638 if s.winner == Some(idx) && !s.terminated {
639 s.terminated = true;
640 actions.push(Action::Complete);
641 } else if s.winner.is_none()
642 && s.completed.iter().all(|&c| c)
643 && !s.terminated
644 {
645 // P4: all sources completed without a
646 // winner — terminate the producer.
647 s.terminated = true;
648 actions.push(Action::Complete);
649 }
650 // else: loser complete — ignore.
651 }
652 }
653 _ => {} // Tiers 0/1/2/4/6 — no action.
654 }
655 }
656 }
657 for action in actions {
658 match action {
659 Action::Emit(h) => core_inner.emit_or_defer(producer_id, h),
660 Action::Complete => core_inner.complete_or_defer(producer_id),
661 Action::Error(h) => core_inner.error_or_defer(producer_id, h),
662 }
663 }
664 });
665 // F2 /qa: on Dead, treat as `completed[idx] = true`. If
666 // all sources are now completed without a winner, self-
667 // Complete (matches the P4 "all completed no winner"
668 // branch in the per-source sink above).
669 let outcome = ctx.subscribe_to(source, sink);
670 if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
671 let core_dead = core_clone.clone();
672 let mut should_complete = false;
673 {
674 let mut s = state.lock().unwrap();
675 if !s.terminated && s.winner.is_none() {
676 s.completed[idx] = true;
677 if s.completed.iter().all(|&c| c) {
678 s.terminated = true;
679 should_complete = true;
680 }
681 }
682 }
683 if should_complete {
684 core_dead.complete_or_defer(producer_id);
685 }
686 }
687 }
688 });
689
690 let fn_id = binding.register_producer_build(build);
691 Ok(core
692 .register_producer(fn_id)
693 .expect("invariant: register_producer has no deps; no error variants reachable"))
694}
695
696// =====================================================================
697// takeUntil — terminate when notifier emits DATA
698// =====================================================================
699
/// Shared bookkeeping for one `take_until` producer; the source sink and
/// the notifier sink both lock it.
///
/// The derived `Default` is the initial state (no terminal recorded).
#[derive(Default)]
struct TakeUntilState {
    /// Set once the producer has reached a terminal; sinks return early
    /// when they observe it.
    terminated: bool,
}

impl TakeUntilState {
    /// Initial state: not terminated.
    fn new() -> Self {
        Self::default()
    }
}
709
710/// `take_until(source, notifier)` — forward DATA from `source` until
711/// `notifier` emits its first DATA, then terminate the producer with
712/// COMPLETE. Errors from either source cascade. Source COMPLETE
713/// terminates the producer.
714///
715/// Notifier DATA is consumed but never forwarded (zero-FFI on the
716/// notifier path — we don't dereference its payload, just use the
717/// emission as a signal).
718#[must_use]
719pub fn take_until(
720 core: &Core,
721 binding: &Arc<dyn ProducerBinding>,
722 source: NodeId,
723 notifier: NodeId,
724) -> NodeId {
725 // Weak captures break the producer-build Arc cycle (see `zip` doc).
726 let core_weak = core.weak_handle();
727 let binding_weak: Weak<dyn ProducerBinding> = Arc::downgrade(binding);
728
729 let build = Box::new(move |ctx: ProducerCtx<'_>| {
730 let producer_id = ctx.node_id();
731 let (Some(core_clone), Some(binding_clone)) = (core_weak.upgrade(), binding_weak.upgrade())
732 else {
733 return;
734 };
735 let state: Arc<Mutex<TakeUntilState>> = Arc::new(Mutex::new(TakeUntilState::new()));
736
737 // Source sink: forward DATA, propagate terminals.
738 let state_for_source = state.clone();
739 let core_for_source = core_clone.clone();
740 let binding_for_source = binding_clone.clone();
741 let source_sink: Sink = Arc::new(move |msgs| {
742 enum Action {
743 Emit(HandleId),
744 Complete,
745 Error(HandleId),
746 }
747 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
748 {
749 let mut s = state_for_source.lock().unwrap();
750 if s.terminated {
751 return;
752 }
753 // Tier-based dispatch (canonical §4.2).
754 for m in msgs {
755 match m.tier() {
756 3 => {
757 if let Some(h) = m.payload_handle() {
758 binding_for_source.retain_handle(h);
759 actions.push(Action::Emit(h));
760 }
761 // else: Resolved on source — no action.
762 }
763 5 => {
764 if let Some(h) = m.payload_handle() {
765 // Error
766 if !s.terminated {
767 s.terminated = true;
768 binding_for_source.retain_handle(h);
769 actions.push(Action::Error(h));
770 }
771 } else {
772 // Complete
773 if !s.terminated {
774 s.terminated = true;
775 actions.push(Action::Complete);
776 }
777 }
778 }
779 _ => {} // Tiers 0/1/2/4/6 — no action.
780 }
781 }
782 }
783 for action in actions {
784 match action {
785 Action::Emit(h) => core_for_source.emit_or_defer(producer_id, h),
786 Action::Complete => core_for_source.complete_or_defer(producer_id),
787 Action::Error(h) => core_for_source.error_or_defer(producer_id, h),
788 }
789 }
790 });
791 // F2 /qa: Dead `source` → self-Complete (source is permanently
792 // over, so take_until has nothing to forward and the producer
793 // is finished).
794 let source_outcome = ctx.subscribe_to(source, source_sink);
795 if matches!(
796 source_outcome,
797 crate::producer::SubscribeOutcome::Dead { .. }
798 ) {
799 let core_dead = core_clone.clone();
800 let mut should_complete = false;
801 {
802 let mut s = state.lock().unwrap();
803 if !s.terminated {
804 s.terminated = true;
805 should_complete = true;
806 }
807 }
808 if should_complete {
809 core_dead.complete_or_defer(producer_id);
810 }
811 }
812
813 // Notifier sink: any DATA → terminate; ERROR → cascade.
814 let state_for_notifier = state.clone();
815 let core_for_notifier = core_clone.clone();
816 let binding_for_notifier = binding_clone.clone();
817 let notifier_sink: Sink = Arc::new(move |msgs| {
818 enum Action {
819 Complete,
820 Error(HandleId),
821 }
822 let mut action: Option<Action> = None;
823 {
824 let mut s = state_for_notifier.lock().unwrap();
825 if s.terminated {
826 return;
827 }
828 // Tier-based dispatch (canonical §4.2).
829 for m in msgs {
830 match m.tier() {
831 3 => {
832 // Any DATA on notifier → complete the producer.
833 // (Resolved alone — no payload — no action.)
834 // We don't emit notifier DATA downstream, so we
835 // don't even need to extract the handle.
836 if m.payload_handle().is_some() && !s.terminated {
837 s.terminated = true;
838 action = Some(Action::Complete);
839 break;
840 }
841 }
842 5 => {
843 // Error: cascade. Complete (payload_handle.is_none())
844 // without prior DATA: nothing to do — source
845 // continues independently.
846 if let Some(h) = m.payload_handle() {
847 if !s.terminated {
848 s.terminated = true;
849 binding_for_notifier.retain_handle(h);
850 action = Some(Action::Error(h));
851 break;
852 }
853 }
854 }
855 _ => {} // Tiers 0/1/2/4/6 — no action.
856 }
857 }
858 }
859 if let Some(a) = action {
860 match a {
861 Action::Complete => core_for_notifier.complete_or_defer(producer_id),
862 Action::Error(h) => core_for_notifier.error_or_defer(producer_id, h),
863 }
864 }
865 });
866 // F2 /qa: Dead `notifier` → ignore. Notifier signal can never
867 // fire, so take_until reduces to a passthrough of source. The
868 // source's own Complete/Error will terminate the producer
869 // normally; if source is also Dead, the source-Dead branch
870 // above already self-Completed.
871 let _ = ctx.subscribe_to(notifier, notifier_sink);
872 });
873
874 let fn_id = binding.register_producer_build(build);
875 core.register_producer(fn_id)
876 .expect("invariant: register_producer has no deps; no error variants reachable")
877}