graphrefly_operators/ops_impl.rs
1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Concrete implementations of the four subscription-managed combinators
11//! (zip / concat / race / takeUntil). Built on the
12//! [`super::producer::ProducerCtx`] substrate.
13
14// Each sink closure runs a small phase-1/phase-2 dance (lock state,
15// collect actions, drop lock, replay actions). The match-then-if
16// structure inside phase 1 is intentional for readability; collapsing
17// to `match { Some(...) if cond => ... }` obscures the lock-discipline.
18#![allow(clippy::collapsible_if, clippy::collapsible_match)]
19// `zip` is genuinely large (multi-arity tuple-pack with terminal-
20// cascade handling). Splitting would obscure the concurrent state-
21// machine logic across helpers without clear benefit.
22#![allow(clippy::too_many_lines)]
23
24use std::collections::VecDeque;
25use std::sync::{Arc, Mutex};
26
27use graphrefly_core::{Core, HandleId, NodeId, Sink};
28use smallvec::SmallVec;
29
30use super::error::OperatorFactoryError;
31use super::producer::{ProducerBinding, ProducerCtx};
32
33// =====================================================================
34// zip — pair handles N-wise across N sources
35// =====================================================================
36
37/// Per-zip-node state: one FIFO queue per source, plus a flag for each
38/// source's terminal. Lives behind `Arc<Mutex<_>>` captured by the
39/// build + sink closures.
40struct ZipState {
41 queues: Vec<VecDeque<HandleId>>,
42 completed: Vec<bool>,
43 errored: bool,
44 terminated: bool,
45}
46
47impl ZipState {
48 fn new(n: usize) -> Self {
49 Self {
50 queues: (0..n).map(|_| VecDeque::new()).collect(),
51 completed: vec![false; n],
52 errored: false,
53 terminated: false,
54 }
55 }
56}
57
58/// `zip(s1, s2, ..., sN)` — collect one value from each source, emit a
59/// tuple, repeat. Models RxJS / TS `zip`:
60///
61/// - Each upstream DATA pushes into that source's per-source queue.
62/// - When **every** queue has at least one entry, pop one from each,
63/// pack into a tuple via [`graphrefly_core::BindingBoundary::pack_tuple`],
64/// and emit on the producer.
65/// - On any source's COMPLETE: if its queue is empty, terminate the
66/// producer with COMPLETE. Otherwise continue draining; terminate
67/// when this source's queue becomes empty (zip can't produce a
68/// tuple without input from every source).
69/// - On any source's ERROR: terminate the producer with the same
70/// ERROR (first error wins, like merge per Slice C-2 D022).
71///
72/// Empty source list (`n == 0`) emits a single empty-tuple event then
73/// completes. Single source (`n == 1`) is identity-passthrough.
74///
75/// # Refcount discipline
76///
77/// Each upstream DATA handle is `retain_handle`-bumped before being
78/// pushed onto a queue (the inbound message's payload retain belongs
79/// to the wave-end-flush release path; we take our own share for the
80/// queue). On pop, component handles are passed to `pack_tuple` which
81/// must NOT consume or release them — the caller (zip) retains
82/// ownership throughout the call and releases each component handle's
83/// queue share after `pack_tuple` returns. The returned tuple handle
84/// has a pre-bumped retain (binding convention per D020 doc on
85/// [`BindingBoundary::pack_tuple`]).
86/// # Errors
87///
88/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
89/// (R5.7.x — zip requires ≥1 source; vacuous-tuple semantics rejected).
90pub fn zip(
91 core: &Core,
92 binding: &Arc<dyn ProducerBinding>,
93 sources: Vec<NodeId>,
94 pack_fn_id: graphrefly_core::FnId,
95) -> Result<NodeId, OperatorFactoryError> {
96 // R5.7.x — zip requires ≥1 source. Mirrors `combine::combine` (which
97 // raises the same factory-shape invariant) so all bindings can route
98 // through `operator_factory_error_to_napi` / equivalent.
99 if sources.is_empty() {
100 return Err(OperatorFactoryError::EmptySources);
101 }
102 let n = sources.len();
103 // Weak-Arc captures break the BenchBinding → registry → producer_builds
104 // → closure → strong-Arc<dyn ProducerBinding> cycle that would otherwise
105 // pin the entire graph state when the host BenchCore drops with active
106 // producer registrations. See `Core::weak_handle` doc + Slice Y close.
107
108 let build = Box::new(move |ctx: ProducerCtx<'_>| {
109 let producer_id = ctx.node_id();
110 let binding_clone = ctx.core().binding();
111 let em = ctx.emitter();
112 // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
113 let state: Arc<Mutex<ZipState>> = Arc::new(Mutex::new(ZipState::new(n)));
114
115 for (idx, &source) in sources.iter().enumerate() {
116 let state_inner = state.clone();
117 // Sinks live only while the producer is active (cleared via
118 // producer_deactivate on last-subscriber unsubscribe), so they
119 // can safely capture strong refs cloned from the upgraded weaks.
120 let core_inner = em.clone();
121 let binding_inner = binding_clone.clone();
122 let sink: Sink = Arc::new(move |msgs| {
123 // Phase 1 (lock held): mutate queues + collect actions.
124 // Phase 2 (lock released): pack tuples + re-enter Core.
125 enum PostLockAction {
126 /// Pack popped handles into a tuple, release components, emit.
127 PackAndEmit(Vec<HandleId>),
128 Complete,
129 Error(HandleId),
130 }
131 let mut post_actions: SmallVec<[PostLockAction; 4]> = SmallVec::new();
132 // Handles to release after the lock drops (P2:
133 // drain queues on terminate to avoid handle leaks).
134 let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
135 {
136 let mut s = state_inner.lock().unwrap();
137 if s.terminated {
138 return;
139 }
140 // Tier-based dispatch (canonical §4.2; see
141 // `feedback_use_tier_for_signal_routing.md`). Tier 3
142 // payload_handle.is_some() = DATA; tier 5
143 // payload_handle.is_some() = ERROR else COMPLETE.
144 for m in msgs {
145 match m.tier() {
146 3 => {
147 if let Some(h) = m.payload_handle() {
148 binding_inner.retain_handle(h);
149 s.queues[idx].push_back(h);
150 // Collect complete tuples — pack_tuple runs
151 // after the lock drops (P5).
152 while s.queues.iter().all(|q| !q.is_empty()) {
153 let popped: Vec<HandleId> = s
154 .queues
155 .iter_mut()
156 .map(|q| q.pop_front().unwrap())
157 .collect();
158 post_actions.push(PostLockAction::PackAndEmit(popped));
159 }
160 }
161 // else: Resolved on a source — no action.
162 }
163 5 => {
164 if let Some(h) = m.payload_handle() {
165 // Error
166 if !s.errored && !s.terminated {
167 s.errored = true;
168 s.terminated = true;
169 binding_inner.retain_handle(h);
170 // P2: release all remaining queued handles.
171 for q in &mut s.queues {
172 to_release.extend(q.drain(..));
173 }
174 post_actions.push(PostLockAction::Error(h));
175 }
176 } else {
177 // Complete
178 s.completed[idx] = true;
179 // If this source's queue is empty, no more
180 // tuples from it — terminate.
181 if s.queues[idx].is_empty() && !s.terminated {
182 s.terminated = true;
183 // P2: release all remaining queued handles.
184 for q in &mut s.queues {
185 to_release.extend(q.drain(..));
186 }
187 post_actions.push(PostLockAction::Complete);
188 }
189 }
190 }
191 _ => {} // Tiers 0/1/2/4/6 — no action.
192 }
193 }
194 }
195 // Release leaked queue handles outside the lock.
196 for h in to_release {
197 binding_inner.release_handle(h);
198 }
199 // Phase 2 (lock released): pack tuples + re-enter Core.
200 // P5: pack_tuple runs outside the per-zip lock to avoid
201 // deadlock if the binding's pack_tuple re-enters.
202 for action in post_actions {
203 match action {
204 PostLockAction::PackAndEmit(popped) => {
205 let tuple_h = binding_inner.pack_tuple(pack_fn_id, &popped);
206 for h in &popped {
207 binding_inner.release_handle(*h);
208 }
209 core_inner.emit_or_defer(producer_id, tuple_h);
210 }
211 PostLockAction::Complete => core_inner.complete_or_defer(producer_id),
212 PostLockAction::Error(h) => core_inner.error_or_defer(producer_id, h),
213 }
214 }
215 });
216 // F2 /qa: on Dead, synthesize the per-source Complete in
217 // zip's state machine — `s.completed[idx] = true` and (if
218 // queue is empty, which it always is at activation since
219 // DATA hasn't yet flowed) self-Complete the producer.
220 // Pre-F2 a Dead source left zip waiting on a queue that
221 // would never fill → silent wedge.
222 let outcome = ctx.subscribe_to(source, sink);
223 if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
224 let core_dead = em.clone();
225 let binding_dead = binding_clone.clone();
226 let mut should_complete = false;
227 let mut to_release: SmallVec<[HandleId; 8]> = SmallVec::new();
228 {
229 let mut s = state.lock().unwrap();
230 if !s.terminated {
231 s.completed[idx] = true;
232 if s.queues[idx].is_empty() {
233 s.terminated = true;
234 for q in &mut s.queues {
235 to_release.extend(q.drain(..));
236 }
237 should_complete = true;
238 }
239 }
240 }
241 for h in to_release {
242 binding_dead.release_handle(h);
243 }
244 if should_complete {
245 core_dead.complete_or_defer(producer_id);
246 }
247 }
248 }
249 });
250
251 let fn_id = binding.register_producer_build(build);
252 Ok(core
253 .register_producer(fn_id)
254 .expect("invariant: register_producer has no deps; no error variants reachable"))
255}
256
257// =====================================================================
258// concat — sequentially forward `first` then `second`
259// =====================================================================
260
261struct ConcatState {
262 /// 0 = forwarding `first`; 1 = `first` complete, forwarding `second`.
263 phase: u8,
264 /// Buffered DATA from `second` that arrived during phase 0 (before
265 /// `first` completed). Drained on phase transition.
266 pending: VecDeque<HandleId>,
267 /// Set to true if `second` completed during phase 0. On phase
268 /// transition, after draining `pending`, concat self-completes
269 /// because `second` won't emit Complete again (D041 / D-ops /qa D4).
270 second_completed: bool,
271 terminated: bool,
272}
273
274impl ConcatState {
275 fn new() -> Self {
276 Self {
277 phase: 0,
278 pending: VecDeque::new(),
279 second_completed: false,
280 terminated: false,
281 }
282 }
283}
284
285/// `concat(first, second)` — forward DATA from `first` until it
286/// completes, then drain any DATA `second` emitted during phase 1
287/// (buffered) and continue forwarding `second`. ERROR from either
288/// source terminates the producer with the same ERROR.
289///
290/// Subscribes to BOTH sources at activation time (matches TS impl in
291/// `extra/operators/combine.ts:332-379` so `second.subscribe` doesn't
292/// race after `first` completes). DATA from `second` during phase 0
293/// is buffered, not forwarded.
294#[must_use]
295pub fn concat(
296 core: &Core,
297 binding: &Arc<dyn ProducerBinding>,
298 first: NodeId,
299 second: NodeId,
300) -> NodeId {
301 // Weak captures break the producer-build Arc cycle (see `zip` doc).
302
303 let build = Box::new(move |ctx: ProducerCtx<'_>| {
304 let producer_id = ctx.node_id();
305 let binding_clone = ctx.core().binding();
306 let em = ctx.emitter();
307 let state: Arc<Mutex<ConcatState>> = Arc::new(Mutex::new(ConcatState::new()));
308
309 // Subscribe to second FIRST so phase-0 DATA buffering catches
310 // synchronous initial emissions. Sinks capture strong refs cloned
311 // from the upgraded weaks; sink lifetime tied to producer activation.
312 let state_for_second = state.clone();
313 let core_for_second = em.clone();
314 let binding_for_second = binding_clone.clone();
315 let second_sink: Sink = Arc::new(move |msgs| {
316 enum Action {
317 Emit(HandleId),
318 Complete,
319 Error(HandleId),
320 }
321 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
322 let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
323 {
324 let mut s = state_for_second.lock().unwrap();
325 if s.terminated {
326 return;
327 }
328 // Tier-based dispatch (canonical §4.2).
329 for m in msgs {
330 match m.tier() {
331 3 => {
332 if let Some(h) = m.payload_handle() {
333 if s.phase == 0 {
334 // Buffer for later drain.
335 binding_for_second.retain_handle(h);
336 s.pending.push_back(h);
337 } else {
338 binding_for_second.retain_handle(h);
339 actions.push(Action::Emit(h));
340 }
341 }
342 // else: Resolved on second source — no action.
343 }
344 5 => {
345 if let Some(h) = m.payload_handle() {
346 // Error
347 if !s.terminated {
348 s.terminated = true;
349 binding_for_second.retain_handle(h);
350 // P2: release buffered pending handles.
351 to_release.extend(s.pending.drain(..));
352 actions.push(Action::Error(h));
353 }
354 } else {
355 // Complete
356 if s.phase == 1 && !s.terminated {
357 s.terminated = true;
358 actions.push(Action::Complete);
359 } else if s.phase == 0 {
360 // D041 / D4 fix: remember that second
361 // completed during phase 0. On phase
362 // transition, after draining `pending`,
363 // first_sink will self-complete
364 // (second's Complete fires once and
365 // won't be re-observed).
366 s.second_completed = true;
367 }
368 }
369 }
370 _ => {} // Tiers 0/1/2/4/6 — no action.
371 }
372 }
373 }
374 for h in to_release {
375 binding_for_second.release_handle(h);
376 }
377 for action in actions {
378 match action {
379 Action::Emit(h) => core_for_second.emit_or_defer(producer_id, h),
380 Action::Complete => core_for_second.complete_or_defer(producer_id),
381 Action::Error(h) => core_for_second.error_or_defer(producer_id, h),
382 }
383 }
384 });
385 // F2 /qa: Dead `second` is observed via `second_completed` flag.
386 // First-Complete drains pending and self-Completes if second
387 // already completed; same logic handles Dead second.
388 let second_outcome = ctx.subscribe_to(second, second_sink);
389 if matches!(
390 second_outcome,
391 crate::producer::SubscribeOutcome::Dead { .. }
392 ) {
393 let mut s = state.lock().unwrap();
394 s.second_completed = true;
395 // No additional action — the first-Complete path or first-Dead
396 // path below will trigger producer-Complete.
397 }
398
399 let state_for_first = state.clone();
400 let core_for_first = em.clone();
401 let binding_for_first = binding_clone.clone();
402 let first_sink: Sink = Arc::new(move |msgs| {
403 // first.Complete triggers the phase transition (handled
404 // via `s.phase = 1` + draining pending into `actions`),
405 // and may also self-complete the producer if `second`
406 // already completed during phase 0 (D041 / D4 fix).
407 enum Action {
408 Emit(HandleId),
409 Complete,
410 Error(HandleId),
411 }
412 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
413 let mut to_release: SmallVec<[HandleId; 4]> = SmallVec::new();
414 {
415 let mut s = state_for_first.lock().unwrap();
416 if s.terminated {
417 return;
418 }
419 if s.phase != 0 {
420 return; // first is done; ignore stale messages.
421 }
422 for m in msgs {
423 // Slice E /qa: defensive per-iteration `terminated`
424 // guard. The outer guard at the top of the lock
425 // block catches a `terminated` set BEFORE this
426 // batch arrived; this per-iteration check catches
427 // the case where an earlier message in the SAME
428 // batch transitioned us terminal (e.g., post-
429 // Complete the phase moved to 1, but a defensively-
430 // emitted stale `[Data]` later in the same batch
431 // would otherwise queue a useless retain that
432 // `core.emit` would discard on a terminal
433 // producer).
434 if s.terminated || s.phase != 0 {
435 break;
436 }
437 // Tier-based dispatch (canonical §4.2).
438 match m.tier() {
439 3 => {
440 if let Some(h) = m.payload_handle() {
441 binding_for_first.retain_handle(h);
442 actions.push(Action::Emit(h));
443 }
444 // else: Resolved on first source — no action.
445 }
446 5 => {
447 if let Some(h) = m.payload_handle() {
448 // Error
449 if !s.terminated {
450 s.terminated = true;
451 binding_for_first.retain_handle(h);
452 // P2: release buffered pending handles.
453 to_release.extend(s.pending.drain(..));
454 actions.push(Action::Error(h));
455 }
456 } else {
457 // Complete — phase transition: drain pending
458 // second-data, then continue forwarding from
459 // second.
460 s.phase = 1;
461 // Pending handles already retained at buffer time.
462 for h in s.pending.drain(..) {
463 actions.push(Action::Emit(h));
464 }
465 // D041 / D4 fix: if second already completed
466 // during phase 0, self-complete now (its
467 // Complete fired once and won't re-fire).
468 if s.second_completed && !s.terminated {
469 s.terminated = true;
470 actions.push(Action::Complete);
471 }
472 }
473 }
474 _ => {} // Tiers 0/1/2/4/6 — no action.
475 }
476 }
477 }
478 for h in to_release {
479 binding_for_first.release_handle(h);
480 }
481 for action in actions {
482 match action {
483 Action::Emit(h) => core_for_first.emit_or_defer(producer_id, h),
484 Action::Complete => core_for_first.complete_or_defer(producer_id),
485 Action::Error(h) => core_for_first.error_or_defer(producer_id, h),
486 }
487 }
488 });
489 // F2 /qa: Dead `first` triggers the phase transition immediately
490 // (treat as first-Complete). If `second` is also dead (or already
491 // completed in phase 0), self-Complete; else continue forwarding
492 // pending+future from second.
493 let first_outcome = ctx.subscribe_to(first, first_sink);
494 if matches!(
495 first_outcome,
496 crate::producer::SubscribeOutcome::Dead { .. }
497 ) {
498 let core_first_dead = em.clone();
499 let mut should_complete = false;
500 let mut pending_to_emit: Vec<HandleId> = Vec::new();
501 {
502 let mut s = state.lock().unwrap();
503 if !s.terminated && s.phase == 0 {
504 s.phase = 1;
505 // Drain pending second-DATA buffered during phase 0.
506 pending_to_emit.extend(s.pending.drain(..));
507 // If second already completed (or was Dead), self-Complete.
508 if s.second_completed && !s.terminated {
509 s.terminated = true;
510 should_complete = true;
511 }
512 }
513 }
514 for h in pending_to_emit {
515 core_first_dead.emit_or_defer(producer_id, h);
516 }
517 if should_complete {
518 core_first_dead.complete_or_defer(producer_id);
519 }
520 }
521 });
522
523 let fn_id = binding.register_producer_build(build);
524 core.register_producer(fn_id)
525 .expect("invariant: register_producer has no deps; no error variants reachable")
526}
527
528// =====================================================================
529// race — first source to emit DATA wins; losers are ignored
530// =====================================================================
531
532struct RaceState {
533 /// Index of the winning source, or `None` if no winner yet.
534 winner: Option<usize>,
535 /// Per-source completed flags. When all complete without a winner,
536 /// the producer completes (P4: no-winner all-complete termination).
537 completed: Vec<bool>,
538 terminated: bool,
539}
540
541impl RaceState {
542 fn new(n: usize) -> Self {
543 Self {
544 winner: None,
545 completed: vec![false; n],
546 terminated: false,
547 }
548 }
549}
550
551/// `race(s1, s2, ..., sN)` — subscribes to all sources; the first to
552/// emit DATA wins. Subsequent traffic from the winner is forwarded;
553/// losers' messages are no-ops (per Q4=(b) — losers stay subscribed
554/// but their sink callbacks short-circuit). Saves the dynamic
555/// rewiring cost of explicitly unsubscribing losers.
556///
557/// Empty source list completes immediately. Single source is
558/// identity-passthrough.
559/// # Errors
560///
561/// Returns [`OperatorFactoryError::EmptySources`] when `sources` is empty
562/// (R5.7.x — race requires ≥1 source; no-winner-possible rejected).
563pub fn race(
564 core: &Core,
565 binding: &Arc<dyn ProducerBinding>,
566 sources: Vec<NodeId>,
567) -> Result<NodeId, OperatorFactoryError> {
568 // R5.7.x — race requires ≥1 source. Mirrors `combine::combine`.
569 if sources.is_empty() {
570 return Err(OperatorFactoryError::EmptySources);
571 }
572 let n = sources.len();
573 // Weak captures break the producer-build Arc cycle (see `zip` doc).
574
575 let build = Box::new(move |ctx: ProducerCtx<'_>| {
576 let producer_id = ctx.node_id();
577 let binding_clone = ctx.core().binding();
578 let em = ctx.emitter();
579 // R5.7.x — n >= 1 guaranteed by factory-level empty-sources check.
580 let state: Arc<Mutex<RaceState>> = Arc::new(Mutex::new(RaceState::new(n)));
581
582 for (idx, &source) in sources.iter().enumerate() {
583 let state_inner = state.clone();
584 let core_inner = em.clone();
585 let binding_inner = binding_clone.clone();
586 let sink: Sink = Arc::new(move |msgs| {
587 enum Action {
588 Emit(HandleId),
589 Complete,
590 Error(HandleId),
591 }
592 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
593 {
594 let mut s = state_inner.lock().unwrap();
595 if s.terminated {
596 return;
597 }
598 // Tier-based dispatch (canonical §4.2).
599 for m in msgs {
600 match m.tier() {
601 3 => {
602 if let Some(h) = m.payload_handle() {
603 // P3: check s.winner live each iteration —
604 // this source may have just become the winner
605 // earlier in this batch.
606 if s.winner.is_none() {
607 s.winner = Some(idx);
608 binding_inner.retain_handle(h);
609 actions.push(Action::Emit(h));
610 } else if s.winner == Some(idx) {
611 binding_inner.retain_handle(h);
612 actions.push(Action::Emit(h));
613 }
614 // else: loser DATA — ignore.
615 }
616 // else: Resolved on a source — no action.
617 }
618 5 => {
619 if let Some(h) = m.payload_handle() {
620 // Error — from any source pre-winner OR from
621 // the winner cascade; loser errors post-winner
622 // are ignored.
623 if (s.winner.is_none() || s.winner == Some(idx))
624 && !s.terminated
625 {
626 s.terminated = true;
627 binding_inner.retain_handle(h);
628 actions.push(Action::Error(h));
629 }
630 } else {
631 // Complete
632 s.completed[idx] = true;
633 if s.winner == Some(idx) && !s.terminated {
634 s.terminated = true;
635 actions.push(Action::Complete);
636 } else if s.winner.is_none()
637 && s.completed.iter().all(|&c| c)
638 && !s.terminated
639 {
640 // P4: all sources completed without a
641 // winner — terminate the producer.
642 s.terminated = true;
643 actions.push(Action::Complete);
644 }
645 // else: loser complete — ignore.
646 }
647 }
648 _ => {} // Tiers 0/1/2/4/6 — no action.
649 }
650 }
651 }
652 for action in actions {
653 match action {
654 Action::Emit(h) => core_inner.emit_or_defer(producer_id, h),
655 Action::Complete => core_inner.complete_or_defer(producer_id),
656 Action::Error(h) => core_inner.error_or_defer(producer_id, h),
657 }
658 }
659 });
660 // F2 /qa: on Dead, treat as `completed[idx] = true`. If
661 // all sources are now completed without a winner, self-
662 // Complete (matches the P4 "all completed no winner"
663 // branch in the per-source sink above).
664 let outcome = ctx.subscribe_to(source, sink);
665 if matches!(outcome, crate::producer::SubscribeOutcome::Dead { .. }) {
666 let core_dead = em.clone();
667 let mut should_complete = false;
668 {
669 let mut s = state.lock().unwrap();
670 if !s.terminated && s.winner.is_none() {
671 s.completed[idx] = true;
672 if s.completed.iter().all(|&c| c) {
673 s.terminated = true;
674 should_complete = true;
675 }
676 }
677 }
678 if should_complete {
679 core_dead.complete_or_defer(producer_id);
680 }
681 }
682 }
683 });
684
685 let fn_id = binding.register_producer_build(build);
686 Ok(core
687 .register_producer(fn_id)
688 .expect("invariant: register_producer has no deps; no error variants reachable"))
689}
690
691// =====================================================================
692// takeUntil — terminate when notifier emits DATA
693// =====================================================================
694
695struct TakeUntilState {
696 terminated: bool,
697}
698
699impl TakeUntilState {
700 fn new() -> Self {
701 Self { terminated: false }
702 }
703}
704
705/// `take_until(source, notifier)` — forward DATA from `source` until
706/// `notifier` emits its first DATA, then terminate the producer with
707/// COMPLETE. Errors from either source cascade. Source COMPLETE
708/// terminates the producer.
709///
710/// Notifier DATA is consumed but never forwarded (zero-FFI on the
711/// notifier path — we don't dereference its payload, just use the
712/// emission as a signal).
713#[must_use]
714pub fn take_until(
715 core: &Core,
716 binding: &Arc<dyn ProducerBinding>,
717 source: NodeId,
718 notifier: NodeId,
719) -> NodeId {
720 // Weak captures break the producer-build Arc cycle (see `zip` doc).
721
722 let build = Box::new(move |ctx: ProducerCtx<'_>| {
723 let producer_id = ctx.node_id();
724 let binding_clone = ctx.core().binding();
725 let em = ctx.emitter();
726 let state: Arc<Mutex<TakeUntilState>> = Arc::new(Mutex::new(TakeUntilState::new()));
727
728 // Source sink: forward DATA, propagate terminals.
729 let state_for_source = state.clone();
730 let core_for_source = em.clone();
731 let binding_for_source = binding_clone.clone();
732 let source_sink: Sink = Arc::new(move |msgs| {
733 enum Action {
734 Emit(HandleId),
735 Complete,
736 Error(HandleId),
737 }
738 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
739 {
740 let mut s = state_for_source.lock().unwrap();
741 if s.terminated {
742 return;
743 }
744 // Tier-based dispatch (canonical §4.2).
745 for m in msgs {
746 match m.tier() {
747 3 => {
748 if let Some(h) = m.payload_handle() {
749 binding_for_source.retain_handle(h);
750 actions.push(Action::Emit(h));
751 }
752 // else: Resolved on source — no action.
753 }
754 5 => {
755 if let Some(h) = m.payload_handle() {
756 // Error
757 if !s.terminated {
758 s.terminated = true;
759 binding_for_source.retain_handle(h);
760 actions.push(Action::Error(h));
761 }
762 } else {
763 // Complete
764 if !s.terminated {
765 s.terminated = true;
766 actions.push(Action::Complete);
767 }
768 }
769 }
770 _ => {} // Tiers 0/1/2/4/6 — no action.
771 }
772 }
773 }
774 for action in actions {
775 match action {
776 Action::Emit(h) => core_for_source.emit_or_defer(producer_id, h),
777 Action::Complete => core_for_source.complete_or_defer(producer_id),
778 Action::Error(h) => core_for_source.error_or_defer(producer_id, h),
779 }
780 }
781 });
782 // F2 /qa: Dead `source` → self-Complete (source is permanently
783 // over, so take_until has nothing to forward and the producer
784 // is finished).
785 let source_outcome = ctx.subscribe_to(source, source_sink);
786 if matches!(
787 source_outcome,
788 crate::producer::SubscribeOutcome::Dead { .. }
789 ) {
790 let core_dead = em.clone();
791 let mut should_complete = false;
792 {
793 let mut s = state.lock().unwrap();
794 if !s.terminated {
795 s.terminated = true;
796 should_complete = true;
797 }
798 }
799 if should_complete {
800 core_dead.complete_or_defer(producer_id);
801 }
802 }
803
804 // Notifier sink: any DATA → terminate; ERROR → cascade.
805 let state_for_notifier = state.clone();
806 let core_for_notifier = em.clone();
807 let binding_for_notifier = binding_clone.clone();
808 let notifier_sink: Sink = Arc::new(move |msgs| {
809 enum Action {
810 Complete,
811 Error(HandleId),
812 }
813 let mut action: Option<Action> = None;
814 {
815 let mut s = state_for_notifier.lock().unwrap();
816 if s.terminated {
817 return;
818 }
819 // Tier-based dispatch (canonical §4.2).
820 for m in msgs {
821 match m.tier() {
822 3 => {
823 // Any DATA on notifier → complete the producer.
824 // (Resolved alone — no payload — no action.)
825 // We don't emit notifier DATA downstream, so we
826 // don't even need to extract the handle.
827 if m.payload_handle().is_some() && !s.terminated {
828 s.terminated = true;
829 action = Some(Action::Complete);
830 break;
831 }
832 }
833 5 => {
834 // Error: cascade. Complete (payload_handle.is_none())
835 // without prior DATA: nothing to do — source
836 // continues independently.
837 if let Some(h) = m.payload_handle() {
838 if !s.terminated {
839 s.terminated = true;
840 binding_for_notifier.retain_handle(h);
841 action = Some(Action::Error(h));
842 break;
843 }
844 }
845 }
846 _ => {} // Tiers 0/1/2/4/6 — no action.
847 }
848 }
849 }
850 if let Some(a) = action {
851 match a {
852 Action::Complete => core_for_notifier.complete_or_defer(producer_id),
853 Action::Error(h) => core_for_notifier.error_or_defer(producer_id, h),
854 }
855 }
856 });
857 // F2 /qa: Dead `notifier` → ignore. Notifier signal can never
858 // fire, so take_until reduces to a passthrough of source. The
859 // source's own Complete/Error will terminate the producer
860 // normally; if source is also Dead, the source-Dead branch
861 // above already self-Completed.
862 let _ = ctx.subscribe_to(notifier, notifier_sink);
863 });
864
865 let fn_id = binding.register_producer_build(build);
866 core.register_producer(fn_id)
867 .expect("invariant: register_producer has no deps; no error variants reachable")
868}