Skip to main content

graphrefly_operators/
higher_order.rs

1//! Higher-order operators (Slice E, D044) — operators whose project fn
2//! returns an inner [`NodeId`] for each outer DATA. Mirrors TS legacy
3//! `extra/operators/higher-order.ts` (`switchMap` / `exhaustMap` /
4//! `concatMap` / `mergeMap`).
5//!
6//! All four are producer-pattern nodes (no declared deps; subscribe to
7//! the outer source from inside the build closure; emit on themselves
8//! via [`Core::emit`]). This mirrors the [`super::ops_impl`] family
9//! (zip / concat / race / takeUntil); the producer substrate handles
10//! auto-cleanup of upstream + inner subscriptions on producer
11//! deactivation (D031–D038).
12//!
13//! The four flavors differ in how they handle a new outer DATA while a
14//! prior inner is still active:
15//!
16//! - [`switch_map`] — cancel the prior inner (Rx-style `switchMap`).
17//! - [`exhaust_map`] — drop the new value (Rx-style `exhaustMap`).
18//! - [`concat_map`] — enqueue; process sequentially. (Equivalent to
19//!   [`merge_map_with_concurrency`] with `Some(1)`.)
20//! - [`merge_map`] / [`merge_map_with_concurrency`] — spawn in parallel
21//!   up to `concurrency`. `None` = unbounded.
22//!
23//! # Inner-sub tracking (Slice E /qa refactor)
24//!
25//! Each operator owns its inner [`Subscription`]s **inside its state
26//! `Mutex`** (not in [`super::producer::ProducerNodeState::subs`]).
27//! `producer_storage[producer_id].subs` holds only the OUTER source
28//! subscription (one entry, no positional concerns). switch_map /
29//! exhaust_map keep `Option<Subscription>` (single active inner); merge
30//! / concat keep `HashMap<u64, Subscription>` keyed by per-op
31//! `next_inner_id`. This avoids two bugs the original positional design
32//! exposed: (a) cached-outer source firing handshake before
33//! `subscribe_to` pushed the outer sub, reordering `subs[0]`; (b)
34//! merge/concat completed-inner subs accumulating in `subs` indefinitely.
35//! Inner sub cleanup is per-op now: switch/exhaust take + drop on inner
36//! Complete; merge/concat remove specific id on inner Complete.
37//!
38//! # Drain discipline (iterative spawn)
39//!
40//! `merge_map` could spawn the next buffered DATA from inside an
41//! inner's `on_complete` callback. For pathological pre-completed
42//! inners (synchronous Complete during the subscribe handshake),
43//! recursive spawn would grow the stack proportionally to the buffer
44//! depth. The thread-local [`MERGE_DRAIN_ACTIVE`] flag breaks the
45//! recursion: the outermost drain owns the loop; nested `on_complete`
46//! invocations only decrement state and return.
47//!
48//! # Project closure (D044)
49//!
50//! Each operator takes a `project: Fn(HandleId) -> NodeId` closure
51//! registered through [`HigherOrderBinding::register_project`]. Bindings
52//! (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
53//! WASM callbacks into this Rust shape; Rust-side users register a
54//! closure directly.
55
56#![allow(clippy::collapsible_if, clippy::collapsible_match)]
57#![allow(clippy::too_many_arguments, clippy::too_many_lines)]
58
59use std::cell::Cell;
60use std::collections::VecDeque;
61use std::sync::{Arc, Mutex, Weak};
62
63use ahash::AHashMap;
64use graphrefly_core::{Core, FnId, HandleId, Message, NodeId, Sink, Subscription};
65use smallvec::SmallVec;
66
67use super::producer::{ProducerBinding, ProducerCtx};
68
69// =====================================================================
70// HigherOrderBinding — closure registration for project: T -> Node<R>
71// =====================================================================
72
/// Project closure: takes an outer DATA handle, returns the
/// [`NodeId`] of an inner node to subscribe to. Closure may register
/// new state/derived nodes on the fly via captured [`Core`].
///
/// The closure is boxed and bounded by `Send + Sync`, and is `Fn` (not
/// `FnMut`/`FnOnce`), so it can be invoked repeatedly through a shared
/// reference — once per accepted outer DATA.
pub type ProjectFn = Box<dyn Fn(HandleId) -> NodeId + Send + Sync>;
77
/// Closure-registration interface for higher-order operators.
///
/// Extends [`ProducerBinding`] with the two methods that bindings
/// shipping higher-order operators must implement: one to store a
/// project closure and hand back an id for it, and one to call the
/// stored closure by that id.
pub trait HigherOrderBinding: ProducerBinding {
    /// Register a project closure. The returned [`FnId`] is captured by
    /// the operator's build closure and looked up via
    /// [`Self::invoke_project`] on each outer DATA fire.
    fn register_project(&self, project: ProjectFn) -> FnId;

