Skip to main content

graphrefly_operators/
higher_order.rs

1//! Higher-order operators (Slice E, D044) — operators whose project fn
2//! returns an inner [`NodeId`] for each outer DATA. Mirrors TS legacy
3//! `extra/operators/higher-order.ts` (`switchMap` / `exhaustMap` /
4//! `concatMap` / `mergeMap`).
5//!
6//! All four are producer-pattern nodes (no declared deps; subscribe to
7//! the outer source from inside the build closure; emit on themselves
8//! via [`Core::emit`]). This mirrors the [`super::ops_impl`] family
9//! (zip / concat / race / takeUntil); the producer substrate handles
10//! auto-cleanup of upstream + inner subscriptions on producer
11//! deactivation (D031–D038).
12//!
13//! The four flavors differ in how they handle a new outer DATA while a
14//! prior inner is still active:
15//!
16//! - [`switch_map`] — cancel the prior inner (Rx-style `switchMap`).
17//! - [`exhaust_map`] — drop the new value (Rx-style `exhaustMap`).
18//! - [`concat_map`] — enqueue; process sequentially. (Equivalent to
19//!   [`merge_map_with_concurrency`] with `Some(1)`.)
20//! - [`merge_map`] / [`merge_map_with_concurrency`] — spawn in parallel
21//!   up to `concurrency`. `None` = unbounded.
22//!
23//! # Inner-sub tracking (Slice E /qa refactor)
24//!
25//! Each operator owns its inner [`Subscription`]s **inside its state
26//! `Mutex`** (not in [`super::producer::ProducerNodeState::subs`]).
27//! `producer_storage[producer_id].subs` holds only the OUTER source
28//! subscription (one entry, no positional concerns). switch_map /
29//! exhaust_map keep `Option<Subscription>` (single active inner); merge
30//! / concat keep `HashMap<u64, Subscription>` keyed by per-op
31//! `next_inner_id`. This avoids two bugs the original positional design
32//! exposed: (a) cached-outer source firing handshake before
33//! `subscribe_to` pushed the outer sub, reordering `subs[0]`; (b)
34//! merge/concat completed-inner subs accumulating in `subs` indefinitely.
35//! Inner sub cleanup is per-op now: switch/exhaust take + drop on inner
36//! Complete; merge/concat remove specific id on inner Complete.
37//!
38//! # Drain discipline (iterative spawn)
39//!
40//! `merge_map` could spawn the next buffered DATA from inside an
41//! inner's `on_complete` callback. For pathological pre-completed
42//! inners (synchronous Complete during the subscribe handshake),
43//! recursive spawn would grow the stack proportionally to the buffer
44//! depth. The thread-local [`MERGE_DRAIN_ACTIVE`] flag breaks the
45//! recursion: the outermost drain owns the loop; nested `on_complete`
46//! invocations only decrement state and return.
47//!
48//! # Project closure (D044)
49//!
50//! Each operator takes a `project: Fn(HandleId) -> NodeId` closure
51//! registered through [`HigherOrderBinding::register_project`]. Bindings
52//! (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
53//! WASM callbacks into this Rust shape; Rust-side users register a
54//! closure directly.
55
56#![allow(clippy::collapsible_if, clippy::collapsible_match)]
57#![allow(clippy::too_many_arguments, clippy::too_many_lines)]
58
59use std::cell::Cell;
60use std::collections::VecDeque;
61use std::sync::{Arc, Mutex, Weak};
62
63use ahash::AHashMap;
64use graphrefly_core::{Core, FnId, HandleId, Message, NodeId, Sink, Subscription};
65use smallvec::SmallVec;
66
67use super::producer::{ProducerBinding, ProducerCtx};
68
69// =====================================================================
70// HigherOrderBinding — closure registration for project: T -> Node<R>
71// =====================================================================
72
73/// Project closure: takes an outer DATA handle, returns the
74/// [`NodeId`] of an inner node to subscribe to. Closure may register
75/// new state/derived nodes on the fly via captured [`Core`].
76pub type ProjectFn = Box<dyn Fn(HandleId) -> NodeId + Send + Sync>;
77
78/// Closure-registration interface for higher-order operators.
79///
80/// Extends [`ProducerBinding`] with one method that bindings shipping
81/// higher-order operators must implement.
82pub trait HigherOrderBinding: ProducerBinding {
83    /// Register a project closure. The returned [`FnId`] is captured by
84    /// the operator's build closure and looked up via
85    /// [`Self::invoke_project`] on each outer DATA fire.
86    fn register_project(&self, project: ProjectFn) -> FnId;
87
88    /// Invoke a registered project closure with the given outer DATA
89    /// handle. Returns the inner node's [`NodeId`].
90    ///
91    /// # Panics
92    ///
93    /// Implementations panic if `fn_id` is not a registered project
94    /// closure.
95    fn invoke_project(&self, fn_id: FnId, value: HandleId) -> NodeId;
96}
97
98// =====================================================================
99// build_inner_sink — shared inner-subscription handler
100// =====================================================================
101//
102// Each higher-order op subscribes to the inner node returned by its
103// project closure. The inner sink dispatches by `Message::tier()`
104// (R1.3.7.b — central message-tier utility, never hardcode variant
105// checks for forwarding gating per CLAUDE.md design invariant 4):
106//
107// - Tier 0 (Start) — drop.
108// - Tier 1 (Dirty) — drop. (DIRTY forwarding from inner is an
109//   acknowledged divergence from TS legacy; Rust producer relies on
110//   Core's wave engine to generate DIRTY on the producer's own emits.)
111// - Tier 2 (Pause/Resume) — drop. Backpressure is per-stream; not
112//   propagated through higher-order ops by design.
113// - Tier 3 (Data/Resolved) — `Data(h)` forwards via `Core::emit(producer_id, h)`;
114//   `Resolved` drops (same wave-engine rationale as Dirty).
115// - Tier 4 (Invalidate) — forwards via `Core::invalidate(producer_id)`
116//   per R1.2.7. Inner cache invalidation surfaces to producer
117//   subscribers.
118// - Tier 5 (Complete/Error) — semantic dispatch via op-specific
119//   callbacks (`on_inner_complete` / `on_inner_error`). Differs
120//   per-op (switch_map clears active inner; merge_map decrements
121//   active count; etc.).
122// - Tier 6 (Teardown) — forwards via `Core::teardown(producer_id)`
123//   per R2.6.4. Inner permanent destruction propagates.
124
125fn build_inner_sink(
126    core: Core,
127    producer_binding: Arc<dyn ProducerBinding>,
128    producer_id: NodeId,
129    on_inner_complete: Arc<dyn Fn() + Send + Sync>,
130    on_inner_error: Arc<dyn Fn(HandleId) + Send + Sync>,
131) -> Sink {
132    Arc::new(move |msgs: &[Message]| {
133        enum Action {
134            Emit(HandleId),
135            Complete,
136            Error(HandleId),
137            Invalidate,
138            Teardown,
139        }
140        let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
141        for m in msgs {
142            match m.tier() {
143                3 => {
144                    // Tier 3: Data/Resolved. Only Data carries a payload
145                    // to forward; Resolved is dropped per the
146                    // wave-engine divergence above.
147                    if let Some(h) = m.payload_handle() {
148                        producer_binding.retain_handle(h);
149                        actions.push(Action::Emit(h));
150                    }
151                }
152                4 => {
153                    // Tier 4: Invalidate. Forward to producer.
154                    actions.push(Action::Invalidate);
155                }
156                5 => {
157                    // Tier 5: Complete or Error. Semantic dispatch via
158                    // op-specific callbacks. `payload_handle()` is the
159                    // canonical Error-vs-Complete discriminator at this
160                    // tier.
161                    if let Some(h) = m.payload_handle() {
162                        producer_binding.retain_handle(h);
163                        actions.push(Action::Error(h));
164                    } else {
165                        actions.push(Action::Complete);
166                    }
167                }
168                6 => {
169                    // Tier 6: Teardown. Forward to producer.
170                    actions.push(Action::Teardown);
171                }
172                // Tiers 0 (Start), 1 (Dirty), 2 (Pause/Resume): drop.
173                _ => {}
174            }
175        }
176        for action in actions {
177            match action {
178                Action::Emit(h) => core.emit(producer_id, h),
179                Action::Complete => on_inner_complete(),
180                Action::Error(h) => on_inner_error(h),
181                Action::Invalidate => core.invalidate(producer_id),
182                Action::Teardown => core.teardown(producer_id),
183            }
184        }
185    })
186}
187
188// =====================================================================
189// switch_map — cancel previous inner on each new outer DATA
190// =====================================================================
191
192struct SwitchState {
193    /// Currently-active inner subscription (if any). Cancelling the
194    /// prior inner is `Option::take` + lock-released drop.
195    inner_sub: Option<Subscription>,
196    source_done: bool,
197    terminated: bool,
198}
199
200impl SwitchState {
201    fn new() -> Self {
202        Self {
203            inner_sub: None,
204            source_done: false,
205            terminated: false,
206        }
207    }
208}
209
210/// `switch_map(source, project)` — for each outer DATA, cancel the
211/// previous inner subscription and subscribe to the inner node returned
212/// by `project(value)`. Inner DATA flows through to downstream; inner
213/// COMPLETE clears the active slot; outer COMPLETE (with no active
214/// inner) self-completes the operator.
215#[must_use]
216pub fn switch_map(
217    core: &Core,
218    binding: &Arc<dyn HigherOrderBinding>,
219    source: NodeId,
220    project: ProjectFn,
221) -> NodeId {
222    let project_fn_id = binding.register_project(project);
223    // Weak captures break the producer-build Arc cycle (see
224    // `graphrefly_operators::ops_impl::zip` doc).
225    let core_weak = core.weak_handle();
226    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
227    let producer_binding_weak: Weak<dyn ProducerBinding> =
228        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
229
230    let build = Box::new(move |ctx: ProducerCtx<'_>| {
231        let producer_id = ctx.node_id();
232        let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
233            core_weak.upgrade(),
234            binding_weak.upgrade(),
235            producer_binding_weak.upgrade(),
236        ) else {
237            return;
238        };
239        let state: Arc<Mutex<SwitchState>> = Arc::new(Mutex::new(SwitchState::new()));
240
241        let state_for_outer = state.clone();
242        let core_for_outer = core_clone.clone();
243        let binding_for_outer = binding_clone.clone();
244        let producer_binding_for_outer = producer_binding.clone();
245
246        let outer_sink: Sink = Arc::new(move |msgs| {
247            // Phase 1: classify under state lock. Track whether we
248            // performed a retain so phase 2 can safely release it
249            // without underflow on `[Data(_), Error(_)]` same-batch
250            // (P1 /qa fix).
251            #[derive(Default)]
252            struct Plan {
253                latest_outer_h: Option<HandleId>,
254                latest_retained: bool,
255                self_complete: bool,
256                self_error: Option<HandleId>,
257            }
258            let mut plan = Plan::default();
259            {
260                let mut s = state_for_outer.lock().unwrap();
261                if s.terminated {
262                    return;
263                }
264                // Tier-based dispatch per `feedback_use_tier_for_signal_routing.md`
265                // and canonical §4.2 ("Always use the provided tier utilities
266                // rather than hardcoding type checks"). Tier 3 carries
267                // DATA/RESOLVED — only DATA has a payload_handle, so the
268                // `payload_handle().is_some()` test discriminates within
269                // the tier without referring to specific variants. Tier 5
270                // carries COMPLETE/ERROR — same shape (Error has handle,
271                // Complete doesn't).
272                for m in msgs {
273                    match m.tier() {
274                        3 => {
275                            if let Some(h) = m.payload_handle() {
276                                // switchMap: only the LATEST outer DATA in the
277                                // batch matters (TS legacy "skip to last in
278                                // the batch to avoid creating + immediately
279                                // discarding N-1 inners"). Track the handle;
280                                // we'll project + subscribe once after the
281                                // lock drops. Skipped handles need no retain.
282                                plan.latest_outer_h = Some(h);
283                            }
284                            // else: Resolved on outer source — no action.
285                        }
286                        5 => {
287                            if let Some(h) = m.payload_handle() {
288                                // Error
289                                if !s.terminated {
290                                    s.terminated = true;
291                                    binding_for_outer.retain_handle(h);
292                                    plan.self_error = Some(h);
293                                }
294                            } else {
295                                // Complete
296                                s.source_done = true;
297                                if s.inner_sub.is_none()
298                                    && plan.latest_outer_h.is_none()
299                                    && !s.terminated
300                                {
301                                    s.terminated = true;
302                                    plan.self_complete = true;
303                                }
304                            }
305                        }
306                        _ => {} // Tiers 0/1/2/4/6 — no action on outer source.
307                    }
308                }
309                if let Some(h) = plan.latest_outer_h {
310                    if !s.terminated {
311                        // Retain ONE share for the chosen handle —
312                        // released by phase 2 after invoke_project.
313                        binding_for_outer.retain_handle(h);
314                        plan.latest_retained = true;
315                    }
316                }
317            }
318
319            // Phase 2: cancel prior inner sub, project, subscribe.
320            // Gated on `latest_retained` so that an `Error` arriving in
321            // the same batch (which sets terminated and skips the
322            // retain) does NOT trigger an unbalanced release.
323            if plan.latest_retained {
324                let outer_h = plan
325                    .latest_outer_h
326                    .expect("latest_retained implies latest_outer_h is Some");
327
328                // Cancel prior inner sub (if any) lock-released.
329                let prev_inner = {
330                    let mut s = state_for_outer.lock().unwrap();
331                    s.inner_sub.take()
332                };
333                drop(prev_inner);
334
335                // invoke_project lock-released. Releases our retain after.
336                let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
337                binding_for_outer.release_handle(outer_h);
338
339                let on_complete = make_switch_on_complete(
340                    state_for_outer.clone(),
341                    core_for_outer.clone(),
342                    producer_id,
343                );
344                let on_error = make_switch_on_error(
345                    state_for_outer.clone(),
346                    core_for_outer.clone(),
347                    producer_id,
348                );
349                let inner_sink = build_inner_sink(
350                    core_for_outer.clone(),
351                    producer_binding_for_outer.clone(),
352                    producer_id,
353                    on_complete,
354                    on_error,
355                );
356                let inner_sub = core_for_outer.subscribe(inner_node, inner_sink);
357
358                // Install (or drop, if terminated mid-project / inner
359                // already completed and on_complete cleared the slot).
360                // Prior slot is empty (we cleared above). If the inner
361                // already self-completed during the handshake,
362                // on_complete left `inner_sub: None`. Either way,
363                // `replace` returns None and we install. Defensively
364                // preserve any unexpected leftover for lock-released drop.
365                let to_drop = {
366                    let mut s = state_for_outer.lock().unwrap();
367                    if s.terminated {
368                        Some(inner_sub)
369                    } else {
370                        s.inner_sub.replace(inner_sub)
371                    }
372                };
373                drop(to_drop);
374            }
375
376            if plan.self_complete {
377                core_for_outer.complete(producer_id);
378            } else if let Some(h) = plan.self_error {
379                core_for_outer.error(producer_id, h);
380            }
381        });
382
383        ctx.subscribe_to(source, outer_sink);
384    });
385
386    let fn_id = binding.register_producer_build(build);
387    core.register_producer(fn_id)
388        .expect("invariant: register_producer has no deps; no error variants reachable")
389}
390
391fn make_switch_on_complete(
392    state: Arc<Mutex<SwitchState>>,
393    core: Core,
394    producer_id: NodeId,
395) -> Arc<dyn Fn() + Send + Sync> {
396    Arc::new(move || {
397        let prev_inner;
398        let mut should_complete = false;
399        {
400            let mut s = state.lock().unwrap();
401            if s.terminated {
402                return;
403            }
404            prev_inner = s.inner_sub.take();
405            if s.source_done && !s.terminated {
406                s.terminated = true;
407                should_complete = true;
408            }
409        }
410        drop(prev_inner);
411        if should_complete {
412            core.complete(producer_id);
413        }
414    })
415}
416
417fn make_switch_on_error(
418    state: Arc<Mutex<SwitchState>>,
419    core: Core,
420    producer_id: NodeId,
421) -> Arc<dyn Fn(HandleId) + Send + Sync> {
422    Arc::new(move |h| {
423        let prev_inner;
424        {
425            let mut s = state.lock().unwrap();
426            if s.terminated {
427                return;
428            }
429            s.terminated = true;
430            prev_inner = s.inner_sub.take();
431        }
432        drop(prev_inner);
433        core.error(producer_id, h);
434    })
435}
436
437// =====================================================================
438// exhaust_map — ignore outer DATA while inner is active
439// =====================================================================
440
441struct ExhaustState {
442    inner_sub: Option<Subscription>,
443    source_done: bool,
444    terminated: bool,
445}
446
447impl ExhaustState {
448    fn new() -> Self {
449        Self {
450            inner_sub: None,
451            source_done: false,
452            terminated: false,
453        }
454    }
455}
456
457/// `exhaust_map(source, project)` — like [`switch_map`] but DROPS new
458/// outer DATA while an inner subscription is active. First outer DATA
459/// per "active window" wins; subsequent DATAs are discarded until the
460/// inner completes.
461#[must_use]
462pub fn exhaust_map(
463    core: &Core,
464    binding: &Arc<dyn HigherOrderBinding>,
465    source: NodeId,
466    project: ProjectFn,
467) -> NodeId {
468    let project_fn_id = binding.register_project(project);
469    // Weak captures break the producer-build Arc cycle (see `switch_map` doc).
470    let core_weak = core.weak_handle();
471    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
472    let producer_binding_weak: Weak<dyn ProducerBinding> =
473        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
474
475    let build = Box::new(move |ctx: ProducerCtx<'_>| {
476        let producer_id = ctx.node_id();
477        let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
478            core_weak.upgrade(),
479            binding_weak.upgrade(),
480            producer_binding_weak.upgrade(),
481        ) else {
482            return;
483        };
484        let state: Arc<Mutex<ExhaustState>> = Arc::new(Mutex::new(ExhaustState::new()));
485
486        let state_for_outer = state.clone();
487        let core_for_outer = core_clone.clone();
488        let binding_for_outer = binding_clone.clone();
489        let producer_binding_for_outer = producer_binding.clone();
490
491        let outer_sink: Sink = Arc::new(move |msgs| {
492            #[derive(Default)]
493            struct Plan {
494                first_outer_h: Option<HandleId>,
495                first_retained: bool,
496                self_complete: bool,
497                self_error: Option<HandleId>,
498            }
499            let mut plan = Plan::default();
500            {
501                let mut s = state_for_outer.lock().unwrap();
502                if s.terminated {
503                    return;
504                }
505                // Tier-based dispatch (canonical §4.2; see
506                // `feedback_use_tier_for_signal_routing.md`).
507                for m in msgs {
508                    match m.tier() {
509                        3 => {
510                            if let Some(h) = m.payload_handle() {
511                                // First DATA per active window wins.
512                                // Remember the first one we accept; subsequent
513                                // batch entries (or DATAs after) drop.
514                                if s.inner_sub.is_none() && plan.first_outer_h.is_none() {
515                                    binding_for_outer.retain_handle(h);
516                                    plan.first_outer_h = Some(h);
517                                    plan.first_retained = true;
518                                }
519                            }
520                            // else: Resolved on outer source — no action.
521                        }
522                        5 => {
523                            if let Some(h) = m.payload_handle() {
524                                // Error
525                                if !s.terminated {
526                                    s.terminated = true;
527                                    // Release any retain we took for
528                                    // first_outer_h — we won't be projecting it.
529                                    if plan.first_retained {
530                                        if let Some(h0) = plan.first_outer_h.take() {
531                                            binding_for_outer.release_handle(h0);
532                                            plan.first_retained = false;
533                                        }
534                                    }
535                                    binding_for_outer.retain_handle(h);
536                                    plan.self_error = Some(h);
537                                }
538                            } else {
539                                // Complete
540                                s.source_done = true;
541                                if s.inner_sub.is_none()
542                                    && plan.first_outer_h.is_none()
543                                    && !s.terminated
544                                {
545                                    s.terminated = true;
546                                    plan.self_complete = true;
547                                }
548                            }
549                        }
550                        _ => {} // Tiers 0/1/2/4/6 — no action.
551                    }
552                }
553            }
554
555            if plan.first_retained {
556                let outer_h = plan
557                    .first_outer_h
558                    .expect("first_retained implies first_outer_h is Some");
559                let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
560                binding_for_outer.release_handle(outer_h);
561
562                let on_complete = make_exhaust_on_complete(
563                    state_for_outer.clone(),
564                    core_for_outer.clone(),
565                    producer_id,
566                );
567                let on_error = make_exhaust_on_error(
568                    state_for_outer.clone(),
569                    core_for_outer.clone(),
570                    producer_id,
571                );
572                let inner_sink = build_inner_sink(
573                    core_for_outer.clone(),
574                    producer_binding_for_outer.clone(),
575                    producer_id,
576                    on_complete,
577                    on_error,
578                );
579                // If inner already pre-completed during the handshake,
580                // on_complete already cleared `inner_sub`. We `replace`
581                // either way; in the synchronous-completion path,
582                // `inner_sub` was None so our just-subscribed (and
583                // already-dead) sub is dropped on the next iteration.
584                let inner_sub = core_for_outer.subscribe(inner_node, inner_sink);
585                let to_drop = {
586                    let mut s = state_for_outer.lock().unwrap();
587                    if s.terminated {
588                        Some(inner_sub)
589                    } else {
590                        s.inner_sub.replace(inner_sub)
591                    }
592                };
593                drop(to_drop);
594            }
595
596            if plan.self_complete {
597                core_for_outer.complete(producer_id);
598            } else if let Some(h) = plan.self_error {
599                core_for_outer.error(producer_id, h);
600            }
601        });
602
603        ctx.subscribe_to(source, outer_sink);
604    });
605
606    let fn_id = binding.register_producer_build(build);
607    core.register_producer(fn_id)
608        .expect("invariant: register_producer has no deps; no error variants reachable")
609}
610
611fn make_exhaust_on_complete(
612    state: Arc<Mutex<ExhaustState>>,
613    core: Core,
614    producer_id: NodeId,
615) -> Arc<dyn Fn() + Send + Sync> {
616    Arc::new(move || {
617        let prev_inner;
618        let mut should_complete = false;
619        {
620            let mut s = state.lock().unwrap();
621            if s.terminated {
622                return;
623            }
624            prev_inner = s.inner_sub.take();
625            if s.source_done && !s.terminated {
626                s.terminated = true;
627                should_complete = true;
628            }
629        }
630        drop(prev_inner);
631        if should_complete {
632            core.complete(producer_id);
633        }
634    })
635}
636
637fn make_exhaust_on_error(
638    state: Arc<Mutex<ExhaustState>>,
639    core: Core,
640    producer_id: NodeId,
641) -> Arc<dyn Fn(HandleId) + Send + Sync> {
642    Arc::new(move |h| {
643        let prev_inner;
644        {
645            let mut s = state.lock().unwrap();
646            if s.terminated {
647                return;
648            }
649            s.terminated = true;
650            prev_inner = s.inner_sub.take();
651        }
652        drop(prev_inner);
653        core.error(producer_id, h);
654    })
655}
656
657// =====================================================================
658// merge_map — parallel inners up to `concurrency` cap
659// concat_map — wrapper for concurrency = Some(1)
660// =====================================================================
661
thread_local! {
    /// Re-entrancy guard for the `MergeMapState` drain loop on this
    /// thread (starts out `false`).
    ///
    /// An inner whose `on_complete` fires synchronously inside a
    /// `Core::subscribe` handshake (a pre-completed inner) must not start
    /// a nested drain. It only decrements the count, removes its
    /// subscription, and returns. The drain already running further up the
    /// stack sees the freed capacity on its next iteration.
    static MERGE_DRAIN_ACTIVE: Cell<bool> = const { Cell::new(false) };
}
671
672struct MergeMapState {
673    /// Number of currently-active inner subscriptions (spawned but
674    /// not yet completed/errored).
675    active: u32,
676    /// Outer DATAs waiting because `active >= concurrency`. Each
677    /// handle has one retain share (taken on enqueue, released on
678    /// dequeue + project).
679    buffer: VecDeque<HandleId>,
680    /// Per-inner `Subscription`s, keyed by `next_inner_id`. Each
681    /// inner's `on_complete` removes its entry by id (lock-released
682    /// drop).
683    inner_subs: AHashMap<u64, Subscription>,
684    /// Pending inner ids (between `subscribe` call and
685    /// `inner_subs.insert`). Used to detect synchronous-completion:
686    /// if `on_complete` runs during `subscribe`, it removes from
687    /// `pending_inner_ids`; the post-subscribe code checks the set
688    /// and skips inserting the now-dead sub.
689    pending_inner_ids: ahash::AHashSet<u64>,
690    next_inner_id: u64,
691    source_done: bool,
692    terminated: bool,
693}
694
695impl MergeMapState {
696    fn new() -> Self {
697        Self {
698            active: 0,
699            buffer: VecDeque::new(),
700            inner_subs: AHashMap::new(),
701            pending_inner_ids: ahash::AHashSet::new(),
702            next_inner_id: 0,
703            source_done: false,
704            terminated: false,
705        }
706    }
707}
708
709/// `merge_map(source, project)` — unbounded concurrency variant.
710/// Equivalent to [`merge_map_with_concurrency`] with `None`.
711#[must_use]
712pub fn merge_map(
713    core: &Core,
714    binding: &Arc<dyn HigherOrderBinding>,
715    source: NodeId,
716    project: ProjectFn,
717) -> NodeId {
718    merge_map_with_concurrency(core, binding, source, project, None)
719}
720
721/// `concat_map(source, project)` — sequential queue variant.
722/// Equivalent to [`merge_map_with_concurrency`] with `Some(1)`. Each
723/// outer DATA is enqueued and processed one-at-a-time.
724#[must_use]
725pub fn concat_map(
726    core: &Core,
727    binding: &Arc<dyn HigherOrderBinding>,
728    source: NodeId,
729    project: ProjectFn,
730) -> NodeId {
731    merge_map_with_concurrency(core, binding, source, project, Some(1))
732}
733
734/// `merge_map_with_concurrency(source, project, concurrency)` — projects
735/// each outer DATA to an inner Node and subscribes in parallel.
736///
737/// `concurrency`:
738/// - `None` → unbounded (every outer DATA spawns immediately).
739/// - `Some(n)` → at most `n` concurrent inners; excess outer DATAs
740///   buffer until an active inner completes.
741///
742/// Per D043 / D040, this matches the
743/// [`Core::set_pause_buffer_cap`](graphrefly_core::Core::set_pause_buffer_cap)
744/// `Option<usize>` precedent (None = unbounded). `Some(0)` is degenerate
745/// (would buffer everything indefinitely without ever spawning) but
746/// accepted at the type level.
747#[must_use]
748pub fn merge_map_with_concurrency(
749    core: &Core,
750    binding: &Arc<dyn HigherOrderBinding>,
751    source: NodeId,
752    project: ProjectFn,
753    concurrency: Option<u32>,
754) -> NodeId {
755    let project_fn_id = binding.register_project(project);
756    // Weak captures break the producer-build Arc cycle (see `switch_map` doc).
757    let core_weak = core.weak_handle();
758    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
759    let producer_binding_weak: Weak<dyn ProducerBinding> =
760        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
761
762    let build = Box::new(move |ctx: ProducerCtx<'_>| {
763        let producer_id = ctx.node_id();
764        let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
765            core_weak.upgrade(),
766            binding_weak.upgrade(),
767            producer_binding_weak.upgrade(),
768        ) else {
769            return;
770        };
771        let state: Arc<Mutex<MergeMapState>> = Arc::new(Mutex::new(MergeMapState::new()));
772
773        let state_for_outer = state.clone();
774        let core_for_outer = core_clone.clone();
775        let binding_for_outer = binding_clone.clone();
776        let producer_binding_for_outer = producer_binding.clone();
777
778        let outer_sink: Sink = Arc::new(move |msgs| {
779            // Phase 1: enqueue DATAs into the buffer (always — drain
780            // loop dequeues + spawns up to cap), classify terminal
781            // signals.
782            let mut error_action: Option<HandleId> = None;
783            let mut self_complete_now = false;
784            {
785                let mut s = state_for_outer.lock().unwrap();
786                if s.terminated {
787                    return;
788                }
789                // Tier-based dispatch (canonical §4.2; see
790                // `feedback_use_tier_for_signal_routing.md`).
791                for m in msgs {
792                    match m.tier() {
793                        3 => {
794                            if let Some(h) = m.payload_handle() {
795                                // Retain on enqueue — released by drain
796                                // after invoke_project.
797                                binding_for_outer.retain_handle(h);
798                                s.buffer.push_back(h);
799                            }
800                            // else: Resolved on outer source — no action.
801                        }
802                        5 => {
803                            if let Some(h) = m.payload_handle() {
804                                // Error
805                                if !s.terminated {
806                                    s.terminated = true;
807                                    binding_for_outer.retain_handle(h);
808                                    while let Some(q) = s.buffer.pop_front() {
809                                        binding_for_outer.release_handle(q);
810                                    }
811                                    error_action = Some(h);
812                                }
813                            } else {
814                                // Complete
815                                s.source_done = true;
816                                if s.active == 0 && s.buffer.is_empty() && !s.terminated {
817                                    s.terminated = true;
818                                    self_complete_now = true;
819                                }
820                            }
821                        }
822                        _ => {} // Tiers 0/1/2/4/6 — no action.
823                    }
824                }
825            }
826
827            if let Some(h) = error_action {
828                core_for_outer.error(producer_id, h);
829                return;
830            }
831            if self_complete_now {
832                core_for_outer.complete(producer_id);
833                return;
834            }
835
836            // Phase 2: drain buffer iteratively up to concurrency cap.
837            drain_merge_buffer(
838                &state_for_outer,
839                &core_for_outer,
840                &binding_for_outer,
841                &producer_binding_for_outer,
842                producer_id,
843                project_fn_id,
844                concurrency,
845            );
846        });
847
848        ctx.subscribe_to(source, outer_sink);
849    });
850
851    let fn_id = binding.register_producer_build(build);
852    core.register_producer(fn_id)
853        .expect("invariant: register_producer has no deps; no error variants reachable")
854}
855
856/// Iteratively pop from `buffer` and spawn inners until cap is reached
857/// or buffer is empty. Re-entrance from a nested `on_complete` is
858/// short-circuited via [`MERGE_DRAIN_ACTIVE`]; the outermost call owns
859/// the drain loop and picks up cap-frees on subsequent iterations.
860fn drain_merge_buffer(
861    state: &Arc<Mutex<MergeMapState>>,
862    core: &Core,
863    binding: &Arc<dyn HigherOrderBinding>,
864    producer_binding: &Arc<dyn ProducerBinding>,
865    producer_id: NodeId,
866    project_fn_id: FnId,
867    concurrency: Option<u32>,
868) {
869    if MERGE_DRAIN_ACTIVE.with(|f| f.replace(true)) {
870        // Already draining on this thread; outer loop will drain
871        // remaining buffer.
872        return;
873    }
874
875    loop {
876        let h_and_id;
877        let mut should_self_complete = false;
878        {
879            let mut s = state.lock().unwrap();
880            if s.terminated {
881                MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
882                return;
883            }
884            let allowed = match concurrency {
885                None => true,
886                Some(n) => s.active < n,
887            };
888            if !allowed {
889                MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
890                return;
891            }
892            if let Some(h) = s.buffer.pop_front() {
893                s.active += 1;
894                let id = s.next_inner_id;
895                s.next_inner_id += 1;
896                s.pending_inner_ids.insert(id);
897                h_and_id = Some((h, id));
898            } else if s.source_done && s.active == 0 && !s.terminated {
899                s.terminated = true;
900                should_self_complete = true;
901                h_and_id = None;
902            } else {
903                h_and_id = None;
904            }
905        }
906
907        if should_self_complete {
908            MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
909            core.complete(producer_id);
910            return;
911        }
912
913        let Some((outer_h, inner_id)) = h_and_id else {
914            MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
915            return;
916        };
917
918        // Spawn lock-released.
919        let inner_node = binding.invoke_project(project_fn_id, outer_h);
920        binding.release_handle(outer_h);
921
922        let on_complete = make_merge_on_complete(
923            state.clone(),
924            core.clone(),
925            binding.clone(),
926            producer_binding.clone(),
927            producer_id,
928            project_fn_id,
929            inner_id,
930            concurrency,
931        );
932        let on_error =
933            make_merge_on_error(state.clone(), core.clone(), binding.clone(), producer_id);
934        let inner_sink = build_inner_sink(
935            core.clone(),
936            producer_binding.clone(),
937            producer_id,
938            on_complete,
939            on_error,
940        );
941        let inner_sub = core.subscribe(inner_node, inner_sink);
942
943        // Decide whether to install the sub: if `on_complete` fired
944        // synchronously inside `subscribe` (pre-completed inner), it
945        // already removed `inner_id` from `pending_inner_ids`.
946        let to_drop = {
947            let mut s = state.lock().unwrap();
948            if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
949                Some(inner_sub)
950            } else {
951                s.inner_subs.insert(inner_id, inner_sub);
952                None
953            }
954        };
955        drop(to_drop);
956
957        // Loop continues — pops next from buffer or returns.
958    }
959}
960
961fn make_merge_on_complete(
962    state: Arc<Mutex<MergeMapState>>,
963    core: Core,
964    binding: Arc<dyn HigherOrderBinding>,
965    producer_binding: Arc<dyn ProducerBinding>,
966    producer_id: NodeId,
967    project_fn_id: FnId,
968    this_inner_id: u64,
969    concurrency: Option<u32>,
970) -> Arc<dyn Fn() + Send + Sync> {
971    Arc::new(move || {
972        let removed_sub;
973        {
974            let mut s = state.lock().unwrap();
975            if s.terminated {
976                return;
977            }
978            s.active -= 1;
979            // Two cases:
980            // (a) Sync-completion during subscribe: `pending_inner_ids`
981            //     still contains us; `inner_subs` does not. Remove from
982            //     pending so the post-subscribe insert sees we're done
983            //     and skips installing the dead sub.
984            // (b) Async completion: `inner_subs` contains us. Remove
985            //     and drop lock-released.
986            s.pending_inner_ids.remove(&this_inner_id);
987            removed_sub = s.inner_subs.remove(&this_inner_id);
988        }
989        drop(removed_sub);
990
991        // Try to drain — if we're nested inside an outer drain loop
992        // (sync-completion path), this is a no-op and the outer drain
993        // continues.
994        drain_merge_buffer(
995            &state,
996            &core,
997            &binding,
998            &producer_binding,
999            producer_id,
1000            project_fn_id,
1001            concurrency,
1002        );
1003    })
1004}
1005
1006/// Inner-error path for merge_map: terminates the producer + drains
1007/// all inner subs (lock-released) + releases buffered DATA handles.
1008/// Captures `binding` so we can release buffered handles' retains
1009/// (taken on enqueue in the outer_sink's Data branch); without this,
1010/// inner-error before all buffered DATAs project would leak refcount
1011/// shares.
1012fn make_merge_on_error(
1013    state: Arc<Mutex<MergeMapState>>,
1014    core: Core,
1015    binding: Arc<dyn HigherOrderBinding>,
1016    producer_id: NodeId,
1017) -> Arc<dyn Fn(HandleId) + Send + Sync> {
1018    Arc::new(move |h| {
1019        let removed_subs;
1020        let buffered_to_release;
1021        {
1022            let mut s = state.lock().unwrap();
1023            if s.terminated {
1024                return;
1025            }
1026            s.terminated = true;
1027            removed_subs = s.inner_subs.drain().map(|(_, sub)| sub).collect::<Vec<_>>();
1028            s.pending_inner_ids.clear();
1029            buffered_to_release = s.buffer.drain(..).collect::<Vec<_>>();
1030        }
1031        drop(removed_subs);
1032        for h_b in buffered_to_release {
1033            binding.release_handle(h_b);
1034        }
1035        core.error(producer_id, h);
1036    })
1037}
1038
1039// =====================================================================
1040// Send + Sync compile-time asserts (Slice E /qa)
1041// =====================================================================
1042
1043const _: fn() = || {
1044    fn assert_send_sync<T: Send + Sync>() {}
1045    assert_send_sync::<SwitchState>();
1046    assert_send_sync::<ExhaustState>();
1047    assert_send_sync::<MergeMapState>();
1048    assert_send_sync::<ProjectFn>();
1049};