graphrefly_operators/higher_order.rs
1//! Higher-order operators (Slice E, D044) — operators whose project fn
2//! returns an inner [`NodeId`] for each outer DATA. Mirrors TS legacy
3//! `extra/operators/higher-order.ts` (`switchMap` / `exhaustMap` /
4//! `concatMap` / `mergeMap`).
5//!
//! All four are producer-pattern nodes (no declared deps; subscribe to
//! the outer source from inside the build closure; emit on themselves
//! through the build context's [`ProducerEmitter`]). This mirrors the
//! [`super::ops_impl`] family (zip / concat / race / takeUntil); the
//! producer substrate handles auto-cleanup of upstream + inner
//! subscriptions on producer deactivation (D031–D038).
12//!
13//! The four flavors differ in how they handle a new outer DATA while a
14//! prior inner is still active:
15//!
16//! - [`switch_map`] — cancel the prior inner (Rx-style `switchMap`).
17//! - [`exhaust_map`] — drop the new value (Rx-style `exhaustMap`).
18//! - [`concat_map`] — enqueue; process sequentially. (Equivalent to
19//! [`merge_map_with_concurrency`] with `Some(1)`.)
20//! - [`merge_map`] / [`merge_map_with_concurrency`] — spawn in parallel
21//! up to `concurrency`. `None` = unbounded.
22//!
//! # Inner-sub tracking (Slice E /qa refactor)
//!
//! Each operator owns its inner subscriptions as [`SubGuard`]s **inside
//! its per-activation state cell** (`Rc<RefCell<…>>`), not in
//! [`super::producer::ProducerNodeState::subs`].
//! `producer_storage[producer_id].subs` holds only the OUTER source
//! subscription (one entry, no positional concerns). switch_map /
//! exhaust_map keep `Option<SubGuard>` (single active inner); merge /
//! concat keep `AHashMap<u64, SubGuard>` keyed by a per-op
//! `next_inner_id`. This avoids two bugs the original positional design
//! exposed: (a) a cached outer source firing its handshake before
//! `subscribe_to` pushed the outer sub, reordering `subs[0]`; (b)
//! merge/concat completed-inner subs accumulating in `subs` indefinitely.
//! Inner-sub cleanup is per-op: switch/exhaust take + drop the guard on
//! inner Complete; merge/concat remove the specific id on inner Complete.
//! Dropping a [`SubGuard`] schedules the unsubscribe through the
//! emitter's deferred queue rather than unsubscribing synchronously.
37//!
38//! # Drain discipline (iterative spawn)
39//!
40//! `merge_map` could spawn the next buffered DATA from inside an
41//! inner's `on_complete` callback. For pathological pre-completed
42//! inners (synchronous Complete during the subscribe handshake),
43//! recursive spawn would grow the stack proportionally to the buffer
44//! depth. The thread-local [`MERGE_DRAIN_ACTIVE`] flag breaks the
45//! recursion: the outermost drain owns the loop; nested `on_complete`
46//! invocations only decrement state and return.
47//!
48//! # Project closure (D044)
49//!
50//! Each operator takes a `project: Fn(HandleId) -> NodeId` closure
51//! registered through [`HigherOrderBinding::register_project`]. Bindings
52//! (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
53//! WASM callbacks into this Rust shape; Rust-side users register a
54//! closure directly.
55
56#![allow(clippy::collapsible_if, clippy::collapsible_match)]
57#![allow(clippy::too_many_arguments, clippy::too_many_lines)]
58
59use std::cell::Cell;
60use std::cell::RefCell;
61use std::collections::VecDeque;
62use std::rc::Rc;
63use std::sync::{Arc, Weak};
64
65use ahash::AHashMap;
66use graphrefly_core::{Core, FnId, HandleId, Message, NodeId, Sink};
67use smallvec::SmallVec;
68
69use super::producer::{ProducerBinding, ProducerCtx, ProducerEmitter, SubGuard};
70
71// =====================================================================
72// HigherOrderBinding — closure registration for project: T -> Node<R>
73// =====================================================================
74
75/// Project closure: takes an outer DATA handle, returns the
76/// [`NodeId`] of an inner node to subscribe to. Closure may register
77/// new state/derived nodes on the fly via captured [`Core`].
78pub type ProjectFn = Box<dyn Fn(HandleId) -> NodeId + Send + Sync>;
79
80/// Closure-registration interface for higher-order operators.
81///
82/// Extends [`ProducerBinding`] with one method that bindings shipping
83/// higher-order operators must implement.
84pub trait HigherOrderBinding: ProducerBinding {
85 /// Register a project closure. The returned [`FnId`] is captured by
86 /// the operator's build closure and looked up via
87 /// [`Self::invoke_project`] on each outer DATA fire.
88 fn register_project(&self, project: ProjectFn) -> FnId;
89
90 /// Invoke a registered project closure with the given outer DATA
91 /// handle. Returns the inner node's [`NodeId`].
92 ///
93 /// # Panics
94 ///
95 /// Implementations panic if `fn_id` is not a registered project
96 /// closure.
97 fn invoke_project(&self, fn_id: FnId, value: HandleId) -> NodeId;
98}
99
100// =====================================================================
101// build_inner_sink — shared inner-subscription handler
102// =====================================================================
103//
104// Each higher-order op subscribes to the inner node returned by its
105// project closure. The inner sink dispatches by `Message::tier()`
106// (R1.3.7.b — central message-tier utility, never hardcode variant
107// checks for forwarding gating per CLAUDE.md design invariant 4):
108//
109// - Tier 0 (Start) — drop.
110// - Tier 1 (Dirty) — drop. (DIRTY forwarding from inner is an
111// acknowledged divergence from TS legacy; Rust producer relies on
112// Core's wave engine to generate DIRTY on the producer's own emits.)
113// - Tier 2 (Pause/Resume) — drop. Backpressure is per-stream; not
114// propagated through higher-order ops by design.
115// - Tier 3 (Data/Resolved) — `Data(h)` forwards via `Core::emit(producer_id, h)`;
116// `Resolved` drops (same wave-engine rationale as Dirty).
117// - Tier 4 (Invalidate) — forwards via `Core::invalidate(producer_id)`
118// per R1.2.7. Inner cache invalidation surfaces to producer
119// subscribers.
120// - Tier 5 (Complete/Error) — semantic dispatch via op-specific
121// callbacks (`on_inner_complete` / `on_inner_error`). Differs
122// per-op (switch_map clears active inner; merge_map decrements
123// active count; etc.).
124// - Tier 6 (Teardown) — forwards via `Core::teardown(producer_id)`
125// per R2.6.4. Inner permanent destruction propagates.
126
127// S2b/D230/D234: long-lived inner sink takes `em: ProducerEmitter`
128// (was `core: Core`, no longer `Clone`). Emit is the mailbox fast path;
129// the rare INVALIDATE/TEARDOWN terminal forwards route through
130// `em.defer` (`CoreFull`).
131fn build_inner_sink(
132 em: ProducerEmitter,
133 producer_binding: Arc<dyn ProducerBinding>,
134 producer_id: NodeId,
135 on_inner_complete: Rc<dyn Fn()>,
136 on_inner_error: Rc<dyn Fn(HandleId)>,
137) -> Sink {
138 Rc::new(move |msgs: &[Message]| {
139 enum Action {
140 Emit(HandleId),
141 Complete,
142 Error(HandleId),
143 Invalidate,
144 Teardown,
145 }
146 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
147 for m in msgs {
148 match m.tier() {
149 3 => {
150 // Tier 3: Data/Resolved. Only Data carries a payload
151 // to forward; Resolved is dropped per the
152 // wave-engine divergence above.
153 if let Some(h) = m.payload_handle() {
154 producer_binding.retain_handle(h);
155 actions.push(Action::Emit(h));
156 }
157 }
158 4 => {
159 // Tier 4: Invalidate. Forward to producer.
160 actions.push(Action::Invalidate);
161 }
162 5 => {
163 // Tier 5: Complete or Error. Semantic dispatch via
164 // op-specific callbacks. `payload_handle()` is the
165 // canonical Error-vs-Complete discriminator at this
166 // tier.
167 if let Some(h) = m.payload_handle() {
168 producer_binding.retain_handle(h);
169 actions.push(Action::Error(h));
170 } else {
171 actions.push(Action::Complete);
172 }
173 }
174 6 => {
175 // Tier 6: Teardown. Forward to producer.
176 actions.push(Action::Teardown);
177 }
178 // Tiers 0 (Start), 1 (Dirty), 2 (Pause/Resume): drop.
179 _ => {}
180 }
181 }
182 for action in actions {
183 match action {
184 Action::Emit(h) => em.emit(producer_id, h),
185 Action::Complete => on_inner_complete(),
186 Action::Error(h) => on_inner_error(h),
187 Action::Invalidate => {
188 let _ = em.defer(move |c| c.invalidate(producer_id));
189 }
190 Action::Teardown => {
191 let _ = em.defer(move |c| c.teardown(producer_id));
192 }
193 }
194 }
195 })
196}
197
198// =====================================================================
199// switch_map — cancel previous inner on each new outer DATA
200// =====================================================================
201
202struct SwitchState {
203 /// Currently-active inner subscription (if any). Cancelling the
204 /// prior inner is `Option::take` + drop — S2b: `SubGuard::Drop`
205 /// posts the `unsubscribe` via `em.defer` (owner-side, in-wave,
206 /// FIFO-ordered so cancel drains before any resubscribe — D234).
207 inner_sub: Option<SubGuard>,
208 /// QA P2 (2026-05-18): monotonic switch epoch. Under D234 the
209 /// cancel-prev is `inner_sub.take()` at outer-fire, but the prior
210 /// subscribe is a still-queued `em.defer` that hasn't populated
211 /// `inner_sub` yet — so a fast double-switch within one wave would
212 /// `take()` an empty slot and leave BOTH inners live. Each accepted
213 /// outer DATA bumps `epoch`; the subscribe-defer captures the value
214 /// and, after `try_subscribe`, drops the new `SubGuard` immediately
215 /// (deferred unsubscribe) instead of installing it if `epoch` has
216 /// moved on — i.e. a newer outer DATA superseded it.
217 epoch: u64,
218 source_done: bool,
219 terminated: bool,
220}
221
222impl SwitchState {
223 fn new() -> Self {
224 Self {
225 inner_sub: None,
226 epoch: 0,
227 source_done: false,
228 terminated: false,
229 }
230 }
231}
232
233/// `switch_map(source, project)` — for each outer DATA, cancel the
234/// previous inner subscription and subscribe to the inner node returned
235/// by `project(value)`. Inner DATA flows through to downstream; inner
236/// COMPLETE clears the active slot; outer COMPLETE (with no active
237/// inner) self-completes the operator.
238#[must_use]
239pub fn switch_map(
240 core: &Core,
241 binding: &Arc<dyn HigherOrderBinding>,
242 source: NodeId,
243 project: ProjectFn,
244) -> NodeId {
245 let project_fn_id = binding.register_project(project);
246 // S2b/D223: the Core weak is GONE (Core relocates; long-lived sinks
247 // reach it via `ProducerEmitter`/`em.defer`). The *binding* weaks
248 // stay — they break the registry→build-closure→strong-binding cycle
249 // (unrelated to Core ownership; D223 only forbids `Weak<C>`).
250 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
251 let producer_binding_weak: Weak<dyn ProducerBinding> =
252 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
253
254 let build = Box::new(move |ctx: ProducerCtx<'_>| {
255 let producer_id = ctx.node_id();
256 let (Some(binding_clone), Some(producer_binding)) =
257 (binding_weak.upgrade(), producer_binding_weak.upgrade())
258 else {
259 return;
260 };
261 let em = ctx.emitter();
262 let state: Rc<RefCell<SwitchState>> = Rc::new(RefCell::new(SwitchState::new()));
263
264 let state_for_outer = state.clone();
265 let em_for_outer = em.clone();
266 let binding_for_outer = binding_clone.clone();
267 let producer_binding_for_outer = producer_binding.clone();
268
269 let outer_sink: Sink = Rc::new(move |msgs| {
270 // Phase 1: classify under state lock. Track whether we
271 // performed a retain so phase 2 can safely release it
272 // without underflow on `[Data(_), Error(_)]` same-batch
273 // (P1 /qa fix).
274 #[derive(Default)]
275 struct Plan {
276 latest_outer_h: Option<HandleId>,
277 latest_retained: bool,
278 self_complete: bool,
279 self_error: Option<HandleId>,
280 }
281 let mut plan = Plan::default();
282 {
283 let mut s = state_for_outer.borrow_mut();
284 if s.terminated {
285 return;
286 }
287 // Tier-based dispatch per `feedback_use_tier_for_signal_routing.md`
288 // and canonical §4.2 ("Always use the provided tier utilities
289 // rather than hardcoding type checks"). Tier 3 carries
290 // DATA/RESOLVED — only DATA has a payload_handle, so the
291 // `payload_handle().is_some()` test discriminates within
292 // the tier without referring to specific variants. Tier 5
293 // carries COMPLETE/ERROR — same shape (Error has handle,
294 // Complete doesn't).
295 for m in msgs {
296 match m.tier() {
297 3 => {
298 if let Some(h) = m.payload_handle() {
299 // switchMap: only the LATEST outer DATA in the
300 // batch matters (TS legacy "skip to last in
301 // the batch to avoid creating + immediately
302 // discarding N-1 inners"). Track the handle;
303 // we'll project + subscribe once after the
304 // lock drops. Skipped handles need no retain.
305 plan.latest_outer_h = Some(h);
306 }
307 // else: Resolved on outer source — no action.
308 }
309 5 => {
310 if let Some(h) = m.payload_handle() {
311 // Error
312 if !s.terminated {
313 s.terminated = true;
314 binding_for_outer.retain_handle(h);
315 plan.self_error = Some(h);
316 }
317 } else {
318 // Complete
319 s.source_done = true;
320 if s.inner_sub.is_none()
321 && plan.latest_outer_h.is_none()
322 && !s.terminated
323 {
324 s.terminated = true;
325 plan.self_complete = true;
326 }
327 }
328 }
329 _ => {} // Tiers 0/1/2/4/6 — no action on outer source.
330 }
331 }
332 if let Some(h) = plan.latest_outer_h {
333 if !s.terminated {
334 // Retain ONE share for the chosen handle —
335 // released by phase 2 after invoke_project.
336 binding_for_outer.retain_handle(h);
337 plan.latest_retained = true;
338 }
339 }
340 }
341
342 // Phase 2: cancel prior inner sub, project, subscribe.
343 // Gated on `latest_retained` so that an `Error` arriving in
344 // the same batch (which sets terminated and skips the
345 // retain) does NOT trigger an unbalanced release.
346 if plan.latest_retained {
347 let outer_h = plan
348 .latest_outer_h
349 .expect("latest_retained implies latest_outer_h is Some");
350
351 // Cancel prior inner sub (if any) + bump the switch
352 // epoch (QA P2). `take()` only catches an *already
353 // installed* prior inner; a prior subscribe still queued
354 // as an `em.defer` hasn't populated `inner_sub` yet, so
355 // bumping `epoch` here and re-checking it inside the
356 // subscribe-defer is what actually invalidates a
357 // superseded-but-still-queued prior subscribe.
358 let my_epoch = {
359 let mut s = state_for_outer.borrow_mut();
360 let prev = s.inner_sub.take();
361 s.epoch += 1;
362 let e = s.epoch;
363 drop(s);
364 drop(prev); // SubGuard::Drop → deferred unsubscribe.
365 e
366 };
367
368 // invoke_project lock-released (binding call, not Core).
369 let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
370 binding_for_outer.release_handle(outer_h);
371
372 let on_complete = make_switch_on_complete(
373 state_for_outer.clone(),
374 em_for_outer.clone(),
375 producer_id,
376 );
377 let on_error = make_switch_on_error(
378 state_for_outer.clone(),
379 em_for_outer.clone(),
380 producer_id,
381 );
382 // F2 /qa: TornDown synthesizes inner-Complete so the
383 // self-Complete trigger can fire (batched
384 // [Data,Complete]→dead-inner wedge fix).
385 let on_complete_for_dead = on_complete.clone();
386 let inner_sink = build_inner_sink(
387 em_for_outer.clone(),
388 producer_binding_for_outer.clone(),
389 producer_id,
390 on_complete,
391 on_error,
392 );
393 // D234: the inner subscribe needs `CoreFull` from a
394 // long-lived sink → `em.defer` (owner-side, in-wave,
395 // FIFO after the cancel above). The returned
396 // `SubscriptionId` is wrapped in a `SubGuard` (its Drop
397 // schedules the eventual unsubscribe). QA P2: the
398 // captured `my_epoch` invalidates this subscribe if a
399 // newer outer DATA superseded it while it was queued.
400 let state_sub = state_for_outer.clone();
401 let em_guard = em_for_outer.clone();
402 let _ = em_for_outer.defer(move |c| {
403 match c.try_subscribe(inner_node, inner_sink) {
404 Ok(sub) => {
405 let guard = SubGuard::new(inner_node, sub, em_guard);
406 let to_drop = {
407 let mut s = state_sub.borrow_mut();
408 if s.terminated || s.epoch != my_epoch {
409 Some(guard)
410 } else {
411 s.inner_sub.replace(guard)
412 }
413 };
414 drop(to_drop);
415 }
416 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
417 // R2.2.7.b: inner dead — synthesize
418 // inner-Complete (clears inner_sub, checks
419 // self-Complete trigger).
420 on_complete_for_dead();
421 }
422 }
423 });
424 }
425
426 if plan.self_complete {
427 em_for_outer.complete(producer_id);
428 } else if let Some(h) = plan.self_error {
429 em_for_outer.error(producer_id, h);
430 }
431 });
432
433 ctx.subscribe_to(source, outer_sink);
434 });
435
436 let fn_id = binding.register_producer_build(build);
437 core.register_producer(fn_id)
438 .expect("invariant: register_producer has no deps; no error variants reachable")
439}
440
441fn make_switch_on_complete(
442 state: Rc<RefCell<SwitchState>>,
443 em: ProducerEmitter,
444 producer_id: NodeId,
445) -> Rc<dyn Fn()> {
446 Rc::new(move || {
447 let prev_inner;
448 let mut should_complete = false;
449 {
450 let mut s = state.borrow_mut();
451 if s.terminated {
452 return;
453 }
454 prev_inner = s.inner_sub.take();
455 if s.source_done && !s.terminated {
456 s.terminated = true;
457 should_complete = true;
458 }
459 }
460 drop(prev_inner); // SubGuard::Drop → deferred unsubscribe.
461 if should_complete {
462 em.complete(producer_id);
463 }
464 })
465}
466
467fn make_switch_on_error(
468 state: Rc<RefCell<SwitchState>>,
469 em: ProducerEmitter,
470 producer_id: NodeId,
471) -> Rc<dyn Fn(HandleId)> {
472 Rc::new(move |h| {
473 let prev_inner;
474 {
475 let mut s = state.borrow_mut();
476 if s.terminated {
477 return;
478 }
479 s.terminated = true;
480 prev_inner = s.inner_sub.take();
481 }
482 drop(prev_inner);
483 em.error(producer_id, h);
484 })
485}
486
487// =====================================================================
488// exhaust_map — ignore outer DATA while inner is active
489// =====================================================================
490
491struct ExhaustState {
492 /// Active inner sub. S2b: `Option<SubGuard>` — drop posts the
493 /// deferred unsubscribe (D234).
494 inner_sub: Option<SubGuard>,
495 /// QA P2 (2026-05-18): exhaust is *older-wins* — while a projected
496 /// inner is pending OR active, new outer DATA is dropped. Under
497 /// D234 the subscribe is a queued `em.defer`, so `inner_sub` is
498 /// `None` between accept and install; without this flag a 2nd
499 /// outer DATA in that window would also pass `inner_sub.is_none()`
500 /// and project a 2nd inner (exhaust contract violation). Set true
501 /// when an outer DATA is accepted + its subscribe queued; cleared
502 /// when the inner completes/errors or its subscribe finds the
503 /// source dead.
504 pending: bool,
505 source_done: bool,
506 terminated: bool,
507}
508
509impl ExhaustState {
510 fn new() -> Self {
511 Self {
512 inner_sub: None,
513 pending: false,
514 source_done: false,
515 terminated: false,
516 }
517 }
518}
519
520/// `exhaust_map(source, project)` — like [`switch_map`] but DROPS new
521/// outer DATA while an inner subscription is active. First outer DATA
522/// per "active window" wins; subsequent DATAs are discarded until the
523/// inner completes.
524#[must_use]
525pub fn exhaust_map(
526 core: &Core,
527 binding: &Arc<dyn HigherOrderBinding>,
528 source: NodeId,
529 project: ProjectFn,
530) -> NodeId {
531 let project_fn_id = binding.register_project(project);
532 // S2b/D223: binding weaks stay (registry-cycle break); Core weak
533 // gone — sinks reach Core via `em`/`em.defer`.
534 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
535 let producer_binding_weak: Weak<dyn ProducerBinding> =
536 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
537
538 let build = Box::new(move |ctx: ProducerCtx<'_>| {
539 let producer_id = ctx.node_id();
540 let (Some(binding_clone), Some(producer_binding)) =
541 (binding_weak.upgrade(), producer_binding_weak.upgrade())
542 else {
543 return;
544 };
545 let em = ctx.emitter();
546 let state: Rc<RefCell<ExhaustState>> = Rc::new(RefCell::new(ExhaustState::new()));
547
548 let state_for_outer = state.clone();
549 let em_for_outer = em.clone();
550 let binding_for_outer = binding_clone.clone();
551 let producer_binding_for_outer = producer_binding.clone();
552
553 let outer_sink: Sink = Rc::new(move |msgs| {
554 #[derive(Default)]
555 struct Plan {
556 first_outer_h: Option<HandleId>,
557 first_retained: bool,
558 self_complete: bool,
559 self_error: Option<HandleId>,
560 }
561 let mut plan = Plan::default();
562 {
563 let mut s = state_for_outer.borrow_mut();
564 if s.terminated {
565 return;
566 }
567 // Tier-based dispatch (canonical §4.2; see
568 // `feedback_use_tier_for_signal_routing.md`).
569 for m in msgs {
570 match m.tier() {
571 3 => {
572 if let Some(h) = m.payload_handle() {
573 // First DATA per active window wins.
574 // Remember the first one we accept; subsequent
575 // batch entries (or DATAs after) drop.
576 // QA P2: also gate on `!s.pending` — a
577 // prior accepted DATA's subscribe may be
578 // queued (inner_sub still None); exhaust
579 // must drop this one (older-wins).
580 if s.inner_sub.is_none()
581 && !s.pending
582 && plan.first_outer_h.is_none()
583 {
584 binding_for_outer.retain_handle(h);
585 plan.first_outer_h = Some(h);
586 plan.first_retained = true;
587 s.pending = true;
588 }
589 }
590 // else: Resolved on outer source — no action.
591 }
592 5 => {
593 if let Some(h) = m.payload_handle() {
594 // Error
595 if !s.terminated {
596 s.terminated = true;
597 // Release any retain we took for
598 // first_outer_h — we won't be projecting it.
599 if plan.first_retained {
600 if let Some(h0) = plan.first_outer_h.take() {
601 binding_for_outer.release_handle(h0);
602 plan.first_retained = false;
603 }
604 }
605 binding_for_outer.retain_handle(h);
606 plan.self_error = Some(h);
607 }
608 } else {
609 // Complete
610 s.source_done = true;
611 if s.inner_sub.is_none()
612 && plan.first_outer_h.is_none()
613 && !s.terminated
614 {
615 s.terminated = true;
616 plan.self_complete = true;
617 }
618 }
619 }
620 _ => {} // Tiers 0/1/2/4/6 — no action.
621 }
622 }
623 }
624
625 if plan.first_retained {
626 let outer_h = plan
627 .first_outer_h
628 .expect("first_retained implies first_outer_h is Some");
629 let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
630 binding_for_outer.release_handle(outer_h);
631
632 let on_complete = make_exhaust_on_complete(
633 state_for_outer.clone(),
634 em_for_outer.clone(),
635 producer_id,
636 );
637 let on_error = make_exhaust_on_error(
638 state_for_outer.clone(),
639 em_for_outer.clone(),
640 producer_id,
641 );
642 // F2 /qa: clone for TornDown synthesis (mirrors switch_map).
643 let on_complete_for_dead = on_complete.clone();
644 let inner_sink = build_inner_sink(
645 em_for_outer.clone(),
646 producer_binding_for_outer.clone(),
647 producer_id,
648 on_complete,
649 on_error,
650 );
651 // D234: inner subscribe via `em.defer` (long-lived sink
652 // can't hold `&Core`). TornDown → synthesize
653 // inner-Complete so `inner_sub` clears and the next
654 // outer DATA can re-project (F2 /qa).
655 let state_sub = state_for_outer.clone();
656 let em_guard = em_for_outer.clone();
657 let _ =
658 em_for_outer.defer(move |c| match c.try_subscribe(inner_node, inner_sink) {
659 Ok(sub) => {
660 let guard = SubGuard::new(inner_node, sub, em_guard);
661 let to_drop = {
662 let mut s = state_sub.borrow_mut();
663 if s.terminated {
664 Some(guard)
665 } else {
666 s.inner_sub.replace(guard)
667 }
668 };
669 drop(to_drop);
670 }
671 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
672 on_complete_for_dead();
673 }
674 });
675 }
676
677 if plan.self_complete {
678 em_for_outer.complete(producer_id);
679 } else if let Some(h) = plan.self_error {
680 em_for_outer.error(producer_id, h);
681 }
682 });
683
684 ctx.subscribe_to(source, outer_sink);
685 });
686
687 let fn_id = binding.register_producer_build(build);
688 core.register_producer(fn_id)
689 .expect("invariant: register_producer has no deps; no error variants reachable")
690}
691
692fn make_exhaust_on_complete(
693 state: Rc<RefCell<ExhaustState>>,
694 em: ProducerEmitter,
695 producer_id: NodeId,
696) -> Rc<dyn Fn()> {
697 Rc::new(move || {
698 let prev_inner;
699 let mut should_complete = false;
700 {
701 let mut s = state.borrow_mut();
702 if s.terminated {
703 return;
704 }
705 prev_inner = s.inner_sub.take();
706 // QA P2: inner finished (or was dead) — exhaust window
707 // re-opens; the next outer DATA may project again.
708 s.pending = false;
709 if s.source_done && !s.terminated {
710 s.terminated = true;
711 should_complete = true;
712 }
713 }
714 drop(prev_inner);
715 if should_complete {
716 em.complete(producer_id);
717 }
718 })
719}
720
721fn make_exhaust_on_error(
722 state: Rc<RefCell<ExhaustState>>,
723 em: ProducerEmitter,
724 producer_id: NodeId,
725) -> Rc<dyn Fn(HandleId)> {
726 Rc::new(move |h| {
727 let prev_inner;
728 {
729 let mut s = state.borrow_mut();
730 if s.terminated {
731 return;
732 }
733 s.terminated = true;
734 prev_inner = s.inner_sub.take();
735 }
736 drop(prev_inner);
737 em.error(producer_id, h);
738 })
739}
740
// =====================================================================
// merge_map — parallel inners up to `concurrency` cap
// concat_map — wrapper for concurrency = Some(1)
// =====================================================================

