Skip to main content

graphrefly_operators/
higher_order.rs

1//! Higher-order operators (Slice E, D044) — operators whose project fn
2//! returns an inner [`NodeId`] for each outer DATA. Mirrors TS legacy
3//! `extra/operators/higher-order.ts` (`switchMap` / `exhaustMap` /
4//! `concatMap` / `mergeMap`).
5//!
6//! All four are producer-pattern nodes (no declared deps; subscribe to
7//! the outer source from inside the build closure; emit on themselves
8//! via [`Core::emit`]). This mirrors the [`super::ops_impl`] family
9//! (zip / concat / race / takeUntil); the producer substrate handles
10//! auto-cleanup of upstream + inner subscriptions on producer
11//! deactivation (D031–D038).
12//!
13//! The four flavors differ in how they handle a new outer DATA while a
14//! prior inner is still active:
15//!
16//! - [`switch_map`] — cancel the prior inner (Rx-style `switchMap`).
17//! - [`exhaust_map`] — drop the new value (Rx-style `exhaustMap`).
18//! - [`concat_map`] — enqueue; process sequentially. (Equivalent to
19//!   [`merge_map_with_concurrency`] with `Some(1)`.)
20//! - [`merge_map`] / [`merge_map_with_concurrency`] — spawn in parallel
21//!   up to `concurrency`. `None` = unbounded.
22//!
23//! # Inner-sub tracking (Slice E /qa refactor)
24//!
//! Each operator owns its inner subscriptions **inside its own state**
//! (an `Rc<RefCell<..>>` for switch_map / exhaust_map), not in
//! [`super::producer::ProducerNodeState::subs`].
//! `producer_storage[producer_id].subs` holds only the OUTER source
//! subscription (one entry, no positional concerns). switch_map /
//! exhaust_map keep `Option<SubGuard>` (single active inner); merge
30//! / concat keep `HashMap<u64, Subscription>` keyed by per-op
31//! `next_inner_id`. This avoids two bugs the original positional design
32//! exposed: (a) cached-outer source firing handshake before
33//! `subscribe_to` pushed the outer sub, reordering `subs[0]`; (b)
34//! merge/concat completed-inner subs accumulating in `subs` indefinitely.
35//! Inner sub cleanup is per-op now: switch/exhaust take + drop on inner
36//! Complete; merge/concat remove specific id on inner Complete.
37//!
38//! # Drain discipline (iterative spawn)
39//!
40//! `merge_map` could spawn the next buffered DATA from inside an
41//! inner's `on_complete` callback. For pathological pre-completed
42//! inners (synchronous Complete during the subscribe handshake),
43//! recursive spawn would grow the stack proportionally to the buffer
44//! depth. The thread-local [`MERGE_DRAIN_ACTIVE`] flag breaks the
45//! recursion: the outermost drain owns the loop; nested `on_complete`
46//! invocations only decrement state and return.
47//!
48//! # Project closure (D044)
49//!
50//! Each operator takes a `project: Fn(HandleId) -> NodeId` closure
51//! registered through [`HigherOrderBinding::register_project`]. Bindings
52//! (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
53//! WASM callbacks into this Rust shape; Rust-side users register a
54//! closure directly.
55
56#![allow(clippy::collapsible_if, clippy::collapsible_match)]
57#![allow(clippy::too_many_arguments, clippy::too_many_lines)]
58
59use std::cell::Cell;
60use std::cell::RefCell;
61use std::collections::VecDeque;
62use std::rc::Rc;
63use std::sync::{Arc, Weak};
64
65use ahash::AHashMap;
66use graphrefly_core::{Core, FnId, HandleId, Message, NodeId, Sink};
67use smallvec::SmallVec;
68
69use super::producer::{ProducerBinding, ProducerCtx, ProducerEmitter, SubGuard};
70
71// =====================================================================
72// HigherOrderBinding — closure registration for project: T -> Node<R>
73// =====================================================================
74
75/// Project closure: takes an outer DATA handle, returns the
76/// [`NodeId`] of an inner node to subscribe to. Closure may register
77/// new state/derived nodes on the fly via captured [`Core`].
78pub type ProjectFn = Box<dyn Fn(HandleId) -> NodeId + Send + Sync>;
79
80/// Closure-registration interface for higher-order operators.
81///
82/// Extends [`ProducerBinding`] with one method that bindings shipping
83/// higher-order operators must implement.
84pub trait HigherOrderBinding: ProducerBinding {
85    /// Register a project closure. The returned [`FnId`] is captured by
86    /// the operator's build closure and looked up via
87    /// [`Self::invoke_project`] on each outer DATA fire.
88    fn register_project(&self, project: ProjectFn) -> FnId;
89
90    /// Invoke a registered project closure with the given outer DATA
91    /// handle. Returns the inner node's [`NodeId`].
92    ///
93    /// # Panics
94    ///
95    /// Implementations panic if `fn_id` is not a registered project
96    /// closure.
97    fn invoke_project(&self, fn_id: FnId, value: HandleId) -> NodeId;
98}
99
100// =====================================================================
101// build_inner_sink — shared inner-subscription handler
102// =====================================================================
103//
104// Each higher-order op subscribes to the inner node returned by its
105// project closure. The inner sink dispatches by `Message::tier()`
106// (R1.3.7.b — central message-tier utility, never hardcode variant
107// checks for forwarding gating per CLAUDE.md design invariant 4):
108//
109// - Tier 0 (Start) — drop.
110// - Tier 1 (Dirty) — drop. (DIRTY forwarding from inner is an
111//   acknowledged divergence from TS legacy; Rust producer relies on
112//   Core's wave engine to generate DIRTY on the producer's own emits.)
113// - Tier 2 (Pause/Resume) — drop. Backpressure is per-stream; not
114//   propagated through higher-order ops by design.
115// - Tier 3 (Data/Resolved) — `Data(h)` forwards via `Core::emit(producer_id, h)`;
116//   `Resolved` drops (same wave-engine rationale as Dirty).
117// - Tier 4 (Invalidate) — forwards via `Core::invalidate(producer_id)`
118//   per R1.2.7. Inner cache invalidation surfaces to producer
119//   subscribers.
120// - Tier 5 (Complete/Error) — semantic dispatch via op-specific
121//   callbacks (`on_inner_complete` / `on_inner_error`). Differs
122//   per-op (switch_map clears active inner; merge_map decrements
123//   active count; etc.).
124// - Tier 6 (Teardown) — forwards via `Core::teardown(producer_id)`
125//   per R2.6.4. Inner permanent destruction propagates.
126
127// S2b/D230/D234: long-lived inner sink takes `em: ProducerEmitter`
128// (was `core: Core`, no longer `Clone`). Emit is the mailbox fast path;
129// the rare INVALIDATE/TEARDOWN terminal forwards route through
130// `em.defer` (`CoreFull`).
131fn build_inner_sink(
132    em: ProducerEmitter,
133    producer_binding: Arc<dyn ProducerBinding>,
134    producer_id: NodeId,
135    on_inner_complete: Rc<dyn Fn()>,
136    on_inner_error: Rc<dyn Fn(HandleId)>,
137) -> Sink {
138    Rc::new(move |msgs: &[Message]| {
139        enum Action {
140            Emit(HandleId),
141            Complete,
142            Error(HandleId),
143            Invalidate,
144            Teardown,
145        }
146        let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
147        for m in msgs {
148            match m.tier() {
149                3 => {
150                    // Tier 3: Data/Resolved. Only Data carries a payload
151                    // to forward; Resolved is dropped per the
152                    // wave-engine divergence above.
153                    if let Some(h) = m.payload_handle() {
154                        producer_binding.retain_handle(h);
155                        actions.push(Action::Emit(h));
156                    }
157                }
158                4 => {
159                    // Tier 4: Invalidate. Forward to producer.
160                    actions.push(Action::Invalidate);
161                }
162                5 => {
163                    // Tier 5: Complete or Error. Semantic dispatch via
164                    // op-specific callbacks. `payload_handle()` is the
165                    // canonical Error-vs-Complete discriminator at this
166                    // tier.
167                    if let Some(h) = m.payload_handle() {
168                        producer_binding.retain_handle(h);
169                        actions.push(Action::Error(h));
170                    } else {
171                        actions.push(Action::Complete);
172                    }
173                }
174                6 => {
175                    // Tier 6: Teardown. Forward to producer.
176                    actions.push(Action::Teardown);
177                }
178                // Tiers 0 (Start), 1 (Dirty), 2 (Pause/Resume): drop.
179                _ => {}
180            }
181        }
182        for action in actions {
183            match action {
184                Action::Emit(h) => em.emit(producer_id, h),
185                Action::Complete => on_inner_complete(),
186                Action::Error(h) => on_inner_error(h),
187                Action::Invalidate => {
188                    let _ = em.defer(move |c| c.invalidate(producer_id));
189                }
190                Action::Teardown => {
191                    let _ = em.defer(move |c| c.teardown(producer_id));
192                }
193            }
194        }
195    })
196}
197
198// =====================================================================
199// switch_map — cancel previous inner on each new outer DATA
200// =====================================================================
201
202struct SwitchState {
203    /// Currently-active inner subscription (if any). Cancelling the
204    /// prior inner is `Option::take` + drop — S2b: `SubGuard::Drop`
205    /// posts the `unsubscribe` via `em.defer` (owner-side, in-wave,
206    /// FIFO-ordered so cancel drains before any resubscribe — D234).
207    inner_sub: Option<SubGuard>,
208    /// QA P2 (2026-05-18): monotonic switch epoch. Under D234 the
209    /// cancel-prev is `inner_sub.take()` at outer-fire, but the prior
210    /// subscribe is a still-queued `em.defer` that hasn't populated
211    /// `inner_sub` yet — so a fast double-switch within one wave would
212    /// `take()` an empty slot and leave BOTH inners live. Each accepted
213    /// outer DATA bumps `epoch`; the subscribe-defer captures the value
214    /// and, after `try_subscribe`, drops the new `SubGuard` immediately
215    /// (deferred unsubscribe) instead of installing it if `epoch` has
216    /// moved on — i.e. a newer outer DATA superseded it.
217    epoch: u64,
218    source_done: bool,
219    terminated: bool,
220}
221
222impl SwitchState {
223    fn new() -> Self {
224        Self {
225            inner_sub: None,
226            epoch: 0,
227            source_done: false,
228            terminated: false,
229        }
230    }
231}
232
233/// `switch_map(source, project)` — for each outer DATA, cancel the
234/// previous inner subscription and subscribe to the inner node returned
235/// by `project(value)`. Inner DATA flows through to downstream; inner
236/// COMPLETE clears the active slot; outer COMPLETE (with no active
237/// inner) self-completes the operator.
238#[must_use]
239pub fn switch_map(
240    core: &Core,
241    binding: &Arc<dyn HigherOrderBinding>,
242    source: NodeId,
243    project: ProjectFn,
244) -> NodeId {
245    let project_fn_id = binding.register_project(project);
246    // S2b/D223: the Core weak is GONE (Core relocates; long-lived sinks
247    // reach it via `ProducerEmitter`/`em.defer`). The *binding* weaks
248    // stay — they break the registry→build-closure→strong-binding cycle
249    // (unrelated to Core ownership; D223 only forbids `Weak<C>`).
250    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
251    let producer_binding_weak: Weak<dyn ProducerBinding> =
252        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
253
254    let build = Box::new(move |ctx: ProducerCtx<'_>| {
255        let producer_id = ctx.node_id();
256        let (Some(binding_clone), Some(producer_binding)) =
257            (binding_weak.upgrade(), producer_binding_weak.upgrade())
258        else {
259            return;
260        };
261        let em = ctx.emitter();
262        let state: Rc<RefCell<SwitchState>> = Rc::new(RefCell::new(SwitchState::new()));
263
264        let state_for_outer = state.clone();
265        let em_for_outer = em.clone();
266        let binding_for_outer = binding_clone.clone();
267        let producer_binding_for_outer = producer_binding.clone();
268
269        let outer_sink: Sink = Rc::new(move |msgs| {
270            // Phase 1: classify under state lock. Track whether we
271            // performed a retain so phase 2 can safely release it
272            // without underflow on `[Data(_), Error(_)]` same-batch
273            // (P1 /qa fix).
274            #[derive(Default)]
275            struct Plan {
276                latest_outer_h: Option<HandleId>,
277                latest_retained: bool,
278                self_complete: bool,
279                self_error: Option<HandleId>,
280            }
281            let mut plan = Plan::default();
282            {
283                let mut s = state_for_outer.borrow_mut();
284                if s.terminated {
285                    return;
286                }
287                // Tier-based dispatch per `feedback_use_tier_for_signal_routing.md`
288                // and canonical §4.2 ("Always use the provided tier utilities
289                // rather than hardcoding type checks"). Tier 3 carries
290                // DATA/RESOLVED — only DATA has a payload_handle, so the
291                // `payload_handle().is_some()` test discriminates within
292                // the tier without referring to specific variants. Tier 5
293                // carries COMPLETE/ERROR — same shape (Error has handle,
294                // Complete doesn't).
295                for m in msgs {
296                    match m.tier() {
297                        3 => {
298                            if let Some(h) = m.payload_handle() {
299                                // switchMap: only the LATEST outer DATA in the
300                                // batch matters (TS legacy "skip to last in
301                                // the batch to avoid creating + immediately
302                                // discarding N-1 inners"). Track the handle;
303                                // we'll project + subscribe once after the
304                                // lock drops. Skipped handles need no retain.
305                                plan.latest_outer_h = Some(h);
306                            }
307                            // else: Resolved on outer source — no action.
308                        }
309                        5 => {
310                            if let Some(h) = m.payload_handle() {
311                                // Error
312                                if !s.terminated {
313                                    s.terminated = true;
314                                    binding_for_outer.retain_handle(h);
315                                    plan.self_error = Some(h);
316                                }
317                            } else {
318                                // Complete
319                                s.source_done = true;
320                                if s.inner_sub.is_none()
321                                    && plan.latest_outer_h.is_none()
322                                    && !s.terminated
323                                {
324                                    s.terminated = true;
325                                    plan.self_complete = true;
326                                }
327                            }
328                        }
329                        _ => {} // Tiers 0/1/2/4/6 — no action on outer source.
330                    }
331                }
332                if let Some(h) = plan.latest_outer_h {
333                    if !s.terminated {
334                        // Retain ONE share for the chosen handle —
335                        // released by phase 2 after invoke_project.
336                        binding_for_outer.retain_handle(h);
337                        plan.latest_retained = true;
338                    }
339                }
340            }
341
342            // Phase 2: cancel prior inner sub, project, subscribe.
343            // Gated on `latest_retained` so that an `Error` arriving in
344            // the same batch (which sets terminated and skips the
345            // retain) does NOT trigger an unbalanced release.
346            if plan.latest_retained {
347                let outer_h = plan
348                    .latest_outer_h
349                    .expect("latest_retained implies latest_outer_h is Some");
350
351                // Cancel prior inner sub (if any) + bump the switch
352                // epoch (QA P2). `take()` only catches an *already
353                // installed* prior inner; a prior subscribe still queued
354                // as an `em.defer` hasn't populated `inner_sub` yet, so
355                // bumping `epoch` here and re-checking it inside the
356                // subscribe-defer is what actually invalidates a
357                // superseded-but-still-queued prior subscribe.
358                let my_epoch = {
359                    let mut s = state_for_outer.borrow_mut();
360                    let prev = s.inner_sub.take();
361                    s.epoch += 1;
362                    let e = s.epoch;
363                    drop(s);
364                    drop(prev); // SubGuard::Drop → deferred unsubscribe.
365                    e
366                };
367
368                // invoke_project lock-released (binding call, not Core).
369                let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
370                binding_for_outer.release_handle(outer_h);
371
372                let on_complete = make_switch_on_complete(
373                    state_for_outer.clone(),
374                    em_for_outer.clone(),
375                    producer_id,
376                );
377                let on_error = make_switch_on_error(
378                    state_for_outer.clone(),
379                    em_for_outer.clone(),
380                    producer_id,
381                );
382                // F2 /qa: TornDown synthesizes inner-Complete so the
383                // self-Complete trigger can fire (batched
384                // [Data,Complete]→dead-inner wedge fix).
385                let on_complete_for_dead = on_complete.clone();
386                let inner_sink = build_inner_sink(
387                    em_for_outer.clone(),
388                    producer_binding_for_outer.clone(),
389                    producer_id,
390                    on_complete,
391                    on_error,
392                );
393                // D234: the inner subscribe needs `CoreFull` from a
394                // long-lived sink → `em.defer` (owner-side, in-wave,
395                // FIFO after the cancel above). The returned
396                // `SubscriptionId` is wrapped in a `SubGuard` (its Drop
397                // schedules the eventual unsubscribe). QA P2: the
398                // captured `my_epoch` invalidates this subscribe if a
399                // newer outer DATA superseded it while it was queued.
400                let state_sub = state_for_outer.clone();
401                let em_guard = em_for_outer.clone();
402                let _ = em_for_outer.defer(move |c| {
403                    match c.try_subscribe(inner_node, inner_sink) {
404                        Ok(sub) => {
405                            let guard = SubGuard::new(inner_node, sub, em_guard);
406                            let to_drop = {
407                                let mut s = state_sub.borrow_mut();
408                                if s.terminated || s.epoch != my_epoch {
409                                    Some(guard)
410                                } else {
411                                    s.inner_sub.replace(guard)
412                                }
413                            };
414                            drop(to_drop);
415                        }
416                        Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
417                            // R2.2.7.b: inner dead — synthesize
418                            // inner-Complete (clears inner_sub, checks
419                            // self-Complete trigger).
420                            on_complete_for_dead();
421                        }
422                    }
423                });
424            }
425
426            if plan.self_complete {
427                em_for_outer.complete(producer_id);
428            } else if let Some(h) = plan.self_error {
429                em_for_outer.error(producer_id, h);
430            }
431        });
432
433        ctx.subscribe_to(source, outer_sink);
434    });
435
436    let fn_id = binding.register_producer_build(build);
437    core.register_producer(fn_id)
438        .expect("invariant: register_producer has no deps; no error variants reachable")
439}
440
441fn make_switch_on_complete(
442    state: Rc<RefCell<SwitchState>>,
443    em: ProducerEmitter,
444    producer_id: NodeId,
445) -> Rc<dyn Fn()> {
446    Rc::new(move || {
447        let prev_inner;
448        let mut should_complete = false;
449        {
450            let mut s = state.borrow_mut();
451            if s.terminated {
452                return;
453            }
454            prev_inner = s.inner_sub.take();
455            if s.source_done && !s.terminated {
456                s.terminated = true;
457                should_complete = true;
458            }
459        }
460        drop(prev_inner); // SubGuard::Drop → deferred unsubscribe.
461        if should_complete {
462            em.complete(producer_id);
463        }
464    })
465}
466
467fn make_switch_on_error(
468    state: Rc<RefCell<SwitchState>>,
469    em: ProducerEmitter,
470    producer_id: NodeId,
471) -> Rc<dyn Fn(HandleId)> {
472    Rc::new(move |h| {
473        let prev_inner;
474        {
475            let mut s = state.borrow_mut();
476            if s.terminated {
477                return;
478            }
479            s.terminated = true;
480            prev_inner = s.inner_sub.take();
481        }
482        drop(prev_inner);
483        em.error(producer_id, h);
484    })
485}
486
487// =====================================================================
488// exhaust_map — ignore outer DATA while inner is active
489// =====================================================================
490
491struct ExhaustState {
492    /// Active inner sub. S2b: `Option<SubGuard>` — drop posts the
493    /// deferred unsubscribe (D234).
494    inner_sub: Option<SubGuard>,
495    /// QA P2 (2026-05-18): exhaust is *older-wins* — while a projected
496    /// inner is pending OR active, new outer DATA is dropped. Under
497    /// D234 the subscribe is a queued `em.defer`, so `inner_sub` is
498    /// `None` between accept and install; without this flag a 2nd
499    /// outer DATA in that window would also pass `inner_sub.is_none()`
500    /// and project a 2nd inner (exhaust contract violation). Set true
501    /// when an outer DATA is accepted + its subscribe queued; cleared
502    /// when the inner completes/errors or its subscribe finds the
503    /// source dead.
504    pending: bool,
505    source_done: bool,
506    terminated: bool,
507}
508
509impl ExhaustState {
510    fn new() -> Self {
511        Self {
512            inner_sub: None,
513            pending: false,
514            source_done: false,
515            terminated: false,
516        }
517    }
518}
519
520/// `exhaust_map(source, project)` — like [`switch_map`] but DROPS new
521/// outer DATA while an inner subscription is active. First outer DATA
522/// per "active window" wins; subsequent DATAs are discarded until the
523/// inner completes.
524#[must_use]
525pub fn exhaust_map(
526    core: &Core,
527    binding: &Arc<dyn HigherOrderBinding>,
528    source: NodeId,
529    project: ProjectFn,
530) -> NodeId {
531    let project_fn_id = binding.register_project(project);
532    // S2b/D223: binding weaks stay (registry-cycle break); Core weak
533    // gone — sinks reach Core via `em`/`em.defer`.
534    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
535    let producer_binding_weak: Weak<dyn ProducerBinding> =
536        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
537
538    let build = Box::new(move |ctx: ProducerCtx<'_>| {
539        let producer_id = ctx.node_id();
540        let (Some(binding_clone), Some(producer_binding)) =
541            (binding_weak.upgrade(), producer_binding_weak.upgrade())
542        else {
543            return;
544        };
545        let em = ctx.emitter();
546        let state: Rc<RefCell<ExhaustState>> = Rc::new(RefCell::new(ExhaustState::new()));
547
548        let state_for_outer = state.clone();
549        let em_for_outer = em.clone();
550        let binding_for_outer = binding_clone.clone();
551        let producer_binding_for_outer = producer_binding.clone();
552
553        let outer_sink: Sink = Rc::new(move |msgs| {
554            #[derive(Default)]
555            struct Plan {
556                first_outer_h: Option<HandleId>,
557                first_retained: bool,
558                self_complete: bool,
559                self_error: Option<HandleId>,
560            }
561            let mut plan = Plan::default();
562            {
563                let mut s = state_for_outer.borrow_mut();
564                if s.terminated {
565                    return;
566                }
567                // Tier-based dispatch (canonical §4.2; see
568                // `feedback_use_tier_for_signal_routing.md`).
569                for m in msgs {
570                    match m.tier() {
571                        3 => {
572                            if let Some(h) = m.payload_handle() {
573                                // First DATA per active window wins.
574                                // Remember the first one we accept; subsequent
575                                // batch entries (or DATAs after) drop.
576                                // QA P2: also gate on `!s.pending` — a
577                                // prior accepted DATA's subscribe may be
578                                // queued (inner_sub still None); exhaust
579                                // must drop this one (older-wins).
580                                if s.inner_sub.is_none()
581                                    && !s.pending
582                                    && plan.first_outer_h.is_none()
583                                {
584                                    binding_for_outer.retain_handle(h);
585                                    plan.first_outer_h = Some(h);
586                                    plan.first_retained = true;
587                                    s.pending = true;
588                                }
589                            }
590                            // else: Resolved on outer source — no action.
591                        }
592                        5 => {
593                            if let Some(h) = m.payload_handle() {
594                                // Error
595                                if !s.terminated {
596                                    s.terminated = true;
597                                    // Release any retain we took for
598                                    // first_outer_h — we won't be projecting it.
599                                    if plan.first_retained {
600                                        if let Some(h0) = plan.first_outer_h.take() {
601                                            binding_for_outer.release_handle(h0);
602                                            plan.first_retained = false;
603                                        }
604                                    }
605                                    binding_for_outer.retain_handle(h);
606                                    plan.self_error = Some(h);
607                                }
608                            } else {
609                                // Complete
610                                s.source_done = true;
611                                if s.inner_sub.is_none()
612                                    && plan.first_outer_h.is_none()
613                                    && !s.terminated
614                                {
615                                    s.terminated = true;
616                                    plan.self_complete = true;
617                                }
618                            }
619                        }
620                        _ => {} // Tiers 0/1/2/4/6 — no action.
621                    }
622                }
623            }
624
625            if plan.first_retained {
626                let outer_h = plan
627                    .first_outer_h
628                    .expect("first_retained implies first_outer_h is Some");
629                let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
630                binding_for_outer.release_handle(outer_h);
631
632                let on_complete = make_exhaust_on_complete(
633                    state_for_outer.clone(),
634                    em_for_outer.clone(),
635                    producer_id,
636                );
637                let on_error = make_exhaust_on_error(
638                    state_for_outer.clone(),
639                    em_for_outer.clone(),
640                    producer_id,
641                );
642                // F2 /qa: clone for TornDown synthesis (mirrors switch_map).
643                let on_complete_for_dead = on_complete.clone();
644                let inner_sink = build_inner_sink(
645                    em_for_outer.clone(),
646                    producer_binding_for_outer.clone(),
647                    producer_id,
648                    on_complete,
649                    on_error,
650                );
651                // D234: inner subscribe via `em.defer` (long-lived sink
652                // can't hold `&Core`). TornDown → synthesize
653                // inner-Complete so `inner_sub` clears and the next
654                // outer DATA can re-project (F2 /qa).
655                let state_sub = state_for_outer.clone();
656                let em_guard = em_for_outer.clone();
657                let _ =
658                    em_for_outer.defer(move |c| match c.try_subscribe(inner_node, inner_sink) {
659                        Ok(sub) => {
660                            let guard = SubGuard::new(inner_node, sub, em_guard);
661                            let to_drop = {
662                                let mut s = state_sub.borrow_mut();
663                                if s.terminated {
664                                    Some(guard)
665                                } else {
666                                    s.inner_sub.replace(guard)
667                                }
668                            };
669                            drop(to_drop);
670                        }
671                        Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
672                            on_complete_for_dead();
673                        }
674                    });
675            }
676
677            if plan.self_complete {
678                em_for_outer.complete(producer_id);
679            } else if let Some(h) = plan.self_error {
680                em_for_outer.error(producer_id, h);
681            }
682        });
683
684        ctx.subscribe_to(source, outer_sink);
685    });
686
687    let fn_id = binding.register_producer_build(build);
688    core.register_producer(fn_id)
689        .expect("invariant: register_producer has no deps; no error variants reachable")
690}
691
692fn make_exhaust_on_complete(
693    state: Rc<RefCell<ExhaustState>>,
694    em: ProducerEmitter,
695    producer_id: NodeId,
696) -> Rc<dyn Fn()> {
697    Rc::new(move || {
698        let prev_inner;
699        let mut should_complete = false;
700        {
701            let mut s = state.borrow_mut();
702            if s.terminated {
703                return;
704            }
705            prev_inner = s.inner_sub.take();
706            // QA P2: inner finished (or was dead) — exhaust window
707            // re-opens; the next outer DATA may project again.
708            s.pending = false;
709            if s.source_done && !s.terminated {
710                s.terminated = true;
711                should_complete = true;
712            }
713        }
714        drop(prev_inner);
715        if should_complete {
716            em.complete(producer_id);
717        }
718    })
719}
720
721fn make_exhaust_on_error(
722    state: Rc<RefCell<ExhaustState>>,
723    em: ProducerEmitter,
724    producer_id: NodeId,
725) -> Rc<dyn Fn(HandleId)> {
726    Rc::new(move |h| {
727        let prev_inner;
728        {
729            let mut s = state.borrow_mut();
730            if s.terminated {
731                return;
732            }
733            s.terminated = true;
734            prev_inner = s.inner_sub.take();
735        }
736        drop(prev_inner);
737        em.error(producer_id, h);
738    })
739}
740
741// =====================================================================
742// merge_map — parallel inners up to `concurrency` cap
743// concat_map — wrapper for concurrency = Some(1)
744// =====================================================================
745
thread_local! {
    /// Per-thread guard preventing recursive drain of `MergeMapState`.
    /// When an `on_complete` fires synchronously inside a
    /// `Core::subscribe` handshake (pre-completed inner), it must not
    /// re-enter the drain loop — instead it just decrements + removes
    /// its sub and returns. The outermost drain owns the loop and
    /// observes the freed-up cap on its next iteration.
    ///
    /// Scope note: this is a single `bool` per thread, not one per
    /// `MergeMapState`. While ANY merge_map / concat_map instance on this
    /// thread is inside its drain loop, a drain request for a *different*
    /// instance also returns early. NOTE(review): confirm no cross-instance
    /// nesting is reachable (e.g. a project fn that synchronously drives
    /// another merge_map's outer source), or that instance's buffer would
    /// wait for its next event.
    static MERGE_DRAIN_ACTIVE: Cell<bool> = const { Cell::new(false) };
}
755
756struct MergeMapState {
757    /// Number of currently-active inner subscriptions (spawned but
758    /// not yet completed/errored).
759    active: u32,
760    /// Outer DATAs waiting because `active >= concurrency`. Each
761    /// handle has one retain share (taken on enqueue, released on
762    /// dequeue + project).
763    buffer: VecDeque<HandleId>,
764    /// Per-inner `Subscription`s, keyed by `next_inner_id`. Each
765    /// inner's `on_complete` removes its entry by id (lock-released
766    /// drop).
767    inner_subs: AHashMap<u64, SubGuard>,
768    /// Pending inner ids (between `subscribe` call and
769    /// `inner_subs.insert`). Used to detect synchronous-completion:
770    /// if `on_complete` runs during `subscribe`, it removes from
771    /// `pending_inner_ids`; the post-subscribe code checks the set
772    /// and skips inserting the now-dead sub.
773    pending_inner_ids: ahash::AHashSet<u64>,
774    next_inner_id: u64,
775    source_done: bool,
776    terminated: bool,
777}
778
779impl MergeMapState {
780    fn new() -> Self {
781        Self {
782            active: 0,
783            buffer: VecDeque::new(),
784            inner_subs: AHashMap::new(),
785            pending_inner_ids: ahash::AHashSet::new(),
786            next_inner_id: 0,
787            source_done: false,
788            terminated: false,
789        }
790    }
791}
792
793/// `merge_map(source, project)` — unbounded concurrency variant.
794/// Equivalent to [`merge_map_with_concurrency`] with `None`.
795#[must_use]
796pub fn merge_map(
797    core: &Core,
798    binding: &Arc<dyn HigherOrderBinding>,
799    source: NodeId,
800    project: ProjectFn,
801) -> NodeId {
802    merge_map_with_concurrency(core, binding, source, project, None)
803}
804
805/// `concat_map(source, project)` — sequential queue variant.
806/// Equivalent to [`merge_map_with_concurrency`] with `Some(1)`. Each
807/// outer DATA is enqueued and processed one-at-a-time.
808#[must_use]
809pub fn concat_map(
810    core: &Core,
811    binding: &Arc<dyn HigherOrderBinding>,
812    source: NodeId,
813    project: ProjectFn,
814) -> NodeId {
815    merge_map_with_concurrency(core, binding, source, project, Some(1))
816}
817
818/// `merge_map_with_concurrency(source, project, concurrency)` — projects
819/// each outer DATA to an inner Node and subscribes in parallel.
820///
821/// `concurrency`:
822/// - `None` → unbounded (every outer DATA spawns immediately).
823/// - `Some(n)` → at most `n` concurrent inners; excess outer DATAs
824///   buffer until an active inner completes.
825///
826/// Per D043 / D040, this matches the
827/// [`Core::set_pause_buffer_cap`](graphrefly_core::Core::set_pause_buffer_cap)
828/// `Option<usize>` precedent (None = unbounded). `Some(0)` is degenerate
829/// (would buffer everything indefinitely without ever spawning) but
830/// accepted at the type level.
831#[must_use]
832pub fn merge_map_with_concurrency(
833    core: &Core,
834    binding: &Arc<dyn HigherOrderBinding>,
835    source: NodeId,
836    project: ProjectFn,
837    concurrency: Option<u32>,
838) -> NodeId {
839    let project_fn_id = binding.register_project(project);
840    // S2b/D223: binding weaks stay (registry-cycle break); Core weak
841    // gone — sinks reach Core via `em`/`em.defer`.
842    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
843    let producer_binding_weak: Weak<dyn ProducerBinding> =
844        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
845
846    let build = Box::new(move |ctx: ProducerCtx<'_>| {
847        let producer_id = ctx.node_id();
848        let (Some(binding_clone), Some(producer_binding)) =
849            (binding_weak.upgrade(), producer_binding_weak.upgrade())
850        else {
851            return;
852        };
853        let em = ctx.emitter();
854        let state: Rc<RefCell<MergeMapState>> = Rc::new(RefCell::new(MergeMapState::new()));
855
856        let state_for_outer = state.clone();
857        let em_for_outer = em.clone();
858        let binding_for_outer = binding_clone.clone();
859        let producer_binding_for_outer = producer_binding.clone();
860
861        let outer_sink: Sink = Rc::new(move |msgs| {
862            // Phase 1: enqueue DATAs into the buffer (always — drain
863            // loop dequeues + spawns up to cap), classify terminal
864            // signals.
865            let mut error_action: Option<HandleId> = None;
866            let mut self_complete_now = false;
867            {
868                let mut s = state_for_outer.borrow_mut();
869                if s.terminated {
870                    return;
871                }
872                // Tier-based dispatch (canonical §4.2; see
873                // `feedback_use_tier_for_signal_routing.md`).
874                for m in msgs {
875                    match m.tier() {
876                        3 => {
877                            if let Some(h) = m.payload_handle() {
878                                // Retain on enqueue — released by drain
879                                // after invoke_project.
880                                binding_for_outer.retain_handle(h);
881                                s.buffer.push_back(h);
882                            }
883                            // else: Resolved on outer source — no action.
884                        }
885                        5 => {
886                            if let Some(h) = m.payload_handle() {
887                                // Error
888                                if !s.terminated {
889                                    s.terminated = true;
890                                    binding_for_outer.retain_handle(h);
891                                    while let Some(q) = s.buffer.pop_front() {
892                                        binding_for_outer.release_handle(q);
893                                    }
894                                    error_action = Some(h);
895                                }
896                            } else {
897                                // Complete
898                                s.source_done = true;
899                                if s.active == 0 && s.buffer.is_empty() && !s.terminated {
900                                    s.terminated = true;
901                                    self_complete_now = true;
902                                }
903                            }
904                        }
905                        _ => {} // Tiers 0/1/2/4/6 — no action.
906                    }
907                }
908            }
909
910            if let Some(h) = error_action {
911                em_for_outer.error(producer_id, h);
912                return;
913            }
914            if self_complete_now {
915                em_for_outer.complete(producer_id);
916                return;
917            }
918
919            // Phase 2: drain buffer iteratively up to concurrency cap.
920            drain_merge_buffer(
921                &state_for_outer,
922                &em_for_outer,
923                &binding_for_outer,
924                &producer_binding_for_outer,
925                producer_id,
926                project_fn_id,
927                concurrency,
928            );
929        });
930
931        ctx.subscribe_to(source, outer_sink);
932    });
933
934    let fn_id = binding.register_producer_build(build);
935    core.register_producer(fn_id)
936        .expect("invariant: register_producer has no deps; no error variants reachable")
937}
938
939/// Iteratively pop from `buffer` and spawn inners until cap is reached
940/// or buffer is empty. Re-entrance from a nested `on_complete` is
941/// short-circuited via [`MERGE_DRAIN_ACTIVE`]; the outermost call owns
942/// the drain loop and picks up cap-frees on subsequent iterations.
943// S2b/D234: `core: &Core` → `em: &ProducerEmitter`. The per-inner
944// subscribe routes through `em.defer` (`CoreFull`); the returned
945// `SubscriptionId` is wrapped in a `SubGuard` keyed in `inner_subs`
946// (its Drop schedules the unsubscribe).
947fn drain_merge_buffer(
948    state: &Rc<RefCell<MergeMapState>>,
949    em: &ProducerEmitter,
950    binding: &Arc<dyn HigherOrderBinding>,
951    producer_binding: &Arc<dyn ProducerBinding>,
952    producer_id: NodeId,
953    project_fn_id: FnId,
954    concurrency: Option<u32>,
955) {
956    if MERGE_DRAIN_ACTIVE.with(|f| f.replace(true)) {
957        // Already draining on this thread; outer loop will drain
958        // remaining buffer.
959        return;
960    }
961
962    loop {
963        let h_and_id;
964        let mut should_self_complete = false;
965        {
966            let mut s = state.borrow_mut();
967            if s.terminated {
968                MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
969                return;
970            }
971            let allowed = match concurrency {
972                None => true,
973                Some(n) => s.active < n,
974            };
975            if !allowed {
976                MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
977                return;
978            }
979            if let Some(h) = s.buffer.pop_front() {
980                s.active += 1;
981                let id = s.next_inner_id;
982                s.next_inner_id += 1;
983                s.pending_inner_ids.insert(id);
984                h_and_id = Some((h, id));
985            } else if s.source_done && s.active == 0 && !s.terminated {
986                s.terminated = true;
987                should_self_complete = true;
988                h_and_id = None;
989            } else {
990                h_and_id = None;
991            }
992        }
993
994        if should_self_complete {
995            MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
996            em.complete(producer_id);
997            return;
998        }
999
1000        let Some((outer_h, inner_id)) = h_and_id else {
1001            MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
1002            return;
1003        };
1004
1005        // Spawn lock-released.
1006        let inner_node = binding.invoke_project(project_fn_id, outer_h);
1007        binding.release_handle(outer_h);
1008
1009        let on_complete = make_merge_on_complete(
1010            state.clone(),
1011            em.clone(),
1012            binding.clone(),
1013            producer_binding.clone(),
1014            producer_id,
1015            project_fn_id,
1016            inner_id,
1017            concurrency,
1018        );
1019        let on_error = make_merge_on_error(state.clone(), em.clone(), binding.clone(), producer_id);
1020        // F2 /qa: clone on_complete so the TornDown branch can
1021        // synthesize inner-Complete (closes the merge_map `s.active`
1022        // leak that left the producer never self-completing when a
1023        // projected inner was dead).
1024        let on_complete_for_dead = on_complete.clone();
1025        let inner_sink = build_inner_sink(
1026            em.clone(),
1027            producer_binding.clone(),
1028            producer_id,
1029            on_complete,
1030            on_error,
1031        );
1032        // D234: inner subscribe via `em.defer` (long-lived drain can't
1033        // hold `&Core`). The sync-completion detection still works: if
1034        // the inner pre-completes during `try_subscribe`'s handshake,
1035        // `on_complete` fires INSIDE this defer (owner-side) and removes
1036        // `inner_id` from `pending_inner_ids` before the post-subscribe
1037        // check below — so a now-dead sub is dropped, not installed.
1038        let state_sub = state.clone();
1039        let em_guard = em.clone();
1040        let _ = em.defer(move |c| {
1041            match c.try_subscribe(inner_node, inner_sink) {
1042                Ok(sub) => {
1043                    let guard = SubGuard::new(inner_node, sub, em_guard);
1044                    let to_drop = {
1045                        let mut s = state_sub.borrow_mut();
1046                        if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
1047                            Some(guard)
1048                        } else {
1049                            s.inner_subs.insert(inner_id, guard);
1050                            None
1051                        }
1052                    };
1053                    drop(to_drop);
1054                }
1055                Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
1056                    // R2.2.7.b / F2 /qa: synthesize inner-Complete so
1057                    // `make_merge_on_complete` decrements `s.active`,
1058                    // clears pending, and checks the self-Complete
1059                    // trigger (Dead-inner `s.active` leak fix).
1060                    on_complete_for_dead();
1061                }
1062            }
1063        });
1064
1065        // Loop continues — pops next from buffer or returns.
1066    }
1067}
1068
1069fn make_merge_on_complete(
1070    state: Rc<RefCell<MergeMapState>>,
1071    em: ProducerEmitter,
1072    binding: Arc<dyn HigherOrderBinding>,
1073    producer_binding: Arc<dyn ProducerBinding>,
1074    producer_id: NodeId,
1075    project_fn_id: FnId,
1076    this_inner_id: u64,
1077    concurrency: Option<u32>,
1078) -> Rc<dyn Fn()> {
1079    Rc::new(move || {
1080        let removed_sub;
1081        {
1082            let mut s = state.borrow_mut();
1083            if s.terminated {
1084                return;
1085            }
1086            s.active -= 1;
1087            // Two cases:
1088            // (a) Sync-completion during subscribe: `pending_inner_ids`
1089            //     still contains us; `inner_subs` does not. Remove from
1090            //     pending so the post-subscribe insert sees we're done
1091            //     and skips installing the dead sub.
1092            // (b) Async completion: `inner_subs` contains us. Remove
1093            //     and drop lock-released.
1094            s.pending_inner_ids.remove(&this_inner_id);
1095            removed_sub = s.inner_subs.remove(&this_inner_id);
1096        }
1097        drop(removed_sub); // SubGuard::Drop → deferred unsubscribe.
1098
1099        // Try to drain — if we're nested inside an outer drain loop
1100        // (sync-completion path), this is a no-op and the outer drain
1101        // continues.
1102        drain_merge_buffer(
1103            &state,
1104            &em,
1105            &binding,
1106            &producer_binding,
1107            producer_id,
1108            project_fn_id,
1109            concurrency,
1110        );
1111    })
1112}
1113
1114/// Inner-error path for merge_map: terminates the producer + drains
1115/// all inner subs (lock-released) + releases buffered DATA handles.
1116/// Captures `binding` so we can release buffered handles' retains
1117/// (taken on enqueue in the outer_sink's Data branch); without this,
1118/// inner-error before all buffered DATAs project would leak refcount
1119/// shares.
1120fn make_merge_on_error(
1121    state: Rc<RefCell<MergeMapState>>,
1122    em: ProducerEmitter,
1123    binding: Arc<dyn HigherOrderBinding>,
1124    producer_id: NodeId,
1125) -> Rc<dyn Fn(HandleId)> {
1126    Rc::new(move |h| {
1127        let removed_subs;
1128        let buffered_to_release;
1129        {
1130            let mut s = state.borrow_mut();
1131            if s.terminated {
1132                return;
1133            }
1134            s.terminated = true;
1135            removed_subs = s.inner_subs.drain().map(|(_, sub)| sub).collect::<Vec<_>>();
1136            s.pending_inner_ids.clear();
1137            buffered_to_release = s.buffer.drain(..).collect::<Vec<_>>();
1138        }
1139        drop(removed_subs); // each SubGuard::Drop → deferred unsubscribe.
1140        for h_b in buffered_to_release {
1141            binding.release_handle(h_b);
1142        }
1143        em.error(producer_id, h);
1144    })
1145}
1146
1147// =====================================================================
1148// Compile-time asserts (Slice E /qa; D248/D249-amended)
1149// =====================================================================
1150//
1151// D248/D249/S2c: `SwitchState`/`ExhaustState`/`MergeMapState` embed a
1152// `ProducerEmitter`, which now carries the owner-only `Rc<DeferQueue>`
1153// (the `!Send` `Defer` split off `CoreMailbox`). Under full
1154// single-owner these op-states are owner-thread-only and intentionally
1155// `!Send` — the prior `Send + Sync` assertions were shared-Core-era
1156// legacy and are deleted. `ProjectFn` is a pure projector (captures no
1157// `!Send`), so it stays `Send + Sync`.
1158
1159const _: fn() = || {
1160    fn assert_send_sync<T: Send + Sync>() {}
1161    assert_send_sync::<ProjectFn>();
1162};