graphrefly_operators/higher_order.rs
1//! Higher-order operators (Slice E, D044) — operators whose project fn
2//! returns an inner [`NodeId`] for each outer DATA. Mirrors TS legacy
3//! `extra/operators/higher-order.ts` (`switchMap` / `exhaustMap` /
4//! `concatMap` / `mergeMap`).
5//!
6//! All four are producer-pattern nodes (no declared deps; subscribe to
7//! the outer source from inside the build closure; emit on themselves
8//! via [`Core::emit`]). This mirrors the [`super::ops_impl`] family
9//! (zip / concat / race / takeUntil); the producer substrate handles
10//! auto-cleanup of upstream + inner subscriptions on producer
11//! deactivation (D031–D038).
12//!
13//! The four flavors differ in how they handle a new outer DATA while a
14//! prior inner is still active:
15//!
16//! - [`switch_map`] — cancel the prior inner (Rx-style `switchMap`).
17//! - [`exhaust_map`] — drop the new value (Rx-style `exhaustMap`).
18//! - [`concat_map`] — enqueue; process sequentially. (Equivalent to
19//! [`merge_map_with_concurrency`] with `Some(1)`.)
20//! - [`merge_map`] / [`merge_map_with_concurrency`] — spawn in parallel
21//! up to `concurrency`. `None` = unbounded.
22//!
23//! # Inner-sub tracking (Slice E /qa refactor)
24//!
25//! Each operator owns its inner [`Subscription`]s **inside its state
26//! `Mutex`** (not in [`super::producer::ProducerNodeState::subs`]).
27//! `producer_storage[producer_id].subs` holds only the OUTER source
28//! subscription (one entry, no positional concerns). switch_map /
29//! exhaust_map keep `Option<Subscription>` (single active inner); merge
//! / concat keep `AHashMap<u64, Subscription>` keyed by per-op
31//! `next_inner_id`. This avoids two bugs the original positional design
32//! exposed: (a) cached-outer source firing handshake before
33//! `subscribe_to` pushed the outer sub, reordering `subs[0]`; (b)
34//! merge/concat completed-inner subs accumulating in `subs` indefinitely.
35//! Inner sub cleanup is per-op now: switch/exhaust take + drop on inner
36//! Complete; merge/concat remove specific id on inner Complete.
37//!
38//! # Drain discipline (iterative spawn)
39//!
40//! `merge_map` could spawn the next buffered DATA from inside an
41//! inner's `on_complete` callback. For pathological pre-completed
42//! inners (synchronous Complete during the subscribe handshake),
43//! recursive spawn would grow the stack proportionally to the buffer
44//! depth. The thread-local [`MERGE_DRAIN_ACTIVE`] flag breaks the
45//! recursion: the outermost drain owns the loop; nested `on_complete`
46//! invocations only decrement state and return.
47//!
48//! # Project closure (D044)
49//!
50//! Each operator takes a `project: Fn(HandleId) -> NodeId` closure
51//! registered through [`HigherOrderBinding::register_project`]. Bindings
52//! (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
53//! WASM callbacks into this Rust shape; Rust-side users register a
54//! closure directly.
55
56#![allow(clippy::collapsible_if, clippy::collapsible_match)]
57#![allow(clippy::too_many_arguments, clippy::too_many_lines)]
58
59use std::cell::Cell;
60use std::collections::VecDeque;
61use std::sync::{Arc, Mutex, Weak};
62
63use ahash::AHashMap;
64use graphrefly_core::{Core, FnId, HandleId, Message, NodeId, Sink, Subscription};
65use smallvec::SmallVec;
66
67use super::producer::{ProducerBinding, ProducerCtx};
68
69// =====================================================================
70// HigherOrderBinding — closure registration for project: T -> Node<R>
71// =====================================================================
72
73/// Project closure: takes an outer DATA handle, returns the
74/// [`NodeId`] of an inner node to subscribe to. Closure may register
75/// new state/derived nodes on the fly via captured [`Core`].
76pub type ProjectFn = Box<dyn Fn(HandleId) -> NodeId + Send + Sync>;
77
78/// Closure-registration interface for higher-order operators.
79///
80/// Extends [`ProducerBinding`] with one method that bindings shipping
81/// higher-order operators must implement.
82pub trait HigherOrderBinding: ProducerBinding {
83 /// Register a project closure. The returned [`FnId`] is captured by
84 /// the operator's build closure and looked up via
85 /// [`Self::invoke_project`] on each outer DATA fire.
86 fn register_project(&self, project: ProjectFn) -> FnId;
87
88 /// Invoke a registered project closure with the given outer DATA
89 /// handle. Returns the inner node's [`NodeId`].
90 ///
91 /// # Panics
92 ///
93 /// Implementations panic if `fn_id` is not a registered project
94 /// closure.
95 fn invoke_project(&self, fn_id: FnId, value: HandleId) -> NodeId;
96}
97
98// =====================================================================
99// build_inner_sink — shared inner-subscription handler
100// =====================================================================
101//
102// Each higher-order op subscribes to the inner node returned by its
103// project closure. The inner sink dispatches by `Message::tier()`
104// (R1.3.7.b — central message-tier utility, never hardcode variant
105// checks for forwarding gating per CLAUDE.md design invariant 4):
106//
107// - Tier 0 (Start) — drop.
108// - Tier 1 (Dirty) — drop. (DIRTY forwarding from inner is an
109// acknowledged divergence from TS legacy; Rust producer relies on
110// Core's wave engine to generate DIRTY on the producer's own emits.)
111// - Tier 2 (Pause/Resume) — drop. Backpressure is per-stream; not
112// propagated through higher-order ops by design.
113// - Tier 3 (Data/Resolved) — `Data(h)` forwards via `Core::emit(producer_id, h)`;
114// `Resolved` drops (same wave-engine rationale as Dirty).
115// - Tier 4 (Invalidate) — forwards via `Core::invalidate(producer_id)`
116// per R1.2.7. Inner cache invalidation surfaces to producer
117// subscribers.
118// - Tier 5 (Complete/Error) — semantic dispatch via op-specific
119// callbacks (`on_inner_complete` / `on_inner_error`). Differs
120// per-op (switch_map clears active inner; merge_map decrements
121// active count; etc.).
122// - Tier 6 (Teardown) — forwards via `Core::teardown(producer_id)`
123// per R2.6.4. Inner permanent destruction propagates.
124
125fn build_inner_sink(
126 core: Core,
127 producer_binding: Arc<dyn ProducerBinding>,
128 producer_id: NodeId,
129 on_inner_complete: Arc<dyn Fn() + Send + Sync>,
130 on_inner_error: Arc<dyn Fn(HandleId) + Send + Sync>,
131) -> Sink {
132 Arc::new(move |msgs: &[Message]| {
133 enum Action {
134 Emit(HandleId),
135 Complete,
136 Error(HandleId),
137 Invalidate,
138 Teardown,
139 }
140 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
141 for m in msgs {
142 match m.tier() {
143 3 => {
144 // Tier 3: Data/Resolved. Only Data carries a payload
145 // to forward; Resolved is dropped per the
146 // wave-engine divergence above.
147 if let Some(h) = m.payload_handle() {
148 producer_binding.retain_handle(h);
149 actions.push(Action::Emit(h));
150 }
151 }
152 4 => {
153 // Tier 4: Invalidate. Forward to producer.
154 actions.push(Action::Invalidate);
155 }
156 5 => {
157 // Tier 5: Complete or Error. Semantic dispatch via
158 // op-specific callbacks. `payload_handle()` is the
159 // canonical Error-vs-Complete discriminator at this
160 // tier.
161 if let Some(h) = m.payload_handle() {
162 producer_binding.retain_handle(h);
163 actions.push(Action::Error(h));
164 } else {
165 actions.push(Action::Complete);
166 }
167 }
168 6 => {
169 // Tier 6: Teardown. Forward to producer.
170 actions.push(Action::Teardown);
171 }
172 // Tiers 0 (Start), 1 (Dirty), 2 (Pause/Resume): drop.
173 _ => {}
174 }
175 }
176 for action in actions {
177 match action {
178 Action::Emit(h) => core.emit(producer_id, h),
179 Action::Complete => on_inner_complete(),
180 Action::Error(h) => on_inner_error(h),
181 Action::Invalidate => core.invalidate(producer_id),
182 Action::Teardown => core.teardown(producer_id),
183 }
184 }
185 })
186}
187
188// =====================================================================
189// switch_map — cancel previous inner on each new outer DATA
190// =====================================================================
191
192struct SwitchState {
193 /// Currently-active inner subscription (if any). Cancelling the
194 /// prior inner is `Option::take` + lock-released drop.
195 inner_sub: Option<Subscription>,
196 source_done: bool,
197 terminated: bool,
198}
199
200impl SwitchState {
201 fn new() -> Self {
202 Self {
203 inner_sub: None,
204 source_done: false,
205 terminated: false,
206 }
207 }
208}
209
210/// `switch_map(source, project)` — for each outer DATA, cancel the
211/// previous inner subscription and subscribe to the inner node returned
212/// by `project(value)`. Inner DATA flows through to downstream; inner
213/// COMPLETE clears the active slot; outer COMPLETE (with no active
214/// inner) self-completes the operator.
215#[must_use]
216pub fn switch_map(
217 core: &Core,
218 binding: &Arc<dyn HigherOrderBinding>,
219 source: NodeId,
220 project: ProjectFn,
221) -> NodeId {
222 let project_fn_id = binding.register_project(project);
223 // Weak captures break the producer-build Arc cycle (see
224 // `graphrefly_operators::ops_impl::zip` doc).
225 let core_weak = core.weak_handle();
226 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
227 let producer_binding_weak: Weak<dyn ProducerBinding> =
228 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
229
230 let build = Box::new(move |ctx: ProducerCtx<'_>| {
231 let producer_id = ctx.node_id();
232 let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
233 core_weak.upgrade(),
234 binding_weak.upgrade(),
235 producer_binding_weak.upgrade(),
236 ) else {
237 return;
238 };
239 let state: Arc<Mutex<SwitchState>> = Arc::new(Mutex::new(SwitchState::new()));
240
241 let state_for_outer = state.clone();
242 let core_for_outer = core_clone.clone();
243 let binding_for_outer = binding_clone.clone();
244 let producer_binding_for_outer = producer_binding.clone();
245
246 let outer_sink: Sink = Arc::new(move |msgs| {
247 // Phase 1: classify under state lock. Track whether we
248 // performed a retain so phase 2 can safely release it
249 // without underflow on `[Data(_), Error(_)]` same-batch
250 // (P1 /qa fix).
251 #[derive(Default)]
252 struct Plan {
253 latest_outer_h: Option<HandleId>,
254 latest_retained: bool,
255 self_complete: bool,
256 self_error: Option<HandleId>,
257 }
258 let mut plan = Plan::default();
259 {
260 let mut s = state_for_outer.lock().unwrap();
261 if s.terminated {
262 return;
263 }
264 // Tier-based dispatch per `feedback_use_tier_for_signal_routing.md`
265 // and canonical §4.2 ("Always use the provided tier utilities
266 // rather than hardcoding type checks"). Tier 3 carries
267 // DATA/RESOLVED — only DATA has a payload_handle, so the
268 // `payload_handle().is_some()` test discriminates within
269 // the tier without referring to specific variants. Tier 5
270 // carries COMPLETE/ERROR — same shape (Error has handle,
271 // Complete doesn't).
272 for m in msgs {
273 match m.tier() {
274 3 => {
275 if let Some(h) = m.payload_handle() {
276 // switchMap: only the LATEST outer DATA in the
277 // batch matters (TS legacy "skip to last in
278 // the batch to avoid creating + immediately
279 // discarding N-1 inners"). Track the handle;
280 // we'll project + subscribe once after the
281 // lock drops. Skipped handles need no retain.
282 plan.latest_outer_h = Some(h);
283 }
284 // else: Resolved on outer source — no action.
285 }
286 5 => {
287 if let Some(h) = m.payload_handle() {
288 // Error
289 if !s.terminated {
290 s.terminated = true;
291 binding_for_outer.retain_handle(h);
292 plan.self_error = Some(h);
293 }
294 } else {
295 // Complete
296 s.source_done = true;
297 if s.inner_sub.is_none()
298 && plan.latest_outer_h.is_none()
299 && !s.terminated
300 {
301 s.terminated = true;
302 plan.self_complete = true;
303 }
304 }
305 }
306 _ => {} // Tiers 0/1/2/4/6 — no action on outer source.
307 }
308 }
309 if let Some(h) = plan.latest_outer_h {
310 if !s.terminated {
311 // Retain ONE share for the chosen handle —
312 // released by phase 2 after invoke_project.
313 binding_for_outer.retain_handle(h);
314 plan.latest_retained = true;
315 }
316 }
317 }
318
319 // Phase 2: cancel prior inner sub, project, subscribe.
320 // Gated on `latest_retained` so that an `Error` arriving in
321 // the same batch (which sets terminated and skips the
322 // retain) does NOT trigger an unbalanced release.
323 if plan.latest_retained {
324 let outer_h = plan
325 .latest_outer_h
326 .expect("latest_retained implies latest_outer_h is Some");
327
328 // Cancel prior inner sub (if any) lock-released.
329 let prev_inner = {
330 let mut s = state_for_outer.lock().unwrap();
331 s.inner_sub.take()
332 };
333 drop(prev_inner);
334
335 // invoke_project lock-released. Releases our retain after.
336 let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
337 binding_for_outer.release_handle(outer_h);
338
339 let on_complete = make_switch_on_complete(
340 state_for_outer.clone(),
341 core_for_outer.clone(),
342 producer_id,
343 );
344 let on_error = make_switch_on_error(
345 state_for_outer.clone(),
346 core_for_outer.clone(),
347 producer_id,
348 );
349 let inner_sink = build_inner_sink(
350 core_for_outer.clone(),
351 producer_binding_for_outer.clone(),
352 producer_id,
353 on_complete,
354 on_error,
355 );
356 let inner_sub = core_for_outer.subscribe(inner_node, inner_sink);
357
358 // Install (or drop, if terminated mid-project / inner
359 // already completed and on_complete cleared the slot).
360 // Prior slot is empty (we cleared above). If the inner
361 // already self-completed during the handshake,
362 // on_complete left `inner_sub: None`. Either way,
363 // `replace` returns None and we install. Defensively
364 // preserve any unexpected leftover for lock-released drop.
365 let to_drop = {
366 let mut s = state_for_outer.lock().unwrap();
367 if s.terminated {
368 Some(inner_sub)
369 } else {
370 s.inner_sub.replace(inner_sub)
371 }
372 };
373 drop(to_drop);
374 }
375
376 if plan.self_complete {
377 core_for_outer.complete(producer_id);
378 } else if let Some(h) = plan.self_error {
379 core_for_outer.error(producer_id, h);
380 }
381 });
382
383 ctx.subscribe_to(source, outer_sink);
384 });
385
386 let fn_id = binding.register_producer_build(build);
387 core.register_producer(fn_id)
388 .expect("invariant: register_producer has no deps; no error variants reachable")
389}
390
391fn make_switch_on_complete(
392 state: Arc<Mutex<SwitchState>>,
393 core: Core,
394 producer_id: NodeId,
395) -> Arc<dyn Fn() + Send + Sync> {
396 Arc::new(move || {
397 let prev_inner;
398 let mut should_complete = false;
399 {
400 let mut s = state.lock().unwrap();
401 if s.terminated {
402 return;
403 }
404 prev_inner = s.inner_sub.take();
405 if s.source_done && !s.terminated {
406 s.terminated = true;
407 should_complete = true;
408 }
409 }
410 drop(prev_inner);
411 if should_complete {
412 core.complete(producer_id);
413 }
414 })
415}
416
417fn make_switch_on_error(
418 state: Arc<Mutex<SwitchState>>,
419 core: Core,
420 producer_id: NodeId,
421) -> Arc<dyn Fn(HandleId) + Send + Sync> {
422 Arc::new(move |h| {
423 let prev_inner;
424 {
425 let mut s = state.lock().unwrap();
426 if s.terminated {
427 return;
428 }
429 s.terminated = true;
430 prev_inner = s.inner_sub.take();
431 }
432 drop(prev_inner);
433 core.error(producer_id, h);
434 })
435}
436
437// =====================================================================
438// exhaust_map — ignore outer DATA while inner is active
439// =====================================================================
440
441struct ExhaustState {
442 inner_sub: Option<Subscription>,
443 source_done: bool,
444 terminated: bool,
445}
446
447impl ExhaustState {
448 fn new() -> Self {
449 Self {
450 inner_sub: None,
451 source_done: false,
452 terminated: false,
453 }
454 }
455}
456
457/// `exhaust_map(source, project)` — like [`switch_map`] but DROPS new
458/// outer DATA while an inner subscription is active. First outer DATA
459/// per "active window" wins; subsequent DATAs are discarded until the
460/// inner completes.
461#[must_use]
462pub fn exhaust_map(
463 core: &Core,
464 binding: &Arc<dyn HigherOrderBinding>,
465 source: NodeId,
466 project: ProjectFn,
467) -> NodeId {
468 let project_fn_id = binding.register_project(project);
469 // Weak captures break the producer-build Arc cycle (see `switch_map` doc).
470 let core_weak = core.weak_handle();
471 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
472 let producer_binding_weak: Weak<dyn ProducerBinding> =
473 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
474
475 let build = Box::new(move |ctx: ProducerCtx<'_>| {
476 let producer_id = ctx.node_id();
477 let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
478 core_weak.upgrade(),
479 binding_weak.upgrade(),
480 producer_binding_weak.upgrade(),
481 ) else {
482 return;
483 };
484 let state: Arc<Mutex<ExhaustState>> = Arc::new(Mutex::new(ExhaustState::new()));
485
486 let state_for_outer = state.clone();
487 let core_for_outer = core_clone.clone();
488 let binding_for_outer = binding_clone.clone();
489 let producer_binding_for_outer = producer_binding.clone();
490
491 let outer_sink: Sink = Arc::new(move |msgs| {
492 #[derive(Default)]
493 struct Plan {
494 first_outer_h: Option<HandleId>,
495 first_retained: bool,
496 self_complete: bool,
497 self_error: Option<HandleId>,
498 }
499 let mut plan = Plan::default();
500 {
501 let mut s = state_for_outer.lock().unwrap();
502 if s.terminated {
503 return;
504 }
505 // Tier-based dispatch (canonical §4.2; see
506 // `feedback_use_tier_for_signal_routing.md`).
507 for m in msgs {
508 match m.tier() {
509 3 => {
510 if let Some(h) = m.payload_handle() {
511 // First DATA per active window wins.
512 // Remember the first one we accept; subsequent
513 // batch entries (or DATAs after) drop.
514 if s.inner_sub.is_none() && plan.first_outer_h.is_none() {
515 binding_for_outer.retain_handle(h);
516 plan.first_outer_h = Some(h);
517 plan.first_retained = true;
518 }
519 }
520 // else: Resolved on outer source — no action.
521 }
522 5 => {
523 if let Some(h) = m.payload_handle() {
524 // Error
525 if !s.terminated {
526 s.terminated = true;
527 // Release any retain we took for
528 // first_outer_h — we won't be projecting it.
529 if plan.first_retained {
530 if let Some(h0) = plan.first_outer_h.take() {
531 binding_for_outer.release_handle(h0);
532 plan.first_retained = false;
533 }
534 }
535 binding_for_outer.retain_handle(h);
536 plan.self_error = Some(h);
537 }
538 } else {
539 // Complete
540 s.source_done = true;
541 if s.inner_sub.is_none()
542 && plan.first_outer_h.is_none()
543 && !s.terminated
544 {
545 s.terminated = true;
546 plan.self_complete = true;
547 }
548 }
549 }
550 _ => {} // Tiers 0/1/2/4/6 — no action.
551 }
552 }
553 }
554
555 if plan.first_retained {
556 let outer_h = plan
557 .first_outer_h
558 .expect("first_retained implies first_outer_h is Some");
559 let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
560 binding_for_outer.release_handle(outer_h);
561
562 let on_complete = make_exhaust_on_complete(
563 state_for_outer.clone(),
564 core_for_outer.clone(),
565 producer_id,
566 );
567 let on_error = make_exhaust_on_error(
568 state_for_outer.clone(),
569 core_for_outer.clone(),
570 producer_id,
571 );
572 let inner_sink = build_inner_sink(
573 core_for_outer.clone(),
574 producer_binding_for_outer.clone(),
575 producer_id,
576 on_complete,
577 on_error,
578 );
579 // If inner already pre-completed during the handshake,
580 // on_complete already cleared `inner_sub`. We `replace`
581 // either way; in the synchronous-completion path,
582 // `inner_sub` was None so our just-subscribed (and
583 // already-dead) sub is dropped on the next iteration.
584 let inner_sub = core_for_outer.subscribe(inner_node, inner_sink);
585 let to_drop = {
586 let mut s = state_for_outer.lock().unwrap();
587 if s.terminated {
588 Some(inner_sub)
589 } else {
590 s.inner_sub.replace(inner_sub)
591 }
592 };
593 drop(to_drop);
594 }
595
596 if plan.self_complete {
597 core_for_outer.complete(producer_id);
598 } else if let Some(h) = plan.self_error {
599 core_for_outer.error(producer_id, h);
600 }
601 });
602
603 ctx.subscribe_to(source, outer_sink);
604 });
605
606 let fn_id = binding.register_producer_build(build);
607 core.register_producer(fn_id)
608 .expect("invariant: register_producer has no deps; no error variants reachable")
609}
610
611fn make_exhaust_on_complete(
612 state: Arc<Mutex<ExhaustState>>,
613 core: Core,
614 producer_id: NodeId,
615) -> Arc<dyn Fn() + Send + Sync> {
616 Arc::new(move || {
617 let prev_inner;
618 let mut should_complete = false;
619 {
620 let mut s = state.lock().unwrap();
621 if s.terminated {
622 return;
623 }
624 prev_inner = s.inner_sub.take();
625 if s.source_done && !s.terminated {
626 s.terminated = true;
627 should_complete = true;
628 }
629 }
630 drop(prev_inner);
631 if should_complete {
632 core.complete(producer_id);
633 }
634 })
635}
636
637fn make_exhaust_on_error(
638 state: Arc<Mutex<ExhaustState>>,
639 core: Core,
640 producer_id: NodeId,
641) -> Arc<dyn Fn(HandleId) + Send + Sync> {
642 Arc::new(move |h| {
643 let prev_inner;
644 {
645 let mut s = state.lock().unwrap();
646 if s.terminated {
647 return;
648 }
649 s.terminated = true;
650 prev_inner = s.inner_sub.take();
651 }
652 drop(prev_inner);
653 core.error(producer_id, h);
654 })
655}
656
657// =====================================================================
658// merge_map — parallel inners up to `concurrency` cap
659// concat_map — wrapper for concurrency = Some(1)
660// =====================================================================
661
thread_local! {
    /// Re-entrancy flag for `drain_merge_buffer`, one per thread.
    ///
    /// The drain loop sets it on entry and clears it on exit. A nested
    /// call on the same thread — e.g. from an inner `on_complete` that
    /// fires synchronously inside `Core::subscribe` — sees it set and
    /// returns immediately, leaving the outermost loop to notice the
    /// freed capacity on its next iteration.
    static MERGE_DRAIN_ACTIVE: Cell<bool> = const { Cell::new(false) };
}
671
672struct MergeMapState {
673 /// Number of currently-active inner subscriptions (spawned but
674 /// not yet completed/errored).
675 active: u32,
676 /// Outer DATAs waiting because `active >= concurrency`. Each
677 /// handle has one retain share (taken on enqueue, released on
678 /// dequeue + project).
679 buffer: VecDeque<HandleId>,
680 /// Per-inner `Subscription`s, keyed by `next_inner_id`. Each
681 /// inner's `on_complete` removes its entry by id (lock-released
682 /// drop).
683 inner_subs: AHashMap<u64, Subscription>,
684 /// Pending inner ids (between `subscribe` call and
685 /// `inner_subs.insert`). Used to detect synchronous-completion:
686 /// if `on_complete` runs during `subscribe`, it removes from
687 /// `pending_inner_ids`; the post-subscribe code checks the set
688 /// and skips inserting the now-dead sub.
689 pending_inner_ids: ahash::AHashSet<u64>,
690 next_inner_id: u64,
691 source_done: bool,
692 terminated: bool,
693}
694
695impl MergeMapState {
696 fn new() -> Self {
697 Self {
698 active: 0,
699 buffer: VecDeque::new(),
700 inner_subs: AHashMap::new(),
701 pending_inner_ids: ahash::AHashSet::new(),
702 next_inner_id: 0,
703 source_done: false,
704 terminated: false,
705 }
706 }
707}
708
709/// `merge_map(source, project)` — unbounded concurrency variant.
710/// Equivalent to [`merge_map_with_concurrency`] with `None`.
711#[must_use]
712pub fn merge_map(
713 core: &Core,
714 binding: &Arc<dyn HigherOrderBinding>,
715 source: NodeId,
716 project: ProjectFn,
717) -> NodeId {
718 merge_map_with_concurrency(core, binding, source, project, None)
719}
720
721/// `concat_map(source, project)` — sequential queue variant.
722/// Equivalent to [`merge_map_with_concurrency`] with `Some(1)`. Each
723/// outer DATA is enqueued and processed one-at-a-time.
724#[must_use]
725pub fn concat_map(
726 core: &Core,
727 binding: &Arc<dyn HigherOrderBinding>,
728 source: NodeId,
729 project: ProjectFn,
730) -> NodeId {
731 merge_map_with_concurrency(core, binding, source, project, Some(1))
732}
733
734/// `merge_map_with_concurrency(source, project, concurrency)` — projects
735/// each outer DATA to an inner Node and subscribes in parallel.
736///
737/// `concurrency`:
738/// - `None` → unbounded (every outer DATA spawns immediately).
739/// - `Some(n)` → at most `n` concurrent inners; excess outer DATAs
740/// buffer until an active inner completes.
741///
742/// Per D043 / D040, this matches the
743/// [`Core::set_pause_buffer_cap`](graphrefly_core::Core::set_pause_buffer_cap)
744/// `Option<usize>` precedent (None = unbounded). `Some(0)` is degenerate
745/// (would buffer everything indefinitely without ever spawning) but
746/// accepted at the type level.
747#[must_use]
748pub fn merge_map_with_concurrency(
749 core: &Core,
750 binding: &Arc<dyn HigherOrderBinding>,
751 source: NodeId,
752 project: ProjectFn,
753 concurrency: Option<u32>,
754) -> NodeId {
755 let project_fn_id = binding.register_project(project);
756 // Weak captures break the producer-build Arc cycle (see `switch_map` doc).
757 let core_weak = core.weak_handle();
758 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
759 let producer_binding_weak: Weak<dyn ProducerBinding> =
760 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
761
762 let build = Box::new(move |ctx: ProducerCtx<'_>| {
763 let producer_id = ctx.node_id();
764 let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
765 core_weak.upgrade(),
766 binding_weak.upgrade(),
767 producer_binding_weak.upgrade(),
768 ) else {
769 return;
770 };
771 let state: Arc<Mutex<MergeMapState>> = Arc::new(Mutex::new(MergeMapState::new()));
772
773 let state_for_outer = state.clone();
774 let core_for_outer = core_clone.clone();
775 let binding_for_outer = binding_clone.clone();
776 let producer_binding_for_outer = producer_binding.clone();
777
778 let outer_sink: Sink = Arc::new(move |msgs| {
779 // Phase 1: enqueue DATAs into the buffer (always — drain
780 // loop dequeues + spawns up to cap), classify terminal
781 // signals.
782 let mut error_action: Option<HandleId> = None;
783 let mut self_complete_now = false;
784 {
785 let mut s = state_for_outer.lock().unwrap();
786 if s.terminated {
787 return;
788 }
789 // Tier-based dispatch (canonical §4.2; see
790 // `feedback_use_tier_for_signal_routing.md`).
791 for m in msgs {
792 match m.tier() {
793 3 => {
794 if let Some(h) = m.payload_handle() {
795 // Retain on enqueue — released by drain
796 // after invoke_project.
797 binding_for_outer.retain_handle(h);
798 s.buffer.push_back(h);
799 }
800 // else: Resolved on outer source — no action.
801 }
802 5 => {
803 if let Some(h) = m.payload_handle() {
804 // Error
805 if !s.terminated {
806 s.terminated = true;
807 binding_for_outer.retain_handle(h);
808 while let Some(q) = s.buffer.pop_front() {
809 binding_for_outer.release_handle(q);
810 }
811 error_action = Some(h);
812 }
813 } else {
814 // Complete
815 s.source_done = true;
816 if s.active == 0 && s.buffer.is_empty() && !s.terminated {
817 s.terminated = true;
818 self_complete_now = true;
819 }
820 }
821 }
822 _ => {} // Tiers 0/1/2/4/6 — no action.
823 }
824 }
825 }
826
827 if let Some(h) = error_action {
828 core_for_outer.error(producer_id, h);
829 return;
830 }
831 if self_complete_now {
832 core_for_outer.complete(producer_id);
833 return;
834 }
835
836 // Phase 2: drain buffer iteratively up to concurrency cap.
837 drain_merge_buffer(
838 &state_for_outer,
839 &core_for_outer,
840 &binding_for_outer,
841 &producer_binding_for_outer,
842 producer_id,
843 project_fn_id,
844 concurrency,
845 );
846 });
847
848 ctx.subscribe_to(source, outer_sink);
849 });
850
851 let fn_id = binding.register_producer_build(build);
852 core.register_producer(fn_id)
853 .expect("invariant: register_producer has no deps; no error variants reachable")
854}
855
856/// Iteratively pop from `buffer` and spawn inners until cap is reached
857/// or buffer is empty. Re-entrance from a nested `on_complete` is
858/// short-circuited via [`MERGE_DRAIN_ACTIVE`]; the outermost call owns
859/// the drain loop and picks up cap-frees on subsequent iterations.
860fn drain_merge_buffer(
861 state: &Arc<Mutex<MergeMapState>>,
862 core: &Core,
863 binding: &Arc<dyn HigherOrderBinding>,
864 producer_binding: &Arc<dyn ProducerBinding>,
865 producer_id: NodeId,
866 project_fn_id: FnId,
867 concurrency: Option<u32>,
868) {
869 if MERGE_DRAIN_ACTIVE.with(|f| f.replace(true)) {
870 // Already draining on this thread; outer loop will drain
871 // remaining buffer.
872 return;
873 }
874
875 loop {
876 let h_and_id;
877 let mut should_self_complete = false;
878 {
879 let mut s = state.lock().unwrap();
880 if s.terminated {
881 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
882 return;
883 }
884 let allowed = match concurrency {
885 None => true,
886 Some(n) => s.active < n,
887 };
888 if !allowed {
889 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
890 return;
891 }
892 if let Some(h) = s.buffer.pop_front() {
893 s.active += 1;
894 let id = s.next_inner_id;
895 s.next_inner_id += 1;
896 s.pending_inner_ids.insert(id);
897 h_and_id = Some((h, id));
898 } else if s.source_done && s.active == 0 && !s.terminated {
899 s.terminated = true;
900 should_self_complete = true;
901 h_and_id = None;
902 } else {
903 h_and_id = None;
904 }
905 }
906
907 if should_self_complete {
908 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
909 core.complete(producer_id);
910 return;
911 }
912
913 let Some((outer_h, inner_id)) = h_and_id else {
914 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
915 return;
916 };
917
918 // Spawn lock-released.
919 let inner_node = binding.invoke_project(project_fn_id, outer_h);
920 binding.release_handle(outer_h);
921
922 let on_complete = make_merge_on_complete(
923 state.clone(),
924 core.clone(),
925 binding.clone(),
926 producer_binding.clone(),
927 producer_id,
928 project_fn_id,
929 inner_id,
930 concurrency,
931 );
932 let on_error =
933 make_merge_on_error(state.clone(), core.clone(), binding.clone(), producer_id);
934 let inner_sink = build_inner_sink(
935 core.clone(),
936 producer_binding.clone(),
937 producer_id,
938 on_complete,
939 on_error,
940 );
941 let inner_sub = core.subscribe(inner_node, inner_sink);
942
943 // Decide whether to install the sub: if `on_complete` fired
944 // synchronously inside `subscribe` (pre-completed inner), it
945 // already removed `inner_id` from `pending_inner_ids`.
946 let to_drop = {
947 let mut s = state.lock().unwrap();
948 if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
949 Some(inner_sub)
950 } else {
951 s.inner_subs.insert(inner_id, inner_sub);
952 None
953 }
954 };
955 drop(to_drop);
956
957 // Loop continues — pops next from buffer or returns.
958 }
959}
960
961fn make_merge_on_complete(
962 state: Arc<Mutex<MergeMapState>>,
963 core: Core,
964 binding: Arc<dyn HigherOrderBinding>,
965 producer_binding: Arc<dyn ProducerBinding>,
966 producer_id: NodeId,
967 project_fn_id: FnId,
968 this_inner_id: u64,
969 concurrency: Option<u32>,
970) -> Arc<dyn Fn() + Send + Sync> {
971 Arc::new(move || {
972 let removed_sub;
973 {
974 let mut s = state.lock().unwrap();
975 if s.terminated {
976 return;
977 }
978 s.active -= 1;
979 // Two cases:
980 // (a) Sync-completion during subscribe: `pending_inner_ids`
981 // still contains us; `inner_subs` does not. Remove from
982 // pending so the post-subscribe insert sees we're done
983 // and skips installing the dead sub.
984 // (b) Async completion: `inner_subs` contains us. Remove
985 // and drop lock-released.
986 s.pending_inner_ids.remove(&this_inner_id);
987 removed_sub = s.inner_subs.remove(&this_inner_id);
988 }
989 drop(removed_sub);
990
991 // Try to drain — if we're nested inside an outer drain loop
992 // (sync-completion path), this is a no-op and the outer drain
993 // continues.
994 drain_merge_buffer(
995 &state,
996 &core,
997 &binding,
998 &producer_binding,
999 producer_id,
1000 project_fn_id,
1001 concurrency,
1002 );
1003 })
1004}
1005
1006/// Inner-error path for merge_map: terminates the producer + drains
1007/// all inner subs (lock-released) + releases buffered DATA handles.
1008/// Captures `binding` so we can release buffered handles' retains
1009/// (taken on enqueue in the outer_sink's Data branch); without this,
1010/// inner-error before all buffered DATAs project would leak refcount
1011/// shares.
1012fn make_merge_on_error(
1013 state: Arc<Mutex<MergeMapState>>,
1014 core: Core,
1015 binding: Arc<dyn HigherOrderBinding>,
1016 producer_id: NodeId,
1017) -> Arc<dyn Fn(HandleId) + Send + Sync> {
1018 Arc::new(move |h| {
1019 let removed_subs;
1020 let buffered_to_release;
1021 {
1022 let mut s = state.lock().unwrap();
1023 if s.terminated {
1024 return;
1025 }
1026 s.terminated = true;
1027 removed_subs = s.inner_subs.drain().map(|(_, sub)| sub).collect::<Vec<_>>();
1028 s.pending_inner_ids.clear();
1029 buffered_to_release = s.buffer.drain(..).collect::<Vec<_>>();
1030 }
1031 drop(removed_subs);
1032 for h_b in buffered_to_release {
1033 binding.release_handle(h_b);
1034 }
1035 core.error(producer_id, h);
1036 })
1037}
1038
1039// =====================================================================
1040// Send + Sync compile-time asserts (Slice E /qa)
1041// =====================================================================
1042
1043const _: fn() = || {
1044 fn assert_send_sync<T: Send + Sync>() {}
1045 assert_send_sync::<SwitchState>();
1046 assert_send_sync::<ExhaustState>();
1047 assert_send_sync::<MergeMapState>();
1048 assert_send_sync::<ProjectFn>();
1049};