Skip to main content

graphrefly_operators/
higher_order.rs

1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Higher-order operators (Slice E, D044) — operators whose project fn
11//! returns an inner [`NodeId`] for each outer DATA. Mirrors TS legacy
12//! `extra/operators/higher-order.ts` (`switchMap` / `exhaustMap` /
13//! `concatMap` / `mergeMap`).
14//!
15//! All four are producer-pattern nodes (no declared deps; subscribe to
16//! the outer source from inside the build closure; emit on themselves
17//! via [`Core::emit`]). This mirrors the [`super::ops_impl`] family
18//! (zip / concat / race / takeUntil); the producer substrate handles
19//! auto-cleanup of upstream + inner subscriptions on producer
20//! deactivation (D031–D038).
21//!
22//! The four flavors differ in how they handle a new outer DATA while a
23//! prior inner is still active:
24//!
25//! - [`switch_map`] — cancel the prior inner (Rx-style `switchMap`).
26//! - [`exhaust_map`] — drop the new value (Rx-style `exhaustMap`).
27//! - [`concat_map`] — enqueue; process sequentially. (Equivalent to
28//!   [`merge_map_with_concurrency`] with `Some(1)`.)
29//! - [`merge_map`] / [`merge_map_with_concurrency`] — spawn in parallel
30//!   up to `concurrency`. `None` = unbounded.
31//!
32//! # Inner-sub tracking (Slice E /qa refactor)
33//!
//! Each operator owns its inner subscriptions — as [`SubGuard`]s, whose
//! `Drop` posts the deferred unsubscribe — **inside its state `Mutex`**
//! (not in [`super::producer::ProducerNodeState::subs`]).
//! `producer_storage[producer_id].subs` holds only the OUTER source
//! subscription (one entry, no positional concerns). switch_map /
//! exhaust_map keep `Option<SubGuard>` (single active inner); merge
//! / concat keep a map of inner subscriptions keyed by per-op
//! `next_inner_id`. This avoids two bugs the original positional design
41//! exposed: (a) cached-outer source firing handshake before
42//! `subscribe_to` pushed the outer sub, reordering `subs[0]`; (b)
43//! merge/concat completed-inner subs accumulating in `subs` indefinitely.
44//! Inner sub cleanup is per-op now: switch/exhaust take + drop on inner
45//! Complete; merge/concat remove specific id on inner Complete.
46//!
47//! # Drain discipline (iterative spawn)
48//!
49//! `merge_map` could spawn the next buffered DATA from inside an
50//! inner's `on_complete` callback. For pathological pre-completed
51//! inners (synchronous Complete during the subscribe handshake),
52//! recursive spawn would grow the stack proportionally to the buffer
53//! depth. The thread-local [`MERGE_DRAIN_ACTIVE`] flag breaks the
54//! recursion: the outermost drain owns the loop; nested `on_complete`
55//! invocations only decrement state and return.
56//!
57//! # Project closure (D044)
58//!
59//! Each operator takes a `project: Fn(HandleId) -> NodeId` closure
60//! registered through [`HigherOrderBinding::register_project`]. Bindings
61//! (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
62//! WASM callbacks into this Rust shape; Rust-side users register a
63//! closure directly.
64
65#![allow(clippy::collapsible_if, clippy::collapsible_match)]
66#![allow(clippy::too_many_arguments, clippy::too_many_lines)]
67
68use std::cell::Cell;
69use std::collections::VecDeque;
70use std::sync::{Arc, Mutex, Weak};
71
72use ahash::AHashMap;
73use graphrefly_core::{Core, FnId, HandleId, Message, NodeId, Sink};
74use smallvec::SmallVec;
75
76use super::producer::{ProducerBinding, ProducerCtx, ProducerEmitter, SubGuard};
77
78// =====================================================================
79// HigherOrderBinding — closure registration for project: T -> Node<R>
80// =====================================================================
81
82/// Project closure: takes an outer DATA handle, returns the
83/// [`NodeId`] of an inner node to subscribe to. Closure may register
84/// new state/derived nodes on the fly via captured [`Core`].
85pub type ProjectFn = Box<dyn Fn(HandleId) -> NodeId + Send + Sync>;
86
87/// Closure-registration interface for higher-order operators.
88///
89/// Extends [`ProducerBinding`] with one method that bindings shipping
90/// higher-order operators must implement.
91pub trait HigherOrderBinding: ProducerBinding {
92    /// Register a project closure. The returned [`FnId`] is captured by
93    /// the operator's build closure and looked up via
94    /// [`Self::invoke_project`] on each outer DATA fire.
95    fn register_project(&self, project: ProjectFn) -> FnId;
96
97    /// Invoke a registered project closure with the given outer DATA
98    /// handle. Returns the inner node's [`NodeId`].
99    ///
100    /// # Panics
101    ///
102    /// Implementations panic if `fn_id` is not a registered project
103    /// closure.
104    fn invoke_project(&self, fn_id: FnId, value: HandleId) -> NodeId;
105}
106
107// =====================================================================
108// build_inner_sink — shared inner-subscription handler
109// =====================================================================
110//
111// Each higher-order op subscribes to the inner node returned by its
112// project closure. The inner sink dispatches by `Message::tier()`
113// (R1.3.7.b — central message-tier utility, never hardcode variant
114// checks for forwarding gating per CLAUDE.md design invariant 4):
115//
116// - Tier 0 (Start) — drop.
117// - Tier 1 (Dirty) — drop. (DIRTY forwarding from inner is an
118//   acknowledged divergence from TS legacy; Rust producer relies on
119//   Core's wave engine to generate DIRTY on the producer's own emits.)
120// - Tier 2 (Pause/Resume) — drop. Backpressure is per-stream; not
121//   propagated through higher-order ops by design.
122// - Tier 3 (Data/Resolved) — `Data(h)` forwards via `Core::emit(producer_id, h)`;
123//   `Resolved` drops (same wave-engine rationale as Dirty).
124// - Tier 4 (Invalidate) — forwards via `Core::invalidate(producer_id)`
125//   per R1.2.7. Inner cache invalidation surfaces to producer
126//   subscribers.
127// - Tier 5 (Complete/Error) — semantic dispatch via op-specific
128//   callbacks (`on_inner_complete` / `on_inner_error`). Differs
129//   per-op (switch_map clears active inner; merge_map decrements
130//   active count; etc.).
131// - Tier 6 (Teardown) — forwards via `Core::teardown(producer_id)`
132//   per R2.6.4. Inner permanent destruction propagates.
133
134// S2b/D230/D234: long-lived inner sink takes `em: ProducerEmitter`
135// (was `core: Core`, no longer `Clone`). Emit is the mailbox fast path;
136// the rare INVALIDATE/TEARDOWN terminal forwards route through
137// `em.defer` (`CoreFull`).
138fn build_inner_sink(
139    em: ProducerEmitter,
140    producer_binding: Arc<dyn ProducerBinding>,
141    producer_id: NodeId,
142    on_inner_complete: Arc<dyn Fn()>,
143    on_inner_error: Arc<dyn Fn(HandleId)>,
144) -> Sink {
145    Arc::new(move |msgs: &[Message]| {
146        enum Action {
147            Emit(HandleId),
148            Complete,
149            Error(HandleId),
150            Invalidate,
151            Teardown,
152        }
153        let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
154        for m in msgs {
155            match m.tier() {
156                3 => {
157                    // Tier 3: Data/Resolved. Only Data carries a payload
158                    // to forward; Resolved is dropped per the
159                    // wave-engine divergence above.
160                    if let Some(h) = m.payload_handle() {
161                        producer_binding.retain_handle(h);
162                        actions.push(Action::Emit(h));
163                    }
164                }
165                4 => {
166                    // Tier 4: Invalidate. Forward to producer.
167                    actions.push(Action::Invalidate);
168                }
169                5 => {
170                    // Tier 5: Complete or Error. Semantic dispatch via
171                    // op-specific callbacks. `payload_handle()` is the
172                    // canonical Error-vs-Complete discriminator at this
173                    // tier.
174                    if let Some(h) = m.payload_handle() {
175                        producer_binding.retain_handle(h);
176                        actions.push(Action::Error(h));
177                    } else {
178                        actions.push(Action::Complete);
179                    }
180                }
181                6 => {
182                    // Tier 6: Teardown. Forward to producer.
183                    actions.push(Action::Teardown);
184                }
185                // Tiers 0 (Start), 1 (Dirty), 2 (Pause/Resume): drop.
186                _ => {}
187            }
188        }
189        for action in actions {
190            match action {
191                Action::Emit(h) => em.emit_or_defer(producer_id, h),
192                Action::Complete => on_inner_complete(),
193                Action::Error(h) => on_inner_error(h),
194                Action::Invalidate => {
195                    let _ = em.defer(move |c| c.invalidate(producer_id));
196                }
197                Action::Teardown => {
198                    let _ = em.defer(move |c| c.teardown(producer_id));
199                }
200            }
201        }
202    })
203}
204
205// =====================================================================
206// switch_map — cancel previous inner on each new outer DATA
207// =====================================================================
208
209struct SwitchState {
210    /// Currently-active inner subscription (if any). Cancelling the
211    /// prior inner is `Option::take` + drop — S2b: `SubGuard::Drop`
212    /// posts the `unsubscribe` via `em.defer` (owner-side, in-wave,
213    /// FIFO-ordered so cancel drains before any resubscribe — D234).
214    inner_sub: Option<SubGuard>,
215    /// QA P2 (2026-05-18): monotonic switch epoch. Under D234 the
216    /// cancel-prev is `inner_sub.take()` at outer-fire, but the prior
217    /// subscribe is a still-queued `em.defer` that hasn't populated
218    /// `inner_sub` yet — so a fast double-switch within one wave would
219    /// `take()` an empty slot and leave BOTH inners live. Each accepted
220    /// outer DATA bumps `epoch`; the subscribe-defer captures the value
221    /// and, after `try_subscribe`, drops the new `SubGuard` immediately
222    /// (deferred unsubscribe) instead of installing it if `epoch` has
223    /// moved on — i.e. a newer outer DATA superseded it.
224    epoch: u64,
225    source_done: bool,
226    terminated: bool,
227}
228
229impl SwitchState {
230    fn new() -> Self {
231        Self {
232            inner_sub: None,
233            epoch: 0,
234            source_done: false,
235            terminated: false,
236        }
237    }
238}
239
240/// `switch_map(source, project)` — for each outer DATA, cancel the
241/// previous inner subscription and subscribe to the inner node returned
242/// by `project(value)`. Inner DATA flows through to downstream; inner
243/// COMPLETE clears the active slot; outer COMPLETE (with no active
244/// inner) self-completes the operator.
245#[must_use]
246pub fn switch_map(
247    core: &Core,
248    binding: &Arc<dyn HigherOrderBinding>,
249    source: NodeId,
250    project: ProjectFn,
251) -> NodeId {
252    let project_fn_id = binding.register_project(project);
253    // S2b/D223: the Core weak is GONE (Core relocates; long-lived sinks
254    // reach it via `ProducerEmitter`/`em.defer`). The *binding* weaks
255    // stay — they break the registry→build-closure→strong-binding cycle
256    // (unrelated to Core ownership; D223 only forbids `Weak<C>`).
257    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
258    let producer_binding_weak: Weak<dyn ProducerBinding> =
259        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
260
261    let build = Box::new(move |ctx: ProducerCtx<'_>| {
262        let producer_id = ctx.node_id();
263        let (Some(binding_clone), Some(producer_binding)) =
264            (binding_weak.upgrade(), producer_binding_weak.upgrade())
265        else {
266            return;
267        };
268        let em = ctx.emitter();
269        let state: Arc<Mutex<SwitchState>> = Arc::new(Mutex::new(SwitchState::new()));
270
271        let state_for_outer = state.clone();
272        let em_for_outer = em.clone();
273        let binding_for_outer = binding_clone.clone();
274        let producer_binding_for_outer = producer_binding.clone();
275
276        let outer_sink: Sink = Arc::new(move |msgs| {
277            // Phase 1: classify under state lock. Track whether we
278            // performed a retain so phase 2 can safely release it
279            // without underflow on `[Data(_), Error(_)]` same-batch
280            // (P1 /qa fix).
281            #[derive(Default)]
282            struct Plan {
283                latest_outer_h: Option<HandleId>,
284                latest_retained: bool,
285                self_complete: bool,
286                self_error: Option<HandleId>,
287            }
288            let mut plan = Plan::default();
289            {
290                let mut s = state_for_outer.lock().unwrap();
291                if s.terminated {
292                    return;
293                }
294                // Tier-based dispatch per `feedback_use_tier_for_signal_routing.md`
295                // and canonical §4.2 ("Always use the provided tier utilities
296                // rather than hardcoding type checks"). Tier 3 carries
297                // DATA/RESOLVED — only DATA has a payload_handle, so the
298                // `payload_handle().is_some()` test discriminates within
299                // the tier without referring to specific variants. Tier 5
300                // carries COMPLETE/ERROR — same shape (Error has handle,
301                // Complete doesn't).
302                for m in msgs {
303                    match m.tier() {
304                        3 => {
305                            if let Some(h) = m.payload_handle() {
306                                // switchMap: only the LATEST outer DATA in the
307                                // batch matters (TS legacy "skip to last in
308                                // the batch to avoid creating + immediately
309                                // discarding N-1 inners"). Track the handle;
310                                // we'll project + subscribe once after the
311                                // lock drops. Skipped handles need no retain.
312                                plan.latest_outer_h = Some(h);
313                            }
314                            // else: Resolved on outer source — no action.
315                        }
316                        5 => {
317                            if let Some(h) = m.payload_handle() {
318                                // Error
319                                if !s.terminated {
320                                    s.terminated = true;
321                                    binding_for_outer.retain_handle(h);
322                                    plan.self_error = Some(h);
323                                }
324                            } else {
325                                // Complete
326                                s.source_done = true;
327                                if s.inner_sub.is_none()
328                                    && plan.latest_outer_h.is_none()
329                                    && !s.terminated
330                                {
331                                    s.terminated = true;
332                                    plan.self_complete = true;
333                                }
334                            }
335                        }
336                        _ => {} // Tiers 0/1/2/4/6 — no action on outer source.
337                    }
338                }
339                if let Some(h) = plan.latest_outer_h {
340                    if !s.terminated {
341                        // Retain ONE share for the chosen handle —
342                        // released by phase 2 after invoke_project.
343                        binding_for_outer.retain_handle(h);
344                        plan.latest_retained = true;
345                    }
346                }
347            }
348
349            // Phase 2: cancel prior inner sub, project, subscribe.
350            // Gated on `latest_retained` so that an `Error` arriving in
351            // the same batch (which sets terminated and skips the
352            // retain) does NOT trigger an unbalanced release.
353            if plan.latest_retained {
354                let outer_h = plan
355                    .latest_outer_h
356                    .expect("latest_retained implies latest_outer_h is Some");
357
358                // Cancel prior inner sub (if any) + bump the switch
359                // epoch (QA P2). `take()` only catches an *already
360                // installed* prior inner; a prior subscribe still queued
361                // as an `em.defer` hasn't populated `inner_sub` yet, so
362                // bumping `epoch` here and re-checking it inside the
363                // subscribe-defer is what actually invalidates a
364                // superseded-but-still-queued prior subscribe.
365                let my_epoch = {
366                    let mut s = state_for_outer.lock().unwrap();
367                    let prev = s.inner_sub.take();
368                    s.epoch += 1;
369                    let e = s.epoch;
370                    drop(s);
371                    drop(prev); // SubGuard::Drop → deferred unsubscribe.
372                    e
373                };
374
375                // invoke_project lock-released (binding call, not Core).
376                let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
377                binding_for_outer.release_handle(outer_h);
378
379                let on_complete = make_switch_on_complete(
380                    state_for_outer.clone(),
381                    em_for_outer.clone(),
382                    producer_id,
383                );
384                let on_error = make_switch_on_error(
385                    state_for_outer.clone(),
386                    em_for_outer.clone(),
387                    producer_id,
388                );
389                // F2 /qa: TornDown synthesizes inner-Complete so the
390                // self-Complete trigger can fire (batched
391                // [Data,Complete]→dead-inner wedge fix).
392                let on_complete_for_dead = on_complete.clone();
393                let inner_sink = build_inner_sink(
394                    em_for_outer.clone(),
395                    producer_binding_for_outer.clone(),
396                    producer_id,
397                    on_complete,
398                    on_error,
399                );
400                // D234: the inner subscribe needs `CoreFull` from a
401                // long-lived sink → `em.defer` (owner-side, in-wave,
402                // FIFO after the cancel above). The returned
403                // `SubscriptionId` is wrapped in a `SubGuard` (its Drop
404                // schedules the eventual unsubscribe). QA P2: the
405                // captured `my_epoch` invalidates this subscribe if a
406                // newer outer DATA superseded it while it was queued.
407                let state_sub = state_for_outer.clone();
408                let em_guard = em_for_outer.clone();
409                let _ = em_for_outer.defer(move |c| {
410                    match c.try_subscribe(inner_node, inner_sink) {
411                        Ok(sub) => {
412                            let guard = SubGuard::new(inner_node, sub, em_guard);
413                            let to_drop = {
414                                let mut s = state_sub.lock().unwrap();
415                                if s.terminated || s.epoch != my_epoch {
416                                    Some(guard)
417                                } else {
418                                    s.inner_sub.replace(guard)
419                                }
420                            };
421                            drop(to_drop);
422                        }
423                        Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
424                            // R2.2.7.b: inner dead — synthesize
425                            // inner-Complete (clears inner_sub, checks
426                            // self-Complete trigger).
427                            on_complete_for_dead();
428                        }
429                        Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
430                            // Already inside the in-wave drain (no
431                            // partitions held the old way) — a violation
432                            // here is the substrate-invariant break the
433                            // old wave-end-drain guard caught.
434                            panic!(
435                                "switch_map inner subscribe: partition-order \
436                                 violation inside em.defer — substrate invariant broken"
437                            );
438                        }
439                    }
440                });
441            }
442
443            if plan.self_complete {
444                em_for_outer.complete_or_defer(producer_id);
445            } else if let Some(h) = plan.self_error {
446                em_for_outer.error_or_defer(producer_id, h);
447            }
448        });
449
450        ctx.subscribe_to(source, outer_sink);
451    });
452
453    let fn_id = binding.register_producer_build(build);
454    core.register_producer(fn_id)
455        .expect("invariant: register_producer has no deps; no error variants reachable")
456}
457
458fn make_switch_on_complete(
459    state: Arc<Mutex<SwitchState>>,
460    em: ProducerEmitter,
461    producer_id: NodeId,
462) -> Arc<dyn Fn()> {
463    Arc::new(move || {
464        let prev_inner;
465        let mut should_complete = false;
466        {
467            let mut s = state.lock().unwrap();
468            if s.terminated {
469                return;
470            }
471            prev_inner = s.inner_sub.take();
472            if s.source_done && !s.terminated {
473                s.terminated = true;
474                should_complete = true;
475            }
476        }
477        drop(prev_inner); // SubGuard::Drop → deferred unsubscribe.
478        if should_complete {
479            em.complete_or_defer(producer_id);
480        }
481    })
482}
483
484fn make_switch_on_error(
485    state: Arc<Mutex<SwitchState>>,
486    em: ProducerEmitter,
487    producer_id: NodeId,
488) -> Arc<dyn Fn(HandleId)> {
489    Arc::new(move |h| {
490        let prev_inner;
491        {
492            let mut s = state.lock().unwrap();
493            if s.terminated {
494                return;
495            }
496            s.terminated = true;
497            prev_inner = s.inner_sub.take();
498        }
499        drop(prev_inner);
500        em.error_or_defer(producer_id, h);
501    })
502}
503
504// =====================================================================
505// exhaust_map — ignore outer DATA while inner is active
506// =====================================================================
507
508struct ExhaustState {
509    /// Active inner sub. S2b: `Option<SubGuard>` — drop posts the
510    /// deferred unsubscribe (D234).
511    inner_sub: Option<SubGuard>,
512    /// QA P2 (2026-05-18): exhaust is *older-wins* — while a projected
513    /// inner is pending OR active, new outer DATA is dropped. Under
514    /// D234 the subscribe is a queued `em.defer`, so `inner_sub` is
515    /// `None` between accept and install; without this flag a 2nd
516    /// outer DATA in that window would also pass `inner_sub.is_none()`
517    /// and project a 2nd inner (exhaust contract violation). Set true
518    /// when an outer DATA is accepted + its subscribe queued; cleared
519    /// when the inner completes/errors or its subscribe finds the
520    /// source dead.
521    pending: bool,
522    source_done: bool,
523    terminated: bool,
524}
525
526impl ExhaustState {
527    fn new() -> Self {
528        Self {
529            inner_sub: None,
530            pending: false,
531            source_done: false,
532            terminated: false,
533        }
534    }
535}
536
537/// `exhaust_map(source, project)` — like [`switch_map`] but DROPS new
538/// outer DATA while an inner subscription is active. First outer DATA
539/// per "active window" wins; subsequent DATAs are discarded until the
540/// inner completes.
541#[must_use]
542pub fn exhaust_map(
543    core: &Core,
544    binding: &Arc<dyn HigherOrderBinding>,
545    source: NodeId,
546    project: ProjectFn,
547) -> NodeId {
548    let project_fn_id = binding.register_project(project);
549    // S2b/D223: binding weaks stay (registry-cycle break); Core weak
550    // gone — sinks reach Core via `em`/`em.defer`.
551    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
552    let producer_binding_weak: Weak<dyn ProducerBinding> =
553        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
554
555    let build = Box::new(move |ctx: ProducerCtx<'_>| {
556        let producer_id = ctx.node_id();
557        let (Some(binding_clone), Some(producer_binding)) =
558            (binding_weak.upgrade(), producer_binding_weak.upgrade())
559        else {
560            return;
561        };
562        let em = ctx.emitter();
563        let state: Arc<Mutex<ExhaustState>> = Arc::new(Mutex::new(ExhaustState::new()));
564
565        let state_for_outer = state.clone();
566        let em_for_outer = em.clone();
567        let binding_for_outer = binding_clone.clone();
568        let producer_binding_for_outer = producer_binding.clone();
569
570        let outer_sink: Sink = Arc::new(move |msgs| {
571            #[derive(Default)]
572            struct Plan {
573                first_outer_h: Option<HandleId>,
574                first_retained: bool,
575                self_complete: bool,
576                self_error: Option<HandleId>,
577            }
578            let mut plan = Plan::default();
579            {
580                let mut s = state_for_outer.lock().unwrap();
581                if s.terminated {
582                    return;
583                }
584                // Tier-based dispatch (canonical §4.2; see
585                // `feedback_use_tier_for_signal_routing.md`).
586                for m in msgs {
587                    match m.tier() {
588                        3 => {
589                            if let Some(h) = m.payload_handle() {
590                                // First DATA per active window wins.
591                                // Remember the first one we accept; subsequent
592                                // batch entries (or DATAs after) drop.
593                                // QA P2: also gate on `!s.pending` — a
594                                // prior accepted DATA's subscribe may be
595                                // queued (inner_sub still None); exhaust
596                                // must drop this one (older-wins).
597                                if s.inner_sub.is_none()
598                                    && !s.pending
599                                    && plan.first_outer_h.is_none()
600                                {
601                                    binding_for_outer.retain_handle(h);
602                                    plan.first_outer_h = Some(h);
603                                    plan.first_retained = true;
604                                    s.pending = true;
605                                }
606                            }
607                            // else: Resolved on outer source — no action.
608                        }
609                        5 => {
610                            if let Some(h) = m.payload_handle() {
611                                // Error
612                                if !s.terminated {
613                                    s.terminated = true;
614                                    // Release any retain we took for
615                                    // first_outer_h — we won't be projecting it.
616                                    if plan.first_retained {
617                                        if let Some(h0) = plan.first_outer_h.take() {
618                                            binding_for_outer.release_handle(h0);
619                                            plan.first_retained = false;
620                                        }
621                                    }
622                                    binding_for_outer.retain_handle(h);
623                                    plan.self_error = Some(h);
624                                }
625                            } else {
626                                // Complete
627                                s.source_done = true;
628                                if s.inner_sub.is_none()
629                                    && plan.first_outer_h.is_none()
630                                    && !s.terminated
631                                {
632                                    s.terminated = true;
633                                    plan.self_complete = true;
634                                }
635                            }
636                        }
637                        _ => {} // Tiers 0/1/2/4/6 — no action.
638                    }
639                }
640            }
641
642            if plan.first_retained {
643                let outer_h = plan
644                    .first_outer_h
645                    .expect("first_retained implies first_outer_h is Some");
646                let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
647                binding_for_outer.release_handle(outer_h);
648
649                let on_complete = make_exhaust_on_complete(
650                    state_for_outer.clone(),
651                    em_for_outer.clone(),
652                    producer_id,
653                );
654                let on_error = make_exhaust_on_error(
655                    state_for_outer.clone(),
656                    em_for_outer.clone(),
657                    producer_id,
658                );
659                // F2 /qa: clone for TornDown synthesis (mirrors switch_map).
660                let on_complete_for_dead = on_complete.clone();
661                let inner_sink = build_inner_sink(
662                    em_for_outer.clone(),
663                    producer_binding_for_outer.clone(),
664                    producer_id,
665                    on_complete,
666                    on_error,
667                );
668                // D234: inner subscribe via `em.defer` (long-lived sink
669                // can't hold `&Core`). TornDown → synthesize
670                // inner-Complete so `inner_sub` clears and the next
671                // outer DATA can re-project (F2 /qa).
672                let state_sub = state_for_outer.clone();
673                let em_guard = em_for_outer.clone();
674                let _ =
675                    em_for_outer.defer(move |c| match c.try_subscribe(inner_node, inner_sink) {
676                        Ok(sub) => {
677                            let guard = SubGuard::new(inner_node, sub, em_guard);
678                            let to_drop = {
679                                let mut s = state_sub.lock().unwrap();
680                                if s.terminated {
681                                    Some(guard)
682                                } else {
683                                    s.inner_sub.replace(guard)
684                                }
685                            };
686                            drop(to_drop);
687                        }
688                        Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
689                            on_complete_for_dead();
690                        }
691                        Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
692                            panic!(
693                                "exhaust_map inner subscribe: partition-order \
694                                 violation inside em.defer — substrate invariant broken"
695                            );
696                        }
697                    });
698            }
699
700            if plan.self_complete {
701                em_for_outer.complete_or_defer(producer_id);
702            } else if let Some(h) = plan.self_error {
703                em_for_outer.error_or_defer(producer_id, h);
704            }
705        });
706
707        ctx.subscribe_to(source, outer_sink);
708    });
709
710    let fn_id = binding.register_producer_build(build);
711    core.register_producer(fn_id)
712        .expect("invariant: register_producer has no deps; no error variants reachable")
713}
714
715fn make_exhaust_on_complete(
716    state: Arc<Mutex<ExhaustState>>,
717    em: ProducerEmitter,
718    producer_id: NodeId,
719) -> Arc<dyn Fn()> {
720    Arc::new(move || {
721        let prev_inner;
722        let mut should_complete = false;
723        {
724            let mut s = state.lock().unwrap();
725            if s.terminated {
726                return;
727            }
728            prev_inner = s.inner_sub.take();
729            // QA P2: inner finished (or was dead) — exhaust window
730            // re-opens; the next outer DATA may project again.
731            s.pending = false;
732            if s.source_done && !s.terminated {
733                s.terminated = true;
734                should_complete = true;
735            }
736        }
737        drop(prev_inner);
738        if should_complete {
739            em.complete_or_defer(producer_id);
740        }
741    })
742}
743
744fn make_exhaust_on_error(
745    state: Arc<Mutex<ExhaustState>>,
746    em: ProducerEmitter,
747    producer_id: NodeId,
748) -> Arc<dyn Fn(HandleId)> {
749    Arc::new(move |h| {
750        let prev_inner;
751        {
752            let mut s = state.lock().unwrap();
753            if s.terminated {
754                return;
755            }
756            s.terminated = true;
757            prev_inner = s.inner_sub.take();
758        }
759        drop(prev_inner);
760        em.error_or_defer(producer_id, h);
761    })
762}
763
764// =====================================================================
765// merge_map — parallel inners up to `concurrency` cap
766// concat_map — wrapper for concurrency = Some(1)
767// =====================================================================
768
thread_local! {
    /// Per-thread re-entrancy flag for `drain_merge_buffer`.
    ///
    /// `drain_merge_buffer` sets it on entry and returns immediately if it
    /// was already set. A drain request raised from inside a running drain
    /// on the same thread therefore does not re-enter the loop. For example,
    /// an inner `on_complete` that fires synchronously while the outer loop
    /// is still spawning only decrements `active`, removes its sub and
    /// returns. The outermost drain owns the loop and observes the freed-up
    /// cap on its next iteration.
    ///
    /// NOTE(review): the flag is keyed by thread only, not by
    /// `MergeMapState`. While any merge_map drain runs on this thread, a
    /// drain request from a *different* merge_map instance is also skipped.
    /// Confirm that this cannot strand that other instance's buffered DATAs.
    static MERGE_DRAIN_ACTIVE: Cell<bool> = const { Cell::new(false) };
}
778
779struct MergeMapState {
780    /// Number of currently-active inner subscriptions (spawned but
781    /// not yet completed/errored).
782    active: u32,
783    /// Outer DATAs waiting because `active >= concurrency`. Each
784    /// handle has one retain share (taken on enqueue, released on
785    /// dequeue + project).
786    buffer: VecDeque<HandleId>,
787    /// Per-inner `Subscription`s, keyed by `next_inner_id`. Each
788    /// inner's `on_complete` removes its entry by id (lock-released
789    /// drop).
790    inner_subs: AHashMap<u64, SubGuard>,
791    /// Pending inner ids (between `subscribe` call and
792    /// `inner_subs.insert`). Used to detect synchronous-completion:
793    /// if `on_complete` runs during `subscribe`, it removes from
794    /// `pending_inner_ids`; the post-subscribe code checks the set
795    /// and skips inserting the now-dead sub.
796    pending_inner_ids: ahash::AHashSet<u64>,
797    next_inner_id: u64,
798    source_done: bool,
799    terminated: bool,
800}
801
802impl MergeMapState {
803    fn new() -> Self {
804        Self {
805            active: 0,
806            buffer: VecDeque::new(),
807            inner_subs: AHashMap::new(),
808            pending_inner_ids: ahash::AHashSet::new(),
809            next_inner_id: 0,
810            source_done: false,
811            terminated: false,
812        }
813    }
814}
815
816/// `merge_map(source, project)` — unbounded concurrency variant.
817/// Equivalent to [`merge_map_with_concurrency`] with `None`.
818#[must_use]
819pub fn merge_map(
820    core: &Core,
821    binding: &Arc<dyn HigherOrderBinding>,
822    source: NodeId,
823    project: ProjectFn,
824) -> NodeId {
825    merge_map_with_concurrency(core, binding, source, project, None)
826}
827
828/// `concat_map(source, project)` — sequential queue variant.
829/// Equivalent to [`merge_map_with_concurrency`] with `Some(1)`. Each
830/// outer DATA is enqueued and processed one-at-a-time.
831#[must_use]
832pub fn concat_map(
833    core: &Core,
834    binding: &Arc<dyn HigherOrderBinding>,
835    source: NodeId,
836    project: ProjectFn,
837) -> NodeId {
838    merge_map_with_concurrency(core, binding, source, project, Some(1))
839}
840
841/// `merge_map_with_concurrency(source, project, concurrency)` — projects
842/// each outer DATA to an inner Node and subscribes in parallel.
843///
844/// `concurrency`:
845/// - `None` → unbounded (every outer DATA spawns immediately).
846/// - `Some(n)` → at most `n` concurrent inners; excess outer DATAs
847///   buffer until an active inner completes.
848///
849/// Per D043 / D040, this matches the
850/// [`Core::set_pause_buffer_cap`](graphrefly_core::Core::set_pause_buffer_cap)
851/// `Option<usize>` precedent (None = unbounded). `Some(0)` is degenerate
852/// (would buffer everything indefinitely without ever spawning) but
853/// accepted at the type level.
854#[must_use]
855pub fn merge_map_with_concurrency(
856    core: &Core,
857    binding: &Arc<dyn HigherOrderBinding>,
858    source: NodeId,
859    project: ProjectFn,
860    concurrency: Option<u32>,
861) -> NodeId {
862    let project_fn_id = binding.register_project(project);
863    // S2b/D223: binding weaks stay (registry-cycle break); Core weak
864    // gone — sinks reach Core via `em`/`em.defer`.
865    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
866    let producer_binding_weak: Weak<dyn ProducerBinding> =
867        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
868
869    let build = Box::new(move |ctx: ProducerCtx<'_>| {
870        let producer_id = ctx.node_id();
871        let (Some(binding_clone), Some(producer_binding)) =
872            (binding_weak.upgrade(), producer_binding_weak.upgrade())
873        else {
874            return;
875        };
876        let em = ctx.emitter();
877        let state: Arc<Mutex<MergeMapState>> = Arc::new(Mutex::new(MergeMapState::new()));
878
879        let state_for_outer = state.clone();
880        let em_for_outer = em.clone();
881        let binding_for_outer = binding_clone.clone();
882        let producer_binding_for_outer = producer_binding.clone();
883
884        let outer_sink: Sink = Arc::new(move |msgs| {
885            // Phase 1: enqueue DATAs into the buffer (always — drain
886            // loop dequeues + spawns up to cap), classify terminal
887            // signals.
888            let mut error_action: Option<HandleId> = None;
889            let mut self_complete_now = false;
890            {
891                let mut s = state_for_outer.lock().unwrap();
892                if s.terminated {
893                    return;
894                }
895                // Tier-based dispatch (canonical §4.2; see
896                // `feedback_use_tier_for_signal_routing.md`).
897                for m in msgs {
898                    match m.tier() {
899                        3 => {
900                            if let Some(h) = m.payload_handle() {
901                                // Retain on enqueue — released by drain
902                                // after invoke_project.
903                                binding_for_outer.retain_handle(h);
904                                s.buffer.push_back(h);
905                            }
906                            // else: Resolved on outer source — no action.
907                        }
908                        5 => {
909                            if let Some(h) = m.payload_handle() {
910                                // Error
911                                if !s.terminated {
912                                    s.terminated = true;
913                                    binding_for_outer.retain_handle(h);
914                                    while let Some(q) = s.buffer.pop_front() {
915                                        binding_for_outer.release_handle(q);
916                                    }
917                                    error_action = Some(h);
918                                }
919                            } else {
920                                // Complete
921                                s.source_done = true;
922                                if s.active == 0 && s.buffer.is_empty() && !s.terminated {
923                                    s.terminated = true;
924                                    self_complete_now = true;
925                                }
926                            }
927                        }
928                        _ => {} // Tiers 0/1/2/4/6 — no action.
929                    }
930                }
931            }
932
933            if let Some(h) = error_action {
934                em_for_outer.error_or_defer(producer_id, h);
935                return;
936            }
937            if self_complete_now {
938                em_for_outer.complete_or_defer(producer_id);
939                return;
940            }
941
942            // Phase 2: drain buffer iteratively up to concurrency cap.
943            drain_merge_buffer(
944                &state_for_outer,
945                &em_for_outer,
946                &binding_for_outer,
947                &producer_binding_for_outer,
948                producer_id,
949                project_fn_id,
950                concurrency,
951            );
952        });
953
954        ctx.subscribe_to(source, outer_sink);
955    });
956
957    let fn_id = binding.register_producer_build(build);
958    core.register_producer(fn_id)
959        .expect("invariant: register_producer has no deps; no error variants reachable")
960}
961
962/// Iteratively pop from `buffer` and spawn inners until cap is reached
963/// or buffer is empty. Re-entrance from a nested `on_complete` is
964/// short-circuited via [`MERGE_DRAIN_ACTIVE`]; the outermost call owns
965/// the drain loop and picks up cap-frees on subsequent iterations.
966// S2b/D234: `core: &Core` → `em: &ProducerEmitter`. The per-inner
967// subscribe routes through `em.defer` (`CoreFull`); the returned
968// `SubscriptionId` is wrapped in a `SubGuard` keyed in `inner_subs`
969// (its Drop schedules the unsubscribe).
970fn drain_merge_buffer(
971    state: &Arc<Mutex<MergeMapState>>,
972    em: &ProducerEmitter,
973    binding: &Arc<dyn HigherOrderBinding>,
974    producer_binding: &Arc<dyn ProducerBinding>,
975    producer_id: NodeId,
976    project_fn_id: FnId,
977    concurrency: Option<u32>,
978) {
979    if MERGE_DRAIN_ACTIVE.with(|f| f.replace(true)) {
980        // Already draining on this thread; outer loop will drain
981        // remaining buffer.
982        return;
983    }
984
985    loop {
986        let h_and_id;
987        let mut should_self_complete = false;
988        {
989            let mut s = state.lock().unwrap();
990            if s.terminated {
991                MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
992                return;
993            }
994            let allowed = match concurrency {
995                None => true,
996                Some(n) => s.active < n,
997            };
998            if !allowed {
999                MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
1000                return;
1001            }
1002            if let Some(h) = s.buffer.pop_front() {
1003                s.active += 1;
1004                let id = s.next_inner_id;
1005                s.next_inner_id += 1;
1006                s.pending_inner_ids.insert(id);
1007                h_and_id = Some((h, id));
1008            } else if s.source_done && s.active == 0 && !s.terminated {
1009                s.terminated = true;
1010                should_self_complete = true;
1011                h_and_id = None;
1012            } else {
1013                h_and_id = None;
1014            }
1015        }
1016
1017        if should_self_complete {
1018            MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
1019            em.complete_or_defer(producer_id);
1020            return;
1021        }
1022
1023        let Some((outer_h, inner_id)) = h_and_id else {
1024            MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
1025            return;
1026        };
1027
1028        // Spawn lock-released.
1029        let inner_node = binding.invoke_project(project_fn_id, outer_h);
1030        binding.release_handle(outer_h);
1031
1032        let on_complete = make_merge_on_complete(
1033            state.clone(),
1034            em.clone(),
1035            binding.clone(),
1036            producer_binding.clone(),
1037            producer_id,
1038            project_fn_id,
1039            inner_id,
1040            concurrency,
1041        );
1042        let on_error = make_merge_on_error(state.clone(), em.clone(), binding.clone(), producer_id);
1043        // F2 /qa: clone on_complete so the TornDown branch can
1044        // synthesize inner-Complete (closes the merge_map `s.active`
1045        // leak that left the producer never self-completing when a
1046        // projected inner was dead).
1047        let on_complete_for_dead = on_complete.clone();
1048        let inner_sink = build_inner_sink(
1049            em.clone(),
1050            producer_binding.clone(),
1051            producer_id,
1052            on_complete,
1053            on_error,
1054        );
1055        // D234: inner subscribe via `em.defer` (long-lived drain can't
1056        // hold `&Core`). The sync-completion detection still works: if
1057        // the inner pre-completes during `try_subscribe`'s handshake,
1058        // `on_complete` fires INSIDE this defer (owner-side) and removes
1059        // `inner_id` from `pending_inner_ids` before the post-subscribe
1060        // check below — so a now-dead sub is dropped, not installed.
1061        let state_sub = state.clone();
1062        let em_guard = em.clone();
1063        let _ = em.defer(move |c| {
1064            match c.try_subscribe(inner_node, inner_sink) {
1065                Ok(sub) => {
1066                    let guard = SubGuard::new(inner_node, sub, em_guard);
1067                    let to_drop = {
1068                        let mut s = state_sub.lock().unwrap();
1069                        if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
1070                            Some(guard)
1071                        } else {
1072                            s.inner_subs.insert(inner_id, guard);
1073                            None
1074                        }
1075                    };
1076                    drop(to_drop);
1077                }
1078                Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
1079                    // R2.2.7.b / F2 /qa: synthesize inner-Complete so
1080                    // `make_merge_on_complete` decrements `s.active`,
1081                    // clears pending, and checks the self-Complete
1082                    // trigger (Dead-inner `s.active` leak fix).
1083                    on_complete_for_dead();
1084                }
1085                Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
1086                    panic!(
1087                        "merge_map inner subscribe: partition-order \
1088                         violation inside em.defer — substrate invariant broken"
1089                    );
1090                }
1091            }
1092        });
1093
1094        // Loop continues — pops next from buffer or returns.
1095    }
1096}
1097
1098fn make_merge_on_complete(
1099    state: Arc<Mutex<MergeMapState>>,
1100    em: ProducerEmitter,
1101    binding: Arc<dyn HigherOrderBinding>,
1102    producer_binding: Arc<dyn ProducerBinding>,
1103    producer_id: NodeId,
1104    project_fn_id: FnId,
1105    this_inner_id: u64,
1106    concurrency: Option<u32>,
1107) -> Arc<dyn Fn()> {
1108    Arc::new(move || {
1109        let removed_sub;
1110        {
1111            let mut s = state.lock().unwrap();
1112            if s.terminated {
1113                return;
1114            }
1115            s.active -= 1;
1116            // Two cases:
1117            // (a) Sync-completion during subscribe: `pending_inner_ids`
1118            //     still contains us; `inner_subs` does not. Remove from
1119            //     pending so the post-subscribe insert sees we're done
1120            //     and skips installing the dead sub.
1121            // (b) Async completion: `inner_subs` contains us. Remove
1122            //     and drop lock-released.
1123            s.pending_inner_ids.remove(&this_inner_id);
1124            removed_sub = s.inner_subs.remove(&this_inner_id);
1125        }
1126        drop(removed_sub); // SubGuard::Drop → deferred unsubscribe.
1127
1128        // Try to drain — if we're nested inside an outer drain loop
1129        // (sync-completion path), this is a no-op and the outer drain
1130        // continues.
1131        drain_merge_buffer(
1132            &state,
1133            &em,
1134            &binding,
1135            &producer_binding,
1136            producer_id,
1137            project_fn_id,
1138            concurrency,
1139        );
1140    })
1141}
1142
1143/// Inner-error path for merge_map: terminates the producer + drains
1144/// all inner subs (lock-released) + releases buffered DATA handles.
1145/// Captures `binding` so we can release buffered handles' retains
1146/// (taken on enqueue in the outer_sink's Data branch); without this,
1147/// inner-error before all buffered DATAs project would leak refcount
1148/// shares.
1149fn make_merge_on_error(
1150    state: Arc<Mutex<MergeMapState>>,
1151    em: ProducerEmitter,
1152    binding: Arc<dyn HigherOrderBinding>,
1153    producer_id: NodeId,
1154) -> Arc<dyn Fn(HandleId)> {
1155    Arc::new(move |h| {
1156        let removed_subs;
1157        let buffered_to_release;
1158        {
1159            let mut s = state.lock().unwrap();
1160            if s.terminated {
1161                return;
1162            }
1163            s.terminated = true;
1164            removed_subs = s.inner_subs.drain().map(|(_, sub)| sub).collect::<Vec<_>>();
1165            s.pending_inner_ids.clear();
1166            buffered_to_release = s.buffer.drain(..).collect::<Vec<_>>();
1167        }
1168        drop(removed_subs); // each SubGuard::Drop → deferred unsubscribe.
1169        for h_b in buffered_to_release {
1170            binding.release_handle(h_b);
1171        }
1172        em.error_or_defer(producer_id, h);
1173    })
1174}
1175
1176// =====================================================================
1177// Compile-time asserts (Slice E /qa; D248/D249-amended)
1178// =====================================================================
1179//
1180// D248/D249/S2c: `SwitchState`/`ExhaustState`/`MergeMapState` embed a
1181// `ProducerEmitter`, which now carries the owner-only `Rc<DeferQueue>`
1182// (the `!Send` `Defer` split off `CoreMailbox`). Under full
1183// single-owner these op-states are owner-thread-only and intentionally
1184// `!Send` — the prior `Send + Sync` assertions were shared-Core-era
1185// legacy and are deleted. `ProjectFn` is a pure projector (captures no
1186// `!Send`), so it stays `Send + Sync`.
1187
1188const _: fn() = || {
1189    fn assert_send_sync<T: Send + Sync>() {}
1190    assert_send_sync::<ProjectFn>();
1191};