thread_local! {
    /// Re-entrancy latch for `drain_merge_buffer` on the current thread.
    ///
    /// The latch is `true` while a drain loop is running. A drain that
    /// starts while the latch is already set returns immediately instead
    /// of recursing. The typical case is an inner's `on_complete` firing
    /// synchronously inside a subscribe handshake. The running
    /// (outermost) loop re-checks the cap on its next iteration and picks
    /// up the freed slot. This keeps the stack from growing with the
    /// buffer depth.
    ///
    /// NOTE(review): there is one latch per thread, shared by every
    /// merge_map / concat_map instance, not one per operator. A drain of
    /// one instance started while another instance is draining is skipped
    /// as well. Confirm that such an instance always gets a later drain
    /// call.
    static MERGE_DRAIN_ACTIVE: Cell<bool> = const { Cell::new(false) };
}
755
756struct MergeMapState {
757 /// Number of currently-active inner subscriptions (spawned but
758 /// not yet completed/errored).
759 active: u32,
760 /// Outer DATAs waiting because `active >= concurrency`. Each
761 /// handle has one retain share (taken on enqueue, released on
762 /// dequeue + project).
763 buffer: VecDeque<HandleId>,
764 /// Per-inner `Subscription`s, keyed by `next_inner_id`. Each
765 /// inner's `on_complete` removes its entry by id (lock-released
766 /// drop).
767 inner_subs: AHashMap<u64, SubGuard>,
768 /// Pending inner ids (between `subscribe` call and
769 /// `inner_subs.insert`). Used to detect synchronous-completion:
770 /// if `on_complete` runs during `subscribe`, it removes from
771 /// `pending_inner_ids`; the post-subscribe code checks the set
772 /// and skips inserting the now-dead sub.
773 pending_inner_ids: ahash::AHashSet<u64>,
774 next_inner_id: u64,
775 source_done: bool,
776 terminated: bool,
777}
778
779impl MergeMapState {
780 fn new() -> Self {
781 Self {
782 active: 0,
783 buffer: VecDeque::new(),
784 inner_subs: AHashMap::new(),
785 pending_inner_ids: ahash::AHashSet::new(),
786 next_inner_id: 0,
787 source_done: false,
788 terminated: false,
789 }
790 }
791}
792
793/// `merge_map(source, project)` — unbounded concurrency variant.
794/// Equivalent to [`merge_map_with_concurrency`] with `None`.
795#[must_use]
796pub fn merge_map(
797 core: &Core,
798 binding: &Arc<dyn HigherOrderBinding>,
799 source: NodeId,
800 project: ProjectFn,
801) -> NodeId {
802 merge_map_with_concurrency(core, binding, source, project, None)
803}
804
805/// `concat_map(source, project)` — sequential queue variant.
806/// Equivalent to [`merge_map_with_concurrency`] with `Some(1)`. Each
807/// outer DATA is enqueued and processed one-at-a-time.
808#[must_use]
809pub fn concat_map(
810 core: &Core,
811 binding: &Arc<dyn HigherOrderBinding>,
812 source: NodeId,
813 project: ProjectFn,
814) -> NodeId {
815 merge_map_with_concurrency(core, binding, source, project, Some(1))
816}
817
818/// `merge_map_with_concurrency(source, project, concurrency)` — projects
819/// each outer DATA to an inner Node and subscribes in parallel.
820///
821/// `concurrency`:
822/// - `None` → unbounded (every outer DATA spawns immediately).
823/// - `Some(n)` → at most `n` concurrent inners; excess outer DATAs
824/// buffer until an active inner completes.
825///
826/// Per D043 / D040, this matches the
827/// [`Core::set_pause_buffer_cap`](graphrefly_core::Core::set_pause_buffer_cap)
828/// `Option<usize>` precedent (None = unbounded). `Some(0)` is degenerate
829/// (would buffer everything indefinitely without ever spawning) but
830/// accepted at the type level.
831#[must_use]
832pub fn merge_map_with_concurrency(
833 core: &Core,
834 binding: &Arc<dyn HigherOrderBinding>,
835 source: NodeId,
836 project: ProjectFn,
837 concurrency: Option<u32>,
838) -> NodeId {
839 let project_fn_id = binding.register_project(project);
840 // S2b/D223: binding weaks stay (registry-cycle break); Core weak
841 // gone — sinks reach Core via `em`/`em.defer`.
842 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
843 let producer_binding_weak: Weak<dyn ProducerBinding> =
844 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
845
846 let build = Box::new(move |ctx: ProducerCtx<'_>| {
847 let producer_id = ctx.node_id();
848 let (Some(binding_clone), Some(producer_binding)) =
849 (binding_weak.upgrade(), producer_binding_weak.upgrade())
850 else {
851 return;
852 };
853 let em = ctx.emitter();
854 let state: Rc<RefCell<MergeMapState>> = Rc::new(RefCell::new(MergeMapState::new()));
855
856 let state_for_outer = state.clone();
857 let em_for_outer = em.clone();
858 let binding_for_outer = binding_clone.clone();
859 let producer_binding_for_outer = producer_binding.clone();
860
861 let outer_sink: Sink = Rc::new(move |msgs| {
862 // Phase 1: enqueue DATAs into the buffer (always — drain
863 // loop dequeues + spawns up to cap), classify terminal
864 // signals.
865 let mut error_action: Option<HandleId> = None;
866 let mut self_complete_now = false;
867 {
868 let mut s = state_for_outer.borrow_mut();
869 if s.terminated {
870 return;
871 }
872 // Tier-based dispatch (canonical §4.2; see
873 // `feedback_use_tier_for_signal_routing.md`).
874 for m in msgs {
875 match m.tier() {
876 3 => {
877 if let Some(h) = m.payload_handle() {
878 // Retain on enqueue — released by drain
879 // after invoke_project.
880 binding_for_outer.retain_handle(h);
881 s.buffer.push_back(h);
882 }
883 // else: Resolved on outer source — no action.
884 }
885 5 => {
886 if let Some(h) = m.payload_handle() {
887 // Error
888 if !s.terminated {
889 s.terminated = true;
890 binding_for_outer.retain_handle(h);
891 while let Some(q) = s.buffer.pop_front() {
892 binding_for_outer.release_handle(q);
893 }
894 error_action = Some(h);
895 }
896 } else {
897 // Complete
898 s.source_done = true;
899 if s.active == 0 && s.buffer.is_empty() && !s.terminated {
900 s.terminated = true;
901 self_complete_now = true;
902 }
903 }
904 }
905 _ => {} // Tiers 0/1/2/4/6 — no action.
906 }
907 }
908 }
909
910 if let Some(h) = error_action {
911 em_for_outer.error(producer_id, h);
912 return;
913 }
914 if self_complete_now {
915 em_for_outer.complete(producer_id);
916 return;
917 }
918
919 // Phase 2: drain buffer iteratively up to concurrency cap.
920 drain_merge_buffer(
921 &state_for_outer,
922 &em_for_outer,
923 &binding_for_outer,
924 &producer_binding_for_outer,
925 producer_id,
926 project_fn_id,
927 concurrency,
928 );
929 });
930
931 ctx.subscribe_to(source, outer_sink);
932 });
933
934 let fn_id = binding.register_producer_build(build);
935 core.register_producer(fn_id)
936 .expect("invariant: register_producer has no deps; no error variants reachable")
937}
938
939/// Iteratively pop from `buffer` and spawn inners until cap is reached
940/// or buffer is empty. Re-entrance from a nested `on_complete` is
941/// short-circuited via [`MERGE_DRAIN_ACTIVE`]; the outermost call owns
942/// the drain loop and picks up cap-frees on subsequent iterations.
943// S2b/D234: `core: &Core` → `em: &ProducerEmitter`. The per-inner
944// subscribe routes through `em.defer` (`CoreFull`); the returned
945// `SubscriptionId` is wrapped in a `SubGuard` keyed in `inner_subs`
946// (its Drop schedules the unsubscribe).
947fn drain_merge_buffer(
948 state: &Rc<RefCell<MergeMapState>>,
949 em: &ProducerEmitter,
950 binding: &Arc<dyn HigherOrderBinding>,
951 producer_binding: &Arc<dyn ProducerBinding>,
952 producer_id: NodeId,
953 project_fn_id: FnId,
954 concurrency: Option<u32>,
955) {
956 if MERGE_DRAIN_ACTIVE.with(|f| f.replace(true)) {
957 // Already draining on this thread; outer loop will drain
958 // remaining buffer.
959 return;
960 }
961
962 loop {
963 let h_and_id;
964 let mut should_self_complete = false;
965 {
966 let mut s = state.borrow_mut();
967 if s.terminated {
968 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
969 return;
970 }
971 let allowed = match concurrency {
972 None => true,
973 Some(n) => s.active < n,
974 };
975 if !allowed {
976 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
977 return;
978 }
979 if let Some(h) = s.buffer.pop_front() {
980 s.active += 1;
981 let id = s.next_inner_id;
982 s.next_inner_id += 1;
983 s.pending_inner_ids.insert(id);
984 h_and_id = Some((h, id));
985 } else if s.source_done && s.active == 0 && !s.terminated {
986 s.terminated = true;
987 should_self_complete = true;
988 h_and_id = None;
989 } else {
990 h_and_id = None;
991 }
992 }
993
994 if should_self_complete {
995 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
996 em.complete(producer_id);
997 return;
998 }
999
1000 let Some((outer_h, inner_id)) = h_and_id else {
1001 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
1002 return;
1003 };
1004
1005 // Spawn lock-released.
1006 let inner_node = binding.invoke_project(project_fn_id, outer_h);
1007 binding.release_handle(outer_h);
1008
1009 let on_complete = make_merge_on_complete(
1010 state.clone(),
1011 em.clone(),
1012 binding.clone(),
1013 producer_binding.clone(),
1014 producer_id,
1015 project_fn_id,
1016 inner_id,
1017 concurrency,
1018 );
1019 let on_error = make_merge_on_error(state.clone(), em.clone(), binding.clone(), producer_id);
1020 // F2 /qa: clone on_complete so the TornDown branch can
1021 // synthesize inner-Complete (closes the merge_map `s.active`
1022 // leak that left the producer never self-completing when a
1023 // projected inner was dead).
1024 let on_complete_for_dead = on_complete.clone();
1025 let inner_sink = build_inner_sink(
1026 em.clone(),
1027 producer_binding.clone(),
1028 producer_id,
1029 on_complete,
1030 on_error,
1031 );
1032 // D234: inner subscribe via `em.defer` (long-lived drain can't
1033 // hold `&Core`). The sync-completion detection still works: if
1034 // the inner pre-completes during `try_subscribe`'s handshake,
1035 // `on_complete` fires INSIDE this defer (owner-side) and removes
1036 // `inner_id` from `pending_inner_ids` before the post-subscribe
1037 // check below — so a now-dead sub is dropped, not installed.
1038 let state_sub = state.clone();
1039 let em_guard = em.clone();
1040 let _ = em.defer(move |c| {
1041 match c.try_subscribe(inner_node, inner_sink) {
1042 Ok(sub) => {
1043 let guard = SubGuard::new(inner_node, sub, em_guard);
1044 let to_drop = {
1045 let mut s = state_sub.borrow_mut();
1046 if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
1047 Some(guard)
1048 } else {
1049 s.inner_subs.insert(inner_id, guard);
1050 None
1051 }
1052 };
1053 drop(to_drop);
1054 }
1055 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
1056 // R2.2.7.b / F2 /qa: synthesize inner-Complete so
1057 // `make_merge_on_complete` decrements `s.active`,
1058 // clears pending, and checks the self-Complete
1059 // trigger (Dead-inner `s.active` leak fix).
1060 on_complete_for_dead();
1061 }
1062 }
1063 });
1064
1065 // Loop continues — pops next from buffer or returns.
1066 }
1067}
1068
1069fn make_merge_on_complete(
1070 state: Rc<RefCell<MergeMapState>>,
1071 em: ProducerEmitter,
1072 binding: Arc<dyn HigherOrderBinding>,
1073 producer_binding: Arc<dyn ProducerBinding>,
1074 producer_id: NodeId,
1075 project_fn_id: FnId,
1076 this_inner_id: u64,
1077 concurrency: Option<u32>,
1078) -> Rc<dyn Fn()> {
1079 Rc::new(move || {
1080 let removed_sub;
1081 {
1082 let mut s = state.borrow_mut();
1083 if s.terminated {
1084 return;
1085 }
1086 s.active -= 1;
1087 // Two cases:
1088 // (a) Sync-completion during subscribe: `pending_inner_ids`
1089 // still contains us; `inner_subs` does not. Remove from
1090 // pending so the post-subscribe insert sees we're done
1091 // and skips installing the dead sub.
1092 // (b) Async completion: `inner_subs` contains us. Remove
1093 // and drop lock-released.
1094 s.pending_inner_ids.remove(&this_inner_id);
1095 removed_sub = s.inner_subs.remove(&this_inner_id);
1096 }
1097 drop(removed_sub); // SubGuard::Drop → deferred unsubscribe.
1098
1099 // Try to drain — if we're nested inside an outer drain loop
1100 // (sync-completion path), this is a no-op and the outer drain
1101 // continues.
1102 drain_merge_buffer(
1103 &state,
1104 &em,
1105 &binding,
1106 &producer_binding,
1107 producer_id,
1108 project_fn_id,
1109 concurrency,
1110 );
1111 })
1112}
1113
1114/// Inner-error path for merge_map: terminates the producer + drains
1115/// all inner subs (lock-released) + releases buffered DATA handles.
1116/// Captures `binding` so we can release buffered handles' retains
1117/// (taken on enqueue in the outer_sink's Data branch); without this,
1118/// inner-error before all buffered DATAs project would leak refcount
1119/// shares.
1120fn make_merge_on_error(
1121 state: Rc<RefCell<MergeMapState>>,
1122 em: ProducerEmitter,
1123 binding: Arc<dyn HigherOrderBinding>,
1124 producer_id: NodeId,
1125) -> Rc<dyn Fn(HandleId)> {
1126 Rc::new(move |h| {
1127 let removed_subs;
1128 let buffered_to_release;
1129 {
1130 let mut s = state.borrow_mut();
1131 if s.terminated {
1132 return;
1133 }
1134 s.terminated = true;
1135 removed_subs = s.inner_subs.drain().map(|(_, sub)| sub).collect::<Vec<_>>();
1136 s.pending_inner_ids.clear();
1137 buffered_to_release = s.buffer.drain(..).collect::<Vec<_>>();
1138 }
1139 drop(removed_subs); // each SubGuard::Drop → deferred unsubscribe.
1140 for h_b in buffered_to_release {
1141 binding.release_handle(h_b);
1142 }
1143 em.error(producer_id, h);
1144 })
1145}
1146
1147// =====================================================================
1148// Compile-time asserts (Slice E /qa; D248/D249-amended)
1149// =====================================================================
1150//
1151// D248/D249/S2c: `SwitchState`/`ExhaustState`/`MergeMapState` embed a
1152// `ProducerEmitter`, which now carries the owner-only `Rc<DeferQueue>`
1153// (the `!Send` `Defer` split off `CoreMailbox`). Under full
1154// single-owner these op-states are owner-thread-only and intentionally
1155// `!Send` — the prior `Send + Sync` assertions were shared-Core-era
1156// legacy and are deleted. `ProjectFn` is a pure projector (captures no
1157// `!Send`), so it stays `Send + Sync`.
1158
1159const _: fn() = || {
1160 fn assert_send_sync<T: Send + Sync>() {}
1161 assert_send_sync::<ProjectFn>();
1162};