    /// Invoke a registered project closure with the given outer DATA
    /// handle. Returns the inner node's [`NodeId`].
    ///
    /// The operators in this module call this with the state lock
    /// released and while holding one retained share of `value`, which
    /// they release right after the call returns.
    ///
    /// # Panics
    ///
    /// Implementations panic if `fn_id` is not a registered project
    /// closure.
    fn invoke_project(&self, fn_id: FnId, value: HandleId) -> NodeId;
}
97
98// =====================================================================
99// build_inner_sink — shared inner-subscription handler
100// =====================================================================
101//
102// Each higher-order op subscribes to the inner node returned by its
103// project closure. The inner sink dispatches by `Message::tier()`
104// (R1.3.7.b — central message-tier utility, never hardcode variant
105// checks for forwarding gating per CLAUDE.md design invariant 4):
106//
107// - Tier 0 (Start) — drop.
108// - Tier 1 (Dirty) — drop. (DIRTY forwarding from inner is an
109//   acknowledged divergence from TS legacy; Rust producer relies on
110//   Core's wave engine to generate DIRTY on the producer's own emits.)
111// - Tier 2 (Pause/Resume) — drop. Backpressure is per-stream; not
112//   propagated through higher-order ops by design.
113// - Tier 3 (Data/Resolved) — `Data(h)` forwards via `Core::emit(producer_id, h)`;
114//   `Resolved` drops (same wave-engine rationale as Dirty).
115// - Tier 4 (Invalidate) — forwards via `Core::invalidate(producer_id)`
116//   per R1.2.7. Inner cache invalidation surfaces to producer
117//   subscribers.
118// - Tier 5 (Complete/Error) — semantic dispatch via op-specific
119//   callbacks (`on_inner_complete` / `on_inner_error`). Differs
120//   per-op (switch_map clears active inner; merge_map decrements
121//   active count; etc.).
122// - Tier 6 (Teardown) — forwards via `Core::teardown(producer_id)`
123//   per R2.6.4. Inner permanent destruction propagates.
124
125fn build_inner_sink(
126    core: Core,
127    producer_binding: Arc<dyn ProducerBinding>,
128    producer_id: NodeId,
129    on_inner_complete: Arc<dyn Fn() + Send + Sync>,
130    on_inner_error: Arc<dyn Fn(HandleId) + Send + Sync>,
131) -> Sink {
132    Arc::new(move |msgs: &[Message]| {
133        enum Action {
134            Emit(HandleId),
135            Complete,
136            Error(HandleId),
137            Invalidate,
138            Teardown,
139        }
140        let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
141        for m in msgs {
142            match m.tier() {
143                3 => {
144                    // Tier 3: Data/Resolved. Only Data carries a payload
145                    // to forward; Resolved is dropped per the
146                    // wave-engine divergence above.
147                    if let Some(h) = m.payload_handle() {
148                        producer_binding.retain_handle(h);
149                        actions.push(Action::Emit(h));
150                    }
151                }
152                4 => {
153                    // Tier 4: Invalidate. Forward to producer.
154                    actions.push(Action::Invalidate);
155                }
156                5 => {
157                    // Tier 5: Complete or Error. Semantic dispatch via
158                    // op-specific callbacks. `payload_handle()` is the
159                    // canonical Error-vs-Complete discriminator at this
160                    // tier.
161                    if let Some(h) = m.payload_handle() {
162                        producer_binding.retain_handle(h);
163                        actions.push(Action::Error(h));
164                    } else {
165                        actions.push(Action::Complete);
166                    }
167                }
168                6 => {
169                    // Tier 6: Teardown. Forward to producer.
170                    actions.push(Action::Teardown);
171                }
172                // Tiers 0 (Start), 1 (Dirty), 2 (Pause/Resume): drop.
173                _ => {}
174            }
175        }
176        for action in actions {
177            match action {
178                Action::Emit(h) => core.emit_or_defer(producer_id, h),
179                Action::Complete => on_inner_complete(),
180                Action::Error(h) => on_inner_error(h),
181                Action::Invalidate => core.invalidate_or_defer(producer_id),
182                Action::Teardown => core.teardown_or_defer(producer_id),
183            }
184        }
185    })
186}
187
188// =====================================================================
189// switch_map — cancel previous inner on each new outer DATA
190// =====================================================================
191
192struct SwitchState {
193    /// Currently-active inner subscription (if any). Cancelling the
194    /// prior inner is `Option::take` + lock-released drop.
195    inner_sub: Option<Subscription>,
196    source_done: bool,
197    terminated: bool,
198}
199
200impl SwitchState {
201    fn new() -> Self {
202        Self {
203            inner_sub: None,
204            source_done: false,
205            terminated: false,
206        }
207    }
208}
209
210/// `switch_map(source, project)` — for each outer DATA, cancel the
211/// previous inner subscription and subscribe to the inner node returned
212/// by `project(value)`. Inner DATA flows through to downstream; inner
213/// COMPLETE clears the active slot; outer COMPLETE (with no active
214/// inner) self-completes the operator.
215#[must_use]
216pub fn switch_map(
217    core: &Core,
218    binding: &Arc<dyn HigherOrderBinding>,
219    source: NodeId,
220    project: ProjectFn,
221) -> NodeId {
222    let project_fn_id = binding.register_project(project);
223    // Weak captures break the producer-build Arc cycle (see
224    // `graphrefly_operators::ops_impl::zip` doc).
225    let core_weak = core.weak_handle();
226    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
227    let producer_binding_weak: Weak<dyn ProducerBinding> =
228        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
229
230    let build = Box::new(move |ctx: ProducerCtx<'_>| {
231        let producer_id = ctx.node_id();
232        let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
233            core_weak.upgrade(),
234            binding_weak.upgrade(),
235            producer_binding_weak.upgrade(),
236        ) else {
237            return;
238        };
239        let state: Arc<Mutex<SwitchState>> = Arc::new(Mutex::new(SwitchState::new()));
240
241        let state_for_outer = state.clone();
242        let core_for_outer = core_clone.clone();
243        let binding_for_outer = binding_clone.clone();
244        let producer_binding_for_outer = producer_binding.clone();
245
246        let outer_sink: Sink = Arc::new(move |msgs| {
247            // Phase 1: classify under state lock. Track whether we
248            // performed a retain so phase 2 can safely release it
249            // without underflow on `[Data(_), Error(_)]` same-batch
250            // (P1 /qa fix).
251            #[derive(Default)]
252            struct Plan {
253                latest_outer_h: Option<HandleId>,
254                latest_retained: bool,
255                self_complete: bool,
256                self_error: Option<HandleId>,
257            }
258            let mut plan = Plan::default();
259            {
260                let mut s = state_for_outer.lock().unwrap();
261                if s.terminated {
262                    return;
263                }
264                // Tier-based dispatch per `feedback_use_tier_for_signal_routing.md`
265                // and canonical §4.2 ("Always use the provided tier utilities
266                // rather than hardcoding type checks"). Tier 3 carries
267                // DATA/RESOLVED — only DATA has a payload_handle, so the
268                // `payload_handle().is_some()` test discriminates within
269                // the tier without referring to specific variants. Tier 5
270                // carries COMPLETE/ERROR — same shape (Error has handle,
271                // Complete doesn't).
272                for m in msgs {
273                    match m.tier() {
274                        3 => {
275                            if let Some(h) = m.payload_handle() {
276                                // switchMap: only the LATEST outer DATA in the
277                                // batch matters (TS legacy "skip to last in
278                                // the batch to avoid creating + immediately
279                                // discarding N-1 inners"). Track the handle;
280                                // we'll project + subscribe once after the
281                                // lock drops. Skipped handles need no retain.
282                                plan.latest_outer_h = Some(h);
283                            }
284                            // else: Resolved on outer source — no action.
285                        }
286                        5 => {
287                            if let Some(h) = m.payload_handle() {
288                                // Error
289                                if !s.terminated {
290                                    s.terminated = true;
291                                    binding_for_outer.retain_handle(h);
292                                    plan.self_error = Some(h);
293                                }
294                            } else {
295                                // Complete
296                                s.source_done = true;
297                                if s.inner_sub.is_none()
298                                    && plan.latest_outer_h.is_none()
299                                    && !s.terminated
300                                {
301                                    s.terminated = true;
302                                    plan.self_complete = true;
303                                }
304                            }
305                        }
306                        _ => {} // Tiers 0/1/2/4/6 — no action on outer source.
307                    }
308                }
309                if let Some(h) = plan.latest_outer_h {
310                    if !s.terminated {
311                        // Retain ONE share for the chosen handle —
312                        // released by phase 2 after invoke_project.
313                        binding_for_outer.retain_handle(h);
314                        plan.latest_retained = true;
315                    }
316                }
317            }
318
319            // Phase 2: cancel prior inner sub, project, subscribe.
320            // Gated on `latest_retained` so that an `Error` arriving in
321            // the same batch (which sets terminated and skips the
322            // retain) does NOT trigger an unbalanced release.
323            if plan.latest_retained {
324                let outer_h = plan
325                    .latest_outer_h
326                    .expect("latest_retained implies latest_outer_h is Some");
327
328                // Cancel prior inner sub (if any) lock-released.
329                let prev_inner = {
330                    let mut s = state_for_outer.lock().unwrap();
331                    s.inner_sub.take()
332                };
333                drop(prev_inner);
334
335                // invoke_project lock-released. Releases our retain after.
336                let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
337                binding_for_outer.release_handle(outer_h);
338
339                let on_complete = make_switch_on_complete(
340                    state_for_outer.clone(),
341                    core_for_outer.clone(),
342                    producer_id,
343                );
344                let on_error = make_switch_on_error(
345                    state_for_outer.clone(),
346                    core_for_outer.clone(),
347                    producer_id,
348                );
349                // F2 /qa (2026-05-10): clone `on_complete` so the Dead
350                // branch can invoke it as an immediate "inner completed
351                // before any emit" signal — closes the bug where a
352                // batched `[outer.Data, outer.Complete]` projected to a
353                // dead inner wedged the producer (source_done=true,
354                // inner_sub=None, no future trigger).
355                let on_complete_for_dead = on_complete.clone();
356                let inner_sink = build_inner_sink(
357                    core_for_outer.clone(),
358                    producer_binding_for_outer.clone(),
359                    producer_id,
360                    on_complete,
361                    on_error,
362                );
363                // Phase H+ STRICT: try_subscribe + defer for inner source.
364                // F2 /qa: TornDown synthesizes inner-Complete via
365                // on_complete (so the operator's self-Complete trigger
366                // can fire).
367                let inner_sink_for_defer = inner_sink.clone();
368                match core_for_outer.try_subscribe(inner_node, inner_sink) {
369                    Ok(inner_sub) => {
370                        let to_drop = {
371                            let mut s = state_for_outer.lock().unwrap();
372                            if s.terminated {
373                                Some(inner_sub)
374                            } else {
375                                s.inner_sub.replace(inner_sub)
376                            }
377                        };
378                        drop(to_drop);
379                    }
380                    Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
381                        let core_cb = core_for_outer.clone();
382                        let state_cb = state_for_outer.clone();
383                        core_for_outer.push_deferred_producer_op(
384                            graphrefly_core::DeferredProducerOp::Callback(Box::new(move || {
385                                let inner_sub = core_cb.subscribe(inner_node, inner_sink_for_defer);
386                                let to_drop = {
387                                    let mut s = state_cb.lock().unwrap();
388                                    if s.terminated {
389                                        Some(inner_sub)
390                                    } else {
391                                        s.inner_sub.replace(inner_sub)
392                                    }
393                                };
394                                drop(to_drop);
395                            })),
396                        );
397                    }
398                    Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
399                        // R2.2.7.b: inner is dead. Synthesize
400                        // inner-Complete so switch's state machine
401                        // clears inner_sub and checks the self-Complete
402                        // trigger (closes the [Data,Complete] batched
403                        // wedge bug).
404                        on_complete_for_dead();
405                    }
406                }
407            }
408
409            if plan.self_complete {
410                core_for_outer.complete_or_defer(producer_id);
411            } else if let Some(h) = plan.self_error {
412                core_for_outer.error_or_defer(producer_id, h);
413            }
414        });
415
416        ctx.subscribe_to(source, outer_sink);
417    });
418
419    let fn_id = binding.register_producer_build(build);
420    core.register_producer(fn_id)
421        .expect("invariant: register_producer has no deps; no error variants reachable")
422}
423
424fn make_switch_on_complete(
425    state: Arc<Mutex<SwitchState>>,
426    core: Core,
427    producer_id: NodeId,
428) -> Arc<dyn Fn() + Send + Sync> {
429    Arc::new(move || {
430        let prev_inner;
431        let mut should_complete = false;
432        {
433            let mut s = state.lock().unwrap();
434            if s.terminated {
435                return;
436            }
437            prev_inner = s.inner_sub.take();
438            if s.source_done && !s.terminated {
439                s.terminated = true;
440                should_complete = true;
441            }
442        }
443        drop(prev_inner);
444        if should_complete {
445            core.complete_or_defer(producer_id);
446        }
447    })
448}
449
450fn make_switch_on_error(
451    state: Arc<Mutex<SwitchState>>,
452    core: Core,
453    producer_id: NodeId,
454) -> Arc<dyn Fn(HandleId) + Send + Sync> {
455    Arc::new(move |h| {
456        let prev_inner;
457        {
458            let mut s = state.lock().unwrap();
459            if s.terminated {
460                return;
461            }
462            s.terminated = true;
463            prev_inner = s.inner_sub.take();
464        }
465        drop(prev_inner);
466        core.error_or_defer(producer_id, h);
467    })
468}
469
470// =====================================================================
471// exhaust_map — ignore outer DATA while inner is active
472// =====================================================================
473
474struct ExhaustState {
475    inner_sub: Option<Subscription>,
476    source_done: bool,
477    terminated: bool,
478}
479
480impl ExhaustState {
481    fn new() -> Self {
482        Self {
483            inner_sub: None,
484            source_done: false,
485            terminated: false,
486        }
487    }
488}
489
490/// `exhaust_map(source, project)` — like [`switch_map`] but DROPS new
491/// outer DATA while an inner subscription is active. First outer DATA
492/// per "active window" wins; subsequent DATAs are discarded until the
493/// inner completes.
494#[must_use]
495pub fn exhaust_map(
496    core: &Core,
497    binding: &Arc<dyn HigherOrderBinding>,
498    source: NodeId,
499    project: ProjectFn,
500) -> NodeId {
501    let project_fn_id = binding.register_project(project);
502    // Weak captures break the producer-build Arc cycle (see `switch_map` doc).
503    let core_weak = core.weak_handle();
504    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
505    let producer_binding_weak: Weak<dyn ProducerBinding> =
506        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
507
508    let build = Box::new(move |ctx: ProducerCtx<'_>| {
509        let producer_id = ctx.node_id();
510        let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
511            core_weak.upgrade(),
512            binding_weak.upgrade(),
513            producer_binding_weak.upgrade(),
514        ) else {
515            return;
516        };
517        let state: Arc<Mutex<ExhaustState>> = Arc::new(Mutex::new(ExhaustState::new()));
518
519        let state_for_outer = state.clone();
520        let core_for_outer = core_clone.clone();
521        let binding_for_outer = binding_clone.clone();
522        let producer_binding_for_outer = producer_binding.clone();
523
524        let outer_sink: Sink = Arc::new(move |msgs| {
525            #[derive(Default)]
526            struct Plan {
527                first_outer_h: Option<HandleId>,
528                first_retained: bool,
529                self_complete: bool,
530                self_error: Option<HandleId>,
531            }
532            let mut plan = Plan::default();
533            {
534                let mut s = state_for_outer.lock().unwrap();
535                if s.terminated {
536                    return;
537                }
538                // Tier-based dispatch (canonical §4.2; see
539                // `feedback_use_tier_for_signal_routing.md`).
540                for m in msgs {
541                    match m.tier() {
542                        3 => {
543                            if let Some(h) = m.payload_handle() {
544                                // First DATA per active window wins.
545                                // Remember the first one we accept; subsequent
546                                // batch entries (or DATAs after) drop.
547                                if s.inner_sub.is_none() && plan.first_outer_h.is_none() {
548                                    binding_for_outer.retain_handle(h);
549                                    plan.first_outer_h = Some(h);
550                                    plan.first_retained = true;
551                                }
552                            }
553                            // else: Resolved on outer source — no action.
554                        }
555                        5 => {
556                            if let Some(h) = m.payload_handle() {
557                                // Error
558                                if !s.terminated {
559                                    s.terminated = true;
560                                    // Release any retain we took for
561                                    // first_outer_h — we won't be projecting it.
562                                    if plan.first_retained {
563                                        if let Some(h0) = plan.first_outer_h.take() {
564                                            binding_for_outer.release_handle(h0);
565                                            plan.first_retained = false;
566                                        }
567                                    }
568                                    binding_for_outer.retain_handle(h);
569                                    plan.self_error = Some(h);
570                                }
571                            } else {
572                                // Complete
573                                s.source_done = true;
574                                if s.inner_sub.is_none()
575                                    && plan.first_outer_h.is_none()
576                                    && !s.terminated
577                                {
578                                    s.terminated = true;
579                                    plan.self_complete = true;
580                                }
581                            }
582                        }
583                        _ => {} // Tiers 0/1/2/4/6 — no action.
584                    }
585                }
586            }
587
588            if plan.first_retained {
589                let outer_h = plan
590                    .first_outer_h
591                    .expect("first_retained implies first_outer_h is Some");
592                let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
593                binding_for_outer.release_handle(outer_h);
594
595                let on_complete = make_exhaust_on_complete(
596                    state_for_outer.clone(),
597                    core_for_outer.clone(),
598                    producer_id,
599                );
600                let on_error = make_exhaust_on_error(
601                    state_for_outer.clone(),
602                    core_for_outer.clone(),
603                    producer_id,
604                );
605                // F2 /qa: clone for TornDown synthesis (mirrors switch_map).
606                let on_complete_for_dead = on_complete.clone();
607                let inner_sink = build_inner_sink(
608                    core_for_outer.clone(),
609                    producer_binding_for_outer.clone(),
610                    producer_id,
611                    on_complete,
612                    on_error,
613                );
614                // If inner already pre-completed during the handshake,
615                // on_complete already cleared `inner_sub`. We `replace`
616                // either way; in the synchronous-completion path,
617                // `inner_sub` was None so our just-subscribed (and
618                // already-dead) sub is dropped on the next iteration.
619                // Phase H+ STRICT: try_subscribe + defer for inner source.
620                // R2.2.7.b: TornDown means the inner source is non-resubscribable
621                // and terminal — skip silently.
622                let inner_sink_for_defer = inner_sink.clone();
623                match core_for_outer.try_subscribe(inner_node, inner_sink) {
624                    Ok(inner_sub) => {
625                        let to_drop = {
626                            let mut s = state_for_outer.lock().unwrap();
627                            if s.terminated {
628                                Some(inner_sub)
629                            } else {
630                                s.inner_sub.replace(inner_sub)
631                            }
632                        };
633                        drop(to_drop);
634                    }
635                    Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
636                        let core_cb = core_for_outer.clone();
637                        let state_cb = state_for_outer.clone();
638                        core_for_outer.push_deferred_producer_op(
639                            graphrefly_core::DeferredProducerOp::Callback(Box::new(move || {
640                                let inner_sub = core_cb.subscribe(inner_node, inner_sink_for_defer);
641                                let to_drop = {
642                                    let mut s = state_cb.lock().unwrap();
643                                    if s.terminated {
644                                        Some(inner_sub)
645                                    } else {
646                                        s.inner_sub.replace(inner_sub)
647                                    }
648                                };
649                                drop(to_drop);
650                            })),
651                        );
652                    }
653                    Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
654                        // R2.2.7.b / F2 /qa: synthesize inner-Complete
655                        // so exhaust's `s.inner_sub` clears and the
656                        // next outer DATA can re-project (previously
657                        // the TornDown branch was a silent no-op which
658                        // could leave outer spawning more dead
659                        // projections indefinitely without ever
660                        // advancing the state machine).
661                        on_complete_for_dead();
662                    }
663                }
664            }
665
666            if plan.self_complete {
667                core_for_outer.complete_or_defer(producer_id);
668            } else if let Some(h) = plan.self_error {
669                core_for_outer.error_or_defer(producer_id, h);
670            }
671        });
672
673        ctx.subscribe_to(source, outer_sink);
674    });
675
676    let fn_id = binding.register_producer_build(build);
677    core.register_producer(fn_id)
678        .expect("invariant: register_producer has no deps; no error variants reachable")
679}
680
681fn make_exhaust_on_complete(
682    state: Arc<Mutex<ExhaustState>>,
683    core: Core,
684    producer_id: NodeId,
685) -> Arc<dyn Fn() + Send + Sync> {
686    Arc::new(move || {
687        let prev_inner;
688        let mut should_complete = false;
689        {
690            let mut s = state.lock().unwrap();
691            if s.terminated {
692                return;
693            }
694            prev_inner = s.inner_sub.take();
695            if s.source_done && !s.terminated {
696                s.terminated = true;
697                should_complete = true;
698            }
699        }
700        drop(prev_inner);
701        if should_complete {
702            core.complete_or_defer(producer_id);
703        }
704    })
705}
706
707fn make_exhaust_on_error(
708    state: Arc<Mutex<ExhaustState>>,
709    core: Core,
710    producer_id: NodeId,
711) -> Arc<dyn Fn(HandleId) + Send + Sync> {
712    Arc::new(move |h| {
713        let prev_inner;
714        {
715            let mut s = state.lock().unwrap();
716            if s.terminated {
717                return;
718            }
719            s.terminated = true;
720            prev_inner = s.inner_sub.take();
721        }
722        drop(prev_inner);
723        core.error_or_defer(producer_id, h);
724    })
725}
726
727// =====================================================================
728// merge_map — parallel inners up to `concurrency` cap
729// concat_map — wrapper for concurrency = Some(1)
730// =====================================================================
731
thread_local! {
    /// Re-entrancy latch for [`drain_merge_buffer`], one flag per thread.
    ///
    /// `drain_merge_buffer` sets the flag on entry and returns
    /// immediately if it was already set. This covers the case where an
    /// inner's `on_complete` runs synchronously inside a
    /// `Core::subscribe` handshake (pre-completed inner): the nested
    /// call only updates the counters / removes its sub, and the
    /// outermost drain loop picks up the freed capacity on its next
    /// iteration.
    ///
    /// The flag is a plain thread-local, so it is shared by every
    /// `merge_map` / `concat_map` instance running on the thread.
    static MERGE_DRAIN_ACTIVE: Cell<bool> = const { Cell::new(false) };
}
741
742struct MergeMapState {
743    /// Number of currently-active inner subscriptions (spawned but
744    /// not yet completed/errored).
745    active: u32,
746    /// Outer DATAs waiting because `active >= concurrency`. Each
747    /// handle has one retain share (taken on enqueue, released on
748    /// dequeue + project).
749    buffer: VecDeque<HandleId>,
750    /// Per-inner `Subscription`s, keyed by `next_inner_id`. Each
751    /// inner's `on_complete` removes its entry by id (lock-released
752    /// drop).
753    inner_subs: AHashMap<u64, Subscription>,
754    /// Pending inner ids (between `subscribe` call and
755    /// `inner_subs.insert`). Used to detect synchronous-completion:
756    /// if `on_complete` runs during `subscribe`, it removes from
757    /// `pending_inner_ids`; the post-subscribe code checks the set
758    /// and skips inserting the now-dead sub.
759    pending_inner_ids: ahash::AHashSet<u64>,
760    next_inner_id: u64,
761    source_done: bool,
762    terminated: bool,
763}
764
765impl MergeMapState {
766    fn new() -> Self {
767        Self {
768            active: 0,
769            buffer: VecDeque::new(),
770            inner_subs: AHashMap::new(),
771            pending_inner_ids: ahash::AHashSet::new(),
772            next_inner_id: 0,
773            source_done: false,
774            terminated: false,
775        }
776    }
777}
778
779/// `merge_map(source, project)` — unbounded concurrency variant.
780/// Equivalent to [`merge_map_with_concurrency`] with `None`.
781#[must_use]
782pub fn merge_map(
783    core: &Core,
784    binding: &Arc<dyn HigherOrderBinding>,
785    source: NodeId,
786    project: ProjectFn,
787) -> NodeId {
788    merge_map_with_concurrency(core, binding, source, project, None)
789}
790
791/// `concat_map(source, project)` — sequential queue variant.
792/// Equivalent to [`merge_map_with_concurrency`] with `Some(1)`. Each
793/// outer DATA is enqueued and processed one-at-a-time.
794#[must_use]
795pub fn concat_map(
796    core: &Core,
797    binding: &Arc<dyn HigherOrderBinding>,
798    source: NodeId,
799    project: ProjectFn,
800) -> NodeId {
801    merge_map_with_concurrency(core, binding, source, project, Some(1))
802}
803
804/// `merge_map_with_concurrency(source, project, concurrency)` — projects
805/// each outer DATA to an inner Node and subscribes in parallel.
806///
807/// `concurrency`:
808/// - `None` → unbounded (every outer DATA spawns immediately).
809/// - `Some(n)` → at most `n` concurrent inners; excess outer DATAs
810///   buffer until an active inner completes.
811///
812/// Per D043 / D040, this matches the
813/// [`Core::set_pause_buffer_cap`](graphrefly_core::Core::set_pause_buffer_cap)
814/// `Option<usize>` precedent (None = unbounded). `Some(0)` is degenerate
815/// (would buffer everything indefinitely without ever spawning) but
816/// accepted at the type level.
817#[must_use]
818pub fn merge_map_with_concurrency(
819    core: &Core,
820    binding: &Arc<dyn HigherOrderBinding>,
821    source: NodeId,
822    project: ProjectFn,
823    concurrency: Option<u32>,
824) -> NodeId {
825    let project_fn_id = binding.register_project(project);
826    // Weak captures break the producer-build Arc cycle (see `switch_map` doc).
827    let core_weak = core.weak_handle();
828    let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
829    let producer_binding_weak: Weak<dyn ProducerBinding> =
830        Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
831
832    let build = Box::new(move |ctx: ProducerCtx<'_>| {
833        let producer_id = ctx.node_id();
834        let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
835            core_weak.upgrade(),
836            binding_weak.upgrade(),
837            producer_binding_weak.upgrade(),
838        ) else {
839            return;
840        };
841        let state: Arc<Mutex<MergeMapState>> = Arc::new(Mutex::new(MergeMapState::new()));
842
843        let state_for_outer = state.clone();
844        let core_for_outer = core_clone.clone();
845        let binding_for_outer = binding_clone.clone();
846        let producer_binding_for_outer = producer_binding.clone();
847
848        let outer_sink: Sink = Arc::new(move |msgs| {
849            // Phase 1: enqueue DATAs into the buffer (always — drain
850            // loop dequeues + spawns up to cap), classify terminal
851            // signals.
852            let mut error_action: Option<HandleId> = None;
853            let mut self_complete_now = false;
854            {
855                let mut s = state_for_outer.lock().unwrap();
856                if s.terminated {
857                    return;
858                }
859                // Tier-based dispatch (canonical §4.2; see
860                // `feedback_use_tier_for_signal_routing.md`).
861                for m in msgs {
862                    match m.tier() {
863                        3 => {
864                            if let Some(h) = m.payload_handle() {
865                                // Retain on enqueue — released by drain
866                                // after invoke_project.
867                                binding_for_outer.retain_handle(h);
868                                s.buffer.push_back(h);
869                            }
870                            // else: Resolved on outer source — no action.
871                        }
872                        5 => {
873                            if let Some(h) = m.payload_handle() {
874                                // Error
875                                if !s.terminated {
876                                    s.terminated = true;
877                                    binding_for_outer.retain_handle(h);
878                                    while let Some(q) = s.buffer.pop_front() {
879                                        binding_for_outer.release_handle(q);
880                                    }
881                                    error_action = Some(h);
882                                }
883                            } else {
884                                // Complete
885                                s.source_done = true;
886                                if s.active == 0 && s.buffer.is_empty() && !s.terminated {
887                                    s.terminated = true;
888                                    self_complete_now = true;
889                                }
890                            }
891                        }
892                        _ => {} // Tiers 0/1/2/4/6 — no action.
893                    }
894                }
895            }
896
897            if let Some(h) = error_action {
898                core_for_outer.error_or_defer(producer_id, h);
899                return;
900            }
901            if self_complete_now {
902                core_for_outer.complete_or_defer(producer_id);
903                return;
904            }
905
906            // Phase 2: drain buffer iteratively up to concurrency cap.
907            drain_merge_buffer(
908                &state_for_outer,
909                &core_for_outer,
910                &binding_for_outer,
911                &producer_binding_for_outer,
912                producer_id,
913                project_fn_id,
914                concurrency,
915            );
916        });
917
918        ctx.subscribe_to(source, outer_sink);
919    });
920
921    let fn_id = binding.register_producer_build(build);
922    core.register_producer(fn_id)
923        .expect("invariant: register_producer has no deps; no error variants reachable")
924}
925
926/// Iteratively pop from `buffer` and spawn inners until cap is reached
927/// or buffer is empty. Re-entrance from a nested `on_complete` is
928/// short-circuited via [`MERGE_DRAIN_ACTIVE`]; the outermost call owns
929/// the drain loop and picks up cap-frees on subsequent iterations.
930fn drain_merge_buffer(
931    state: &Arc<Mutex<MergeMapState>>,
932    core: &Core,
933    binding: &Arc<dyn HigherOrderBinding>,
934    producer_binding: &Arc<dyn ProducerBinding>,
935    producer_id: NodeId,
936    project_fn_id: FnId,
937    concurrency: Option<u32>,
938) {
939    if MERGE_DRAIN_ACTIVE.with(|f| f.replace(true)) {
940        // Already draining on this thread; outer loop will drain
941        // remaining buffer.
942        return;
943    }
944
945    loop {
946        let h_and_id;
947        let mut should_self_complete = false;
948        {
949            let mut s = state.lock().unwrap();
950            if s.terminated {
951                MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
952                return;
953            }
954            let allowed = match concurrency {
955                None => true,
956                Some(n) => s.active < n,
957            };
958            if !allowed {
959                MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
960                return;
961            }
962            if let Some(h) = s.buffer.pop_front() {
963                s.active += 1;
964                let id = s.next_inner_id;
965                s.next_inner_id += 1;
966                s.pending_inner_ids.insert(id);
967                h_and_id = Some((h, id));
968            } else if s.source_done && s.active == 0 && !s.terminated {
969                s.terminated = true;
970                should_self_complete = true;
971                h_and_id = None;
972            } else {
973                h_and_id = None;
974            }
975        }
976
977        if should_self_complete {
978            MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
979            core.complete_or_defer(producer_id);
980            return;
981        }
982
983        let Some((outer_h, inner_id)) = h_and_id else {
984            MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
985            return;
986        };
987
988        // Spawn lock-released.
989        let inner_node = binding.invoke_project(project_fn_id, outer_h);
990        binding.release_handle(outer_h);
991
992        let on_complete = make_merge_on_complete(
993            state.clone(),
994            core.clone(),
995            binding.clone(),
996            producer_binding.clone(),
997            producer_id,
998            project_fn_id,
999            inner_id,
1000            concurrency,
1001        );
1002        let on_error =
1003            make_merge_on_error(state.clone(), core.clone(), binding.clone(), producer_id);
1004        // F2 /qa: clone on_complete so the TornDown branch can
1005        // synthesize inner-Complete (closes the merge_map `s.active`
1006        // leak that left the producer never self-completing when a
1007        // projected inner was dead).
1008        let on_complete_for_dead = on_complete.clone();
1009        let inner_sink = build_inner_sink(
1010            core.clone(),
1011            producer_binding.clone(),
1012            producer_id,
1013            on_complete,
1014            on_error,
1015        );
1016        // Phase H+ STRICT: try_subscribe + defer for inner source.
1017        // R2.2.7.b: TornDown means the inner source is non-resubscribable
1018        // and terminal — skip silently and remove from pending so the
1019        // overall lifecycle isn't wedged waiting on a dead inner.
1020        let inner_sink_for_defer = inner_sink.clone();
1021        match core.try_subscribe(inner_node, inner_sink) {
1022            Ok(inner_sub) => {
1023                // Decide whether to install the sub: if `on_complete` fired
1024                // synchronously inside `subscribe` (pre-completed inner), it
1025                // already removed `inner_id` from `pending_inner_ids`.
1026                let to_drop = {
1027                    let mut s = state.lock().unwrap();
1028                    if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
1029                        Some(inner_sub)
1030                    } else {
1031                        s.inner_subs.insert(inner_id, inner_sub);
1032                        None
1033                    }
1034                };
1035                drop(to_drop);
1036            }
1037            Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
1038                let core_cb = core.clone();
1039                let state_cb = state.clone();
1040                core.push_deferred_producer_op(graphrefly_core::DeferredProducerOp::Callback(
1041                    Box::new(move || {
1042                        let inner_sub = core_cb.subscribe(inner_node, inner_sink_for_defer);
1043                        let to_drop = {
1044                            let mut s = state_cb.lock().unwrap();
1045                            if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
1046                                Some(inner_sub)
1047                            } else {
1048                                s.inner_subs.insert(inner_id, inner_sub);
1049                                None
1050                            }
1051                        };
1052                        drop(to_drop);
1053                    }),
1054                ));
1055            }
1056            Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
1057                // R2.2.7.b / F2 /qa: synthesize inner-Complete. This
1058                // delegates to `make_merge_on_complete`'s state-machine
1059                // which decrements `s.active`, removes inner_id from
1060                // pending, and checks the self-Complete trigger
1061                // (source_done && active == 0 && buffer.empty()) —
1062                // closing the wedge bug where a Dead inner left
1063                // `s.active` permanently inflated and merge_map
1064                // never self-completed even after source completed.
1065                on_complete_for_dead();
1066            }
1067        }
1068
1069        // Loop continues — pops next from buffer or returns.
1070    }
1071}
1072
1073fn make_merge_on_complete(
1074    state: Arc<Mutex<MergeMapState>>,
1075    core: Core,
1076    binding: Arc<dyn HigherOrderBinding>,
1077    producer_binding: Arc<dyn ProducerBinding>,
1078    producer_id: NodeId,
1079    project_fn_id: FnId,
1080    this_inner_id: u64,
1081    concurrency: Option<u32>,
1082) -> Arc<dyn Fn() + Send + Sync> {
1083    Arc::new(move || {
1084        let removed_sub;
1085        {
1086            let mut s = state.lock().unwrap();
1087            if s.terminated {
1088                return;
1089            }
1090            s.active -= 1;
1091            // Two cases:
1092            // (a) Sync-completion during subscribe: `pending_inner_ids`
1093            //     still contains us; `inner_subs` does not. Remove from
1094            //     pending so the post-subscribe insert sees we're done
1095            //     and skips installing the dead sub.
1096            // (b) Async completion: `inner_subs` contains us. Remove
1097            //     and drop lock-released.
1098            s.pending_inner_ids.remove(&this_inner_id);
1099            removed_sub = s.inner_subs.remove(&this_inner_id);
1100        }
1101        drop(removed_sub);
1102
1103        // Try to drain — if we're nested inside an outer drain loop
1104        // (sync-completion path), this is a no-op and the outer drain
1105        // continues.
1106        drain_merge_buffer(
1107            &state,
1108            &core,
1109            &binding,
1110            &producer_binding,
1111            producer_id,
1112            project_fn_id,
1113            concurrency,
1114        );
1115    })
1116}
1117
1118/// Inner-error path for merge_map: terminates the producer + drains
1119/// all inner subs (lock-released) + releases buffered DATA handles.
1120/// Captures `binding` so we can release buffered handles' retains
1121/// (taken on enqueue in the outer_sink's Data branch); without this,
1122/// inner-error before all buffered DATAs project would leak refcount
1123/// shares.
1124fn make_merge_on_error(
1125    state: Arc<Mutex<MergeMapState>>,
1126    core: Core,
1127    binding: Arc<dyn HigherOrderBinding>,
1128    producer_id: NodeId,
1129) -> Arc<dyn Fn(HandleId) + Send + Sync> {
1130    Arc::new(move |h| {
1131        let removed_subs;
1132        let buffered_to_release;
1133        {
1134            let mut s = state.lock().unwrap();
1135            if s.terminated {
1136                return;
1137            }
1138            s.terminated = true;
1139            removed_subs = s.inner_subs.drain().map(|(_, sub)| sub).collect::<Vec<_>>();
1140            s.pending_inner_ids.clear();
1141            buffered_to_release = s.buffer.drain(..).collect::<Vec<_>>();
1142        }
1143        drop(removed_subs);
1144        for h_b in buffered_to_release {
1145            binding.release_handle(h_b);
1146        }
1147        core.error_or_defer(producer_id, h);
1148    })
1149}
1150
1151// =====================================================================
1152// Send + Sync compile-time asserts (Slice E /qa)
1153// =====================================================================
1154
1155const _: fn() = || {
1156    fn assert_send_sync<T: Send + Sync>() {}
1157    assert_send_sync::<SwitchState>();
1158    assert_send_sync::<ExhaustState>();
1159    assert_send_sync::<MergeMapState>();
1160    assert_send_sync::<ProjectFn>();
1161};