graphrefly_operators/higher_order.rs
1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! Higher-order operators (Slice E, D044) — operators whose project fn
11//! returns an inner [`NodeId`] for each outer DATA. Mirrors TS legacy
12//! `extra/operators/higher-order.ts` (`switchMap` / `exhaustMap` /
13//! `concatMap` / `mergeMap`).
14//!
15//! All four are producer-pattern nodes (no declared deps; subscribe to
16//! the outer source from inside the build closure; emit on themselves
17//! via [`Core::emit`]). This mirrors the [`super::ops_impl`] family
18//! (zip / concat / race / takeUntil); the producer substrate handles
19//! auto-cleanup of upstream + inner subscriptions on producer
20//! deactivation (D031–D038).
21//!
22//! The four flavors differ in how they handle a new outer DATA while a
23//! prior inner is still active:
24//!
25//! - [`switch_map`] — cancel the prior inner (Rx-style `switchMap`).
26//! - [`exhaust_map`] — drop the new value (Rx-style `exhaustMap`).
27//! - [`concat_map`] — enqueue; process sequentially. (Equivalent to
28//! [`merge_map_with_concurrency`] with `Some(1)`.)
29//! - [`merge_map`] / [`merge_map_with_concurrency`] — spawn in parallel
30//! up to `concurrency`. `None` = unbounded.
31//!
32//! # Inner-sub tracking (Slice E /qa refactor)
33//!
//! Each operator owns its inner subscriptions — held as [`SubGuard`]s,
//! whose `Drop` posts the deferred unsubscribe — **inside its state
//! `Mutex`** (not in [`super::producer::ProducerNodeState::subs`]).
//! `producer_storage[producer_id].subs` holds only the OUTER source
//! subscription (one entry, no positional concerns). switch_map /
//! exhaust_map keep `Option<SubGuard>` (single active inner); merge
//! / concat keep `AHashMap<u64, SubGuard>` keyed by a per-op
//! `next_inner_id`. This avoids two bugs that the original positional
//! design exposed: (a) a cached outer source firing its handshake before
//! `subscribe_to` had pushed the outer sub, which reordered `subs[0]`;
//! (b) completed inner subs of merge/concat accumulating in `subs`
//! indefinitely. Inner-sub cleanup is now per-op: switch/exhaust take and
//! drop the guard on inner Complete; merge/concat remove the specific id
//! on inner Complete.
46//!
47//! # Drain discipline (iterative spawn)
48//!
49//! `merge_map` could spawn the next buffered DATA from inside an
50//! inner's `on_complete` callback. For pathological pre-completed
51//! inners (synchronous Complete during the subscribe handshake),
52//! recursive spawn would grow the stack proportionally to the buffer
53//! depth. The thread-local [`MERGE_DRAIN_ACTIVE`] flag breaks the
54//! recursion: the outermost drain owns the loop; nested `on_complete`
55//! invocations only decrement state and return.
56//!
57//! # Project closure (D044)
58//!
59//! Each operator takes a `project: Fn(HandleId) -> NodeId` closure
60//! registered through [`HigherOrderBinding::register_project`]. Bindings
61//! (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
62//! WASM callbacks into this Rust shape; Rust-side users register a
63//! closure directly.
64
65#![allow(clippy::collapsible_if, clippy::collapsible_match)]
66#![allow(clippy::too_many_arguments, clippy::too_many_lines)]
67
68use std::cell::Cell;
69use std::collections::VecDeque;
70use std::sync::{Arc, Mutex, Weak};
71
72use ahash::AHashMap;
73use graphrefly_core::{Core, FnId, HandleId, Message, NodeId, Sink};
74use smallvec::SmallVec;
75
76use super::producer::{ProducerBinding, ProducerCtx, ProducerEmitter, SubGuard};
77
78// =====================================================================
79// HigherOrderBinding — closure registration for project: T -> Node<R>
80// =====================================================================
81
82/// Project closure: takes an outer DATA handle, returns the
83/// [`NodeId`] of an inner node to subscribe to. Closure may register
84/// new state/derived nodes on the fly via captured [`Core`].
85pub type ProjectFn = Box<dyn Fn(HandleId) -> NodeId + Send + Sync>;
86
87/// Closure-registration interface for higher-order operators.
88///
89/// Extends [`ProducerBinding`] with one method that bindings shipping
90/// higher-order operators must implement.
91pub trait HigherOrderBinding: ProducerBinding {
92 /// Register a project closure. The returned [`FnId`] is captured by
93 /// the operator's build closure and looked up via
94 /// [`Self::invoke_project`] on each outer DATA fire.
95 fn register_project(&self, project: ProjectFn) -> FnId;
96
97 /// Invoke a registered project closure with the given outer DATA
98 /// handle. Returns the inner node's [`NodeId`].
99 ///
100 /// # Panics
101 ///
102 /// Implementations panic if `fn_id` is not a registered project
103 /// closure.
104 fn invoke_project(&self, fn_id: FnId, value: HandleId) -> NodeId;
105}
106
107// =====================================================================
108// build_inner_sink — shared inner-subscription handler
109// =====================================================================
110//
111// Each higher-order op subscribes to the inner node returned by its
112// project closure. The inner sink dispatches by `Message::tier()`
113// (R1.3.7.b — central message-tier utility, never hardcode variant
114// checks for forwarding gating per CLAUDE.md design invariant 4):
115//
116// - Tier 0 (Start) — drop.
117// - Tier 1 (Dirty) — drop. (DIRTY forwarding from inner is an
118// acknowledged divergence from TS legacy; Rust producer relies on
119// Core's wave engine to generate DIRTY on the producer's own emits.)
120// - Tier 2 (Pause/Resume) — drop. Backpressure is per-stream; not
121// propagated through higher-order ops by design.
122// - Tier 3 (Data/Resolved) — `Data(h)` forwards via `Core::emit(producer_id, h)`;
123// `Resolved` drops (same wave-engine rationale as Dirty).
124// - Tier 4 (Invalidate) — forwards via `Core::invalidate(producer_id)`
125// per R1.2.7. Inner cache invalidation surfaces to producer
126// subscribers.
127// - Tier 5 (Complete/Error) — semantic dispatch via op-specific
128// callbacks (`on_inner_complete` / `on_inner_error`). Differs
129// per-op (switch_map clears active inner; merge_map decrements
130// active count; etc.).
131// - Tier 6 (Teardown) — forwards via `Core::teardown(producer_id)`
132// per R2.6.4. Inner permanent destruction propagates.
133
134// S2b/D230/D234: long-lived inner sink takes `em: ProducerEmitter`
135// (was `core: Core`, no longer `Clone`). Emit is the mailbox fast path;
136// the rare INVALIDATE/TEARDOWN terminal forwards route through
137// `em.defer` (`CoreFull`).
138fn build_inner_sink(
139 em: ProducerEmitter,
140 producer_binding: Arc<dyn ProducerBinding>,
141 producer_id: NodeId,
142 on_inner_complete: Arc<dyn Fn()>,
143 on_inner_error: Arc<dyn Fn(HandleId)>,
144) -> Sink {
145 Arc::new(move |msgs: &[Message]| {
146 enum Action {
147 Emit(HandleId),
148 Complete,
149 Error(HandleId),
150 Invalidate,
151 Teardown,
152 }
153 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
154 for m in msgs {
155 match m.tier() {
156 3 => {
157 // Tier 3: Data/Resolved. Only Data carries a payload
158 // to forward; Resolved is dropped per the
159 // wave-engine divergence above.
160 if let Some(h) = m.payload_handle() {
161 producer_binding.retain_handle(h);
162 actions.push(Action::Emit(h));
163 }
164 }
165 4 => {
166 // Tier 4: Invalidate. Forward to producer.
167 actions.push(Action::Invalidate);
168 }
169 5 => {
170 // Tier 5: Complete or Error. Semantic dispatch via
171 // op-specific callbacks. `payload_handle()` is the
172 // canonical Error-vs-Complete discriminator at this
173 // tier.
174 if let Some(h) = m.payload_handle() {
175 producer_binding.retain_handle(h);
176 actions.push(Action::Error(h));
177 } else {
178 actions.push(Action::Complete);
179 }
180 }
181 6 => {
182 // Tier 6: Teardown. Forward to producer.
183 actions.push(Action::Teardown);
184 }
185 // Tiers 0 (Start), 1 (Dirty), 2 (Pause/Resume): drop.
186 _ => {}
187 }
188 }
189 for action in actions {
190 match action {
191 Action::Emit(h) => em.emit_or_defer(producer_id, h),
192 Action::Complete => on_inner_complete(),
193 Action::Error(h) => on_inner_error(h),
194 Action::Invalidate => {
195 let _ = em.defer(move |c| c.invalidate(producer_id));
196 }
197 Action::Teardown => {
198 let _ = em.defer(move |c| c.teardown(producer_id));
199 }
200 }
201 }
202 })
203}
204
205// =====================================================================
206// switch_map — cancel previous inner on each new outer DATA
207// =====================================================================
208
209struct SwitchState {
210 /// Currently-active inner subscription (if any). Cancelling the
211 /// prior inner is `Option::take` + drop — S2b: `SubGuard::Drop`
212 /// posts the `unsubscribe` via `em.defer` (owner-side, in-wave,
213 /// FIFO-ordered so cancel drains before any resubscribe — D234).
214 inner_sub: Option<SubGuard>,
215 /// QA P2 (2026-05-18): monotonic switch epoch. Under D234 the
216 /// cancel-prev is `inner_sub.take()` at outer-fire, but the prior
217 /// subscribe is a still-queued `em.defer` that hasn't populated
218 /// `inner_sub` yet — so a fast double-switch within one wave would
219 /// `take()` an empty slot and leave BOTH inners live. Each accepted
220 /// outer DATA bumps `epoch`; the subscribe-defer captures the value
221 /// and, after `try_subscribe`, drops the new `SubGuard` immediately
222 /// (deferred unsubscribe) instead of installing it if `epoch` has
223 /// moved on — i.e. a newer outer DATA superseded it.
224 epoch: u64,
225 source_done: bool,
226 terminated: bool,
227}
228
229impl SwitchState {
230 fn new() -> Self {
231 Self {
232 inner_sub: None,
233 epoch: 0,
234 source_done: false,
235 terminated: false,
236 }
237 }
238}
239
240/// `switch_map(source, project)` — for each outer DATA, cancel the
241/// previous inner subscription and subscribe to the inner node returned
242/// by `project(value)`. Inner DATA flows through to downstream; inner
243/// COMPLETE clears the active slot; outer COMPLETE (with no active
244/// inner) self-completes the operator.
245#[must_use]
246pub fn switch_map(
247 core: &Core,
248 binding: &Arc<dyn HigherOrderBinding>,
249 source: NodeId,
250 project: ProjectFn,
251) -> NodeId {
252 let project_fn_id = binding.register_project(project);
253 // S2b/D223: the Core weak is GONE (Core relocates; long-lived sinks
254 // reach it via `ProducerEmitter`/`em.defer`). The *binding* weaks
255 // stay — they break the registry→build-closure→strong-binding cycle
256 // (unrelated to Core ownership; D223 only forbids `Weak<C>`).
257 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
258 let producer_binding_weak: Weak<dyn ProducerBinding> =
259 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
260
261 let build = Box::new(move |ctx: ProducerCtx<'_>| {
262 let producer_id = ctx.node_id();
263 let (Some(binding_clone), Some(producer_binding)) =
264 (binding_weak.upgrade(), producer_binding_weak.upgrade())
265 else {
266 return;
267 };
268 let em = ctx.emitter();
269 let state: Arc<Mutex<SwitchState>> = Arc::new(Mutex::new(SwitchState::new()));
270
271 let state_for_outer = state.clone();
272 let em_for_outer = em.clone();
273 let binding_for_outer = binding_clone.clone();
274 let producer_binding_for_outer = producer_binding.clone();
275
276 let outer_sink: Sink = Arc::new(move |msgs| {
277 // Phase 1: classify under state lock. Track whether we
278 // performed a retain so phase 2 can safely release it
279 // without underflow on `[Data(_), Error(_)]` same-batch
280 // (P1 /qa fix).
281 #[derive(Default)]
282 struct Plan {
283 latest_outer_h: Option<HandleId>,
284 latest_retained: bool,
285 self_complete: bool,
286 self_error: Option<HandleId>,
287 }
288 let mut plan = Plan::default();
289 {
290 let mut s = state_for_outer.lock().unwrap();
291 if s.terminated {
292 return;
293 }
294 // Tier-based dispatch per `feedback_use_tier_for_signal_routing.md`
295 // and canonical §4.2 ("Always use the provided tier utilities
296 // rather than hardcoding type checks"). Tier 3 carries
297 // DATA/RESOLVED — only DATA has a payload_handle, so the
298 // `payload_handle().is_some()` test discriminates within
299 // the tier without referring to specific variants. Tier 5
300 // carries COMPLETE/ERROR — same shape (Error has handle,
301 // Complete doesn't).
302 for m in msgs {
303 match m.tier() {
304 3 => {
305 if let Some(h) = m.payload_handle() {
306 // switchMap: only the LATEST outer DATA in the
307 // batch matters (TS legacy "skip to last in
308 // the batch to avoid creating + immediately
309 // discarding N-1 inners"). Track the handle;
310 // we'll project + subscribe once after the
311 // lock drops. Skipped handles need no retain.
312 plan.latest_outer_h = Some(h);
313 }
314 // else: Resolved on outer source — no action.
315 }
316 5 => {
317 if let Some(h) = m.payload_handle() {
318 // Error
319 if !s.terminated {
320 s.terminated = true;
321 binding_for_outer.retain_handle(h);
322 plan.self_error = Some(h);
323 }
324 } else {
325 // Complete
326 s.source_done = true;
327 if s.inner_sub.is_none()
328 && plan.latest_outer_h.is_none()
329 && !s.terminated
330 {
331 s.terminated = true;
332 plan.self_complete = true;
333 }
334 }
335 }
336 _ => {} // Tiers 0/1/2/4/6 — no action on outer source.
337 }
338 }
339 if let Some(h) = plan.latest_outer_h {
340 if !s.terminated {
341 // Retain ONE share for the chosen handle —
342 // released by phase 2 after invoke_project.
343 binding_for_outer.retain_handle(h);
344 plan.latest_retained = true;
345 }
346 }
347 }
348
349 // Phase 2: cancel prior inner sub, project, subscribe.
350 // Gated on `latest_retained` so that an `Error` arriving in
351 // the same batch (which sets terminated and skips the
352 // retain) does NOT trigger an unbalanced release.
353 if plan.latest_retained {
354 let outer_h = plan
355 .latest_outer_h
356 .expect("latest_retained implies latest_outer_h is Some");
357
358 // Cancel prior inner sub (if any) + bump the switch
359 // epoch (QA P2). `take()` only catches an *already
360 // installed* prior inner; a prior subscribe still queued
361 // as an `em.defer` hasn't populated `inner_sub` yet, so
362 // bumping `epoch` here and re-checking it inside the
363 // subscribe-defer is what actually invalidates a
364 // superseded-but-still-queued prior subscribe.
365 let my_epoch = {
366 let mut s = state_for_outer.lock().unwrap();
367 let prev = s.inner_sub.take();
368 s.epoch += 1;
369 let e = s.epoch;
370 drop(s);
371 drop(prev); // SubGuard::Drop → deferred unsubscribe.
372 e
373 };
374
375 // invoke_project lock-released (binding call, not Core).
376 let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
377 binding_for_outer.release_handle(outer_h);
378
379 let on_complete = make_switch_on_complete(
380 state_for_outer.clone(),
381 em_for_outer.clone(),
382 producer_id,
383 );
384 let on_error = make_switch_on_error(
385 state_for_outer.clone(),
386 em_for_outer.clone(),
387 producer_id,
388 );
389 // F2 /qa: TornDown synthesizes inner-Complete so the
390 // self-Complete trigger can fire (batched
391 // [Data,Complete]→dead-inner wedge fix).
392 let on_complete_for_dead = on_complete.clone();
393 let inner_sink = build_inner_sink(
394 em_for_outer.clone(),
395 producer_binding_for_outer.clone(),
396 producer_id,
397 on_complete,
398 on_error,
399 );
400 // D234: the inner subscribe needs `CoreFull` from a
401 // long-lived sink → `em.defer` (owner-side, in-wave,
402 // FIFO after the cancel above). The returned
403 // `SubscriptionId` is wrapped in a `SubGuard` (its Drop
404 // schedules the eventual unsubscribe). QA P2: the
405 // captured `my_epoch` invalidates this subscribe if a
406 // newer outer DATA superseded it while it was queued.
407 let state_sub = state_for_outer.clone();
408 let em_guard = em_for_outer.clone();
409 let _ = em_for_outer.defer(move |c| {
410 match c.try_subscribe(inner_node, inner_sink) {
411 Ok(sub) => {
412 let guard = SubGuard::new(inner_node, sub, em_guard);
413 let to_drop = {
414 let mut s = state_sub.lock().unwrap();
415 if s.terminated || s.epoch != my_epoch {
416 Some(guard)
417 } else {
418 s.inner_sub.replace(guard)
419 }
420 };
421 drop(to_drop);
422 }
423 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
424 // R2.2.7.b: inner dead — synthesize
425 // inner-Complete (clears inner_sub, checks
426 // self-Complete trigger).
427 on_complete_for_dead();
428 }
429 Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
430 // Already inside the in-wave drain (no
431 // partitions held the old way) — a violation
432 // here is the substrate-invariant break the
433 // old wave-end-drain guard caught.
434 panic!(
435 "switch_map inner subscribe: partition-order \
436 violation inside em.defer — substrate invariant broken"
437 );
438 }
439 }
440 });
441 }
442
443 if plan.self_complete {
444 em_for_outer.complete_or_defer(producer_id);
445 } else if let Some(h) = plan.self_error {
446 em_for_outer.error_or_defer(producer_id, h);
447 }
448 });
449
450 ctx.subscribe_to(source, outer_sink);
451 });
452
453 let fn_id = binding.register_producer_build(build);
454 core.register_producer(fn_id)
455 .expect("invariant: register_producer has no deps; no error variants reachable")
456}
457
458fn make_switch_on_complete(
459 state: Arc<Mutex<SwitchState>>,
460 em: ProducerEmitter,
461 producer_id: NodeId,
462) -> Arc<dyn Fn()> {
463 Arc::new(move || {
464 let prev_inner;
465 let mut should_complete = false;
466 {
467 let mut s = state.lock().unwrap();
468 if s.terminated {
469 return;
470 }
471 prev_inner = s.inner_sub.take();
472 if s.source_done && !s.terminated {
473 s.terminated = true;
474 should_complete = true;
475 }
476 }
477 drop(prev_inner); // SubGuard::Drop → deferred unsubscribe.
478 if should_complete {
479 em.complete_or_defer(producer_id);
480 }
481 })
482}
483
484fn make_switch_on_error(
485 state: Arc<Mutex<SwitchState>>,
486 em: ProducerEmitter,
487 producer_id: NodeId,
488) -> Arc<dyn Fn(HandleId)> {
489 Arc::new(move |h| {
490 let prev_inner;
491 {
492 let mut s = state.lock().unwrap();
493 if s.terminated {
494 return;
495 }
496 s.terminated = true;
497 prev_inner = s.inner_sub.take();
498 }
499 drop(prev_inner);
500 em.error_or_defer(producer_id, h);
501 })
502}
503
504// =====================================================================
505// exhaust_map — ignore outer DATA while inner is active
506// =====================================================================
507
508struct ExhaustState {
509 /// Active inner sub. S2b: `Option<SubGuard>` — drop posts the
510 /// deferred unsubscribe (D234).
511 inner_sub: Option<SubGuard>,
512 /// QA P2 (2026-05-18): exhaust is *older-wins* — while a projected
513 /// inner is pending OR active, new outer DATA is dropped. Under
514 /// D234 the subscribe is a queued `em.defer`, so `inner_sub` is
515 /// `None` between accept and install; without this flag a 2nd
516 /// outer DATA in that window would also pass `inner_sub.is_none()`
517 /// and project a 2nd inner (exhaust contract violation). Set true
518 /// when an outer DATA is accepted + its subscribe queued; cleared
519 /// when the inner completes/errors or its subscribe finds the
520 /// source dead.
521 pending: bool,
522 source_done: bool,
523 terminated: bool,
524}
525
526impl ExhaustState {
527 fn new() -> Self {
528 Self {
529 inner_sub: None,
530 pending: false,
531 source_done: false,
532 terminated: false,
533 }
534 }
535}
536
537/// `exhaust_map(source, project)` — like [`switch_map`] but DROPS new
538/// outer DATA while an inner subscription is active. First outer DATA
539/// per "active window" wins; subsequent DATAs are discarded until the
540/// inner completes.
541#[must_use]
542pub fn exhaust_map(
543 core: &Core,
544 binding: &Arc<dyn HigherOrderBinding>,
545 source: NodeId,
546 project: ProjectFn,
547) -> NodeId {
548 let project_fn_id = binding.register_project(project);
549 // S2b/D223: binding weaks stay (registry-cycle break); Core weak
550 // gone — sinks reach Core via `em`/`em.defer`.
551 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
552 let producer_binding_weak: Weak<dyn ProducerBinding> =
553 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
554
555 let build = Box::new(move |ctx: ProducerCtx<'_>| {
556 let producer_id = ctx.node_id();
557 let (Some(binding_clone), Some(producer_binding)) =
558 (binding_weak.upgrade(), producer_binding_weak.upgrade())
559 else {
560 return;
561 };
562 let em = ctx.emitter();
563 let state: Arc<Mutex<ExhaustState>> = Arc::new(Mutex::new(ExhaustState::new()));
564
565 let state_for_outer = state.clone();
566 let em_for_outer = em.clone();
567 let binding_for_outer = binding_clone.clone();
568 let producer_binding_for_outer = producer_binding.clone();
569
570 let outer_sink: Sink = Arc::new(move |msgs| {
571 #[derive(Default)]
572 struct Plan {
573 first_outer_h: Option<HandleId>,
574 first_retained: bool,
575 self_complete: bool,
576 self_error: Option<HandleId>,
577 }
578 let mut plan = Plan::default();
579 {
580 let mut s = state_for_outer.lock().unwrap();
581 if s.terminated {
582 return;
583 }
584 // Tier-based dispatch (canonical §4.2; see
585 // `feedback_use_tier_for_signal_routing.md`).
586 for m in msgs {
587 match m.tier() {
588 3 => {
589 if let Some(h) = m.payload_handle() {
590 // First DATA per active window wins.
591 // Remember the first one we accept; subsequent
592 // batch entries (or DATAs after) drop.
593 // QA P2: also gate on `!s.pending` — a
594 // prior accepted DATA's subscribe may be
595 // queued (inner_sub still None); exhaust
596 // must drop this one (older-wins).
597 if s.inner_sub.is_none()
598 && !s.pending
599 && plan.first_outer_h.is_none()
600 {
601 binding_for_outer.retain_handle(h);
602 plan.first_outer_h = Some(h);
603 plan.first_retained = true;
604 s.pending = true;
605 }
606 }
607 // else: Resolved on outer source — no action.
608 }
609 5 => {
610 if let Some(h) = m.payload_handle() {
611 // Error
612 if !s.terminated {
613 s.terminated = true;
614 // Release any retain we took for
615 // first_outer_h — we won't be projecting it.
616 if plan.first_retained {
617 if let Some(h0) = plan.first_outer_h.take() {
618 binding_for_outer.release_handle(h0);
619 plan.first_retained = false;
620 }
621 }
622 binding_for_outer.retain_handle(h);
623 plan.self_error = Some(h);
624 }
625 } else {
626 // Complete
627 s.source_done = true;
628 if s.inner_sub.is_none()
629 && plan.first_outer_h.is_none()
630 && !s.terminated
631 {
632 s.terminated = true;
633 plan.self_complete = true;
634 }
635 }
636 }
637 _ => {} // Tiers 0/1/2/4/6 — no action.
638 }
639 }
640 }
641
642 if plan.first_retained {
643 let outer_h = plan
644 .first_outer_h
645 .expect("first_retained implies first_outer_h is Some");
646 let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
647 binding_for_outer.release_handle(outer_h);
648
649 let on_complete = make_exhaust_on_complete(
650 state_for_outer.clone(),
651 em_for_outer.clone(),
652 producer_id,
653 );
654 let on_error = make_exhaust_on_error(
655 state_for_outer.clone(),
656 em_for_outer.clone(),
657 producer_id,
658 );
659 // F2 /qa: clone for TornDown synthesis (mirrors switch_map).
660 let on_complete_for_dead = on_complete.clone();
661 let inner_sink = build_inner_sink(
662 em_for_outer.clone(),
663 producer_binding_for_outer.clone(),
664 producer_id,
665 on_complete,
666 on_error,
667 );
668 // D234: inner subscribe via `em.defer` (long-lived sink
669 // can't hold `&Core`). TornDown → synthesize
670 // inner-Complete so `inner_sub` clears and the next
671 // outer DATA can re-project (F2 /qa).
672 let state_sub = state_for_outer.clone();
673 let em_guard = em_for_outer.clone();
674 let _ =
675 em_for_outer.defer(move |c| match c.try_subscribe(inner_node, inner_sink) {
676 Ok(sub) => {
677 let guard = SubGuard::new(inner_node, sub, em_guard);
678 let to_drop = {
679 let mut s = state_sub.lock().unwrap();
680 if s.terminated {
681 Some(guard)
682 } else {
683 s.inner_sub.replace(guard)
684 }
685 };
686 drop(to_drop);
687 }
688 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
689 on_complete_for_dead();
690 }
691 Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
692 panic!(
693 "exhaust_map inner subscribe: partition-order \
694 violation inside em.defer — substrate invariant broken"
695 );
696 }
697 });
698 }
699
700 if plan.self_complete {
701 em_for_outer.complete_or_defer(producer_id);
702 } else if let Some(h) = plan.self_error {
703 em_for_outer.error_or_defer(producer_id, h);
704 }
705 });
706
707 ctx.subscribe_to(source, outer_sink);
708 });
709
710 let fn_id = binding.register_producer_build(build);
711 core.register_producer(fn_id)
712 .expect("invariant: register_producer has no deps; no error variants reachable")
713}
714
715fn make_exhaust_on_complete(
716 state: Arc<Mutex<ExhaustState>>,
717 em: ProducerEmitter,
718 producer_id: NodeId,
719) -> Arc<dyn Fn()> {
720 Arc::new(move || {
721 let prev_inner;
722 let mut should_complete = false;
723 {
724 let mut s = state.lock().unwrap();
725 if s.terminated {
726 return;
727 }
728 prev_inner = s.inner_sub.take();
729 // QA P2: inner finished (or was dead) — exhaust window
730 // re-opens; the next outer DATA may project again.
731 s.pending = false;
732 if s.source_done && !s.terminated {
733 s.terminated = true;
734 should_complete = true;
735 }
736 }
737 drop(prev_inner);
738 if should_complete {
739 em.complete_or_defer(producer_id);
740 }
741 })
742}
743
744fn make_exhaust_on_error(
745 state: Arc<Mutex<ExhaustState>>,
746 em: ProducerEmitter,
747 producer_id: NodeId,
748) -> Arc<dyn Fn(HandleId)> {
749 Arc::new(move |h| {
750 let prev_inner;
751 {
752 let mut s = state.lock().unwrap();
753 if s.terminated {
754 return;
755 }
756 s.terminated = true;
757 prev_inner = s.inner_sub.take();
758 }
759 drop(prev_inner);
760 em.error_or_defer(producer_id, h);
761 })
762}
763
764// =====================================================================
765// merge_map — parallel inners up to `concurrency` cap
766// concat_map — wrapper for concurrency = Some(1)
767// =====================================================================
768
thread_local! {
    /// Thread-local re-entrancy flag for the `MergeMapState` drain loop.
    ///
    /// It starts out `false`. A pre-completed inner can fire `on_complete`
    /// synchronously inside the subscribe handshake. Such a nested
    /// `on_complete` must not start a second drain. It only decrements
    /// the state and removes its sub, then returns. The outermost drain
    /// owns the loop and sees the freed capacity on its next iteration.
    static MERGE_DRAIN_ACTIVE: Cell<bool> = const { Cell::new(false) };
}
778
779struct MergeMapState {
780 /// Number of currently-active inner subscriptions (spawned but
781 /// not yet completed/errored).
782 active: u32,
783 /// Outer DATAs waiting because `active >= concurrency`. Each
784 /// handle has one retain share (taken on enqueue, released on
785 /// dequeue + project).
786 buffer: VecDeque<HandleId>,
787 /// Per-inner `Subscription`s, keyed by `next_inner_id`. Each
788 /// inner's `on_complete` removes its entry by id (lock-released
789 /// drop).
790 inner_subs: AHashMap<u64, SubGuard>,
791 /// Pending inner ids (between `subscribe` call and
792 /// `inner_subs.insert`). Used to detect synchronous-completion:
793 /// if `on_complete` runs during `subscribe`, it removes from
794 /// `pending_inner_ids`; the post-subscribe code checks the set
795 /// and skips inserting the now-dead sub.
796 pending_inner_ids: ahash::AHashSet<u64>,
797 next_inner_id: u64,
798 source_done: bool,
799 terminated: bool,
800}
801
802impl MergeMapState {
803 fn new() -> Self {
804 Self {
805 active: 0,
806 buffer: VecDeque::new(),
807 inner_subs: AHashMap::new(),
808 pending_inner_ids: ahash::AHashSet::new(),
809 next_inner_id: 0,
810 source_done: false,
811 terminated: false,
812 }
813 }
814}
815
816/// `merge_map(source, project)` — unbounded concurrency variant.
817/// Equivalent to [`merge_map_with_concurrency`] with `None`.
818#[must_use]
819pub fn merge_map(
820 core: &Core,
821 binding: &Arc<dyn HigherOrderBinding>,
822 source: NodeId,
823 project: ProjectFn,
824) -> NodeId {
825 merge_map_with_concurrency(core, binding, source, project, None)
826}
827
828/// `concat_map(source, project)` — sequential queue variant.
829/// Equivalent to [`merge_map_with_concurrency`] with `Some(1)`. Each
830/// outer DATA is enqueued and processed one-at-a-time.
831#[must_use]
832pub fn concat_map(
833 core: &Core,
834 binding: &Arc<dyn HigherOrderBinding>,
835 source: NodeId,
836 project: ProjectFn,
837) -> NodeId {
838 merge_map_with_concurrency(core, binding, source, project, Some(1))
839}
840
841/// `merge_map_with_concurrency(source, project, concurrency)` — projects
842/// each outer DATA to an inner Node and subscribes in parallel.
843///
844/// `concurrency`:
845/// - `None` → unbounded (every outer DATA spawns immediately).
846/// - `Some(n)` → at most `n` concurrent inners; excess outer DATAs
847/// buffer until an active inner completes.
848///
849/// Per D043 / D040, this matches the
850/// [`Core::set_pause_buffer_cap`](graphrefly_core::Core::set_pause_buffer_cap)
851/// `Option<usize>` precedent (None = unbounded). `Some(0)` is degenerate
852/// (would buffer everything indefinitely without ever spawning) but
853/// accepted at the type level.
854#[must_use]
855pub fn merge_map_with_concurrency(
856 core: &Core,
857 binding: &Arc<dyn HigherOrderBinding>,
858 source: NodeId,
859 project: ProjectFn,
860 concurrency: Option<u32>,
861) -> NodeId {
862 let project_fn_id = binding.register_project(project);
863 // S2b/D223: binding weaks stay (registry-cycle break); Core weak
864 // gone — sinks reach Core via `em`/`em.defer`.
865 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
866 let producer_binding_weak: Weak<dyn ProducerBinding> =
867 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
868
869 let build = Box::new(move |ctx: ProducerCtx<'_>| {
870 let producer_id = ctx.node_id();
871 let (Some(binding_clone), Some(producer_binding)) =
872 (binding_weak.upgrade(), producer_binding_weak.upgrade())
873 else {
874 return;
875 };
876 let em = ctx.emitter();
877 let state: Arc<Mutex<MergeMapState>> = Arc::new(Mutex::new(MergeMapState::new()));
878
879 let state_for_outer = state.clone();
880 let em_for_outer = em.clone();
881 let binding_for_outer = binding_clone.clone();
882 let producer_binding_for_outer = producer_binding.clone();
883
884 let outer_sink: Sink = Arc::new(move |msgs| {
885 // Phase 1: enqueue DATAs into the buffer (always — drain
886 // loop dequeues + spawns up to cap), classify terminal
887 // signals.
888 let mut error_action: Option<HandleId> = None;
889 let mut self_complete_now = false;
890 {
891 let mut s = state_for_outer.lock().unwrap();
892 if s.terminated {
893 return;
894 }
895 // Tier-based dispatch (canonical §4.2; see
896 // `feedback_use_tier_for_signal_routing.md`).
897 for m in msgs {
898 match m.tier() {
899 3 => {
900 if let Some(h) = m.payload_handle() {
901 // Retain on enqueue — released by drain
902 // after invoke_project.
903 binding_for_outer.retain_handle(h);
904 s.buffer.push_back(h);
905 }
906 // else: Resolved on outer source — no action.
907 }
908 5 => {
909 if let Some(h) = m.payload_handle() {
910 // Error
911 if !s.terminated {
912 s.terminated = true;
913 binding_for_outer.retain_handle(h);
914 while let Some(q) = s.buffer.pop_front() {
915 binding_for_outer.release_handle(q);
916 }
917 error_action = Some(h);
918 }
919 } else {
920 // Complete
921 s.source_done = true;
922 if s.active == 0 && s.buffer.is_empty() && !s.terminated {
923 s.terminated = true;
924 self_complete_now = true;
925 }
926 }
927 }
928 _ => {} // Tiers 0/1/2/4/6 — no action.
929 }
930 }
931 }
932
933 if let Some(h) = error_action {
934 em_for_outer.error_or_defer(producer_id, h);
935 return;
936 }
937 if self_complete_now {
938 em_for_outer.complete_or_defer(producer_id);
939 return;
940 }
941
942 // Phase 2: drain buffer iteratively up to concurrency cap.
943 drain_merge_buffer(
944 &state_for_outer,
945 &em_for_outer,
946 &binding_for_outer,
947 &producer_binding_for_outer,
948 producer_id,
949 project_fn_id,
950 concurrency,
951 );
952 });
953
954 ctx.subscribe_to(source, outer_sink);
955 });
956
957 let fn_id = binding.register_producer_build(build);
958 core.register_producer(fn_id)
959 .expect("invariant: register_producer has no deps; no error variants reachable")
960}
961
962/// Iteratively pop from `buffer` and spawn inners until cap is reached
963/// or buffer is empty. Re-entrance from a nested `on_complete` is
964/// short-circuited via [`MERGE_DRAIN_ACTIVE`]; the outermost call owns
965/// the drain loop and picks up cap-frees on subsequent iterations.
966// S2b/D234: `core: &Core` → `em: &ProducerEmitter`. The per-inner
967// subscribe routes through `em.defer` (`CoreFull`); the returned
968// `SubscriptionId` is wrapped in a `SubGuard` keyed in `inner_subs`
969// (its Drop schedules the unsubscribe).
970fn drain_merge_buffer(
971 state: &Arc<Mutex<MergeMapState>>,
972 em: &ProducerEmitter,
973 binding: &Arc<dyn HigherOrderBinding>,
974 producer_binding: &Arc<dyn ProducerBinding>,
975 producer_id: NodeId,
976 project_fn_id: FnId,
977 concurrency: Option<u32>,
978) {
979 if MERGE_DRAIN_ACTIVE.with(|f| f.replace(true)) {
980 // Already draining on this thread; outer loop will drain
981 // remaining buffer.
982 return;
983 }
984
985 loop {
986 let h_and_id;
987 let mut should_self_complete = false;
988 {
989 let mut s = state.lock().unwrap();
990 if s.terminated {
991 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
992 return;
993 }
994 let allowed = match concurrency {
995 None => true,
996 Some(n) => s.active < n,
997 };
998 if !allowed {
999 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
1000 return;
1001 }
1002 if let Some(h) = s.buffer.pop_front() {
1003 s.active += 1;
1004 let id = s.next_inner_id;
1005 s.next_inner_id += 1;
1006 s.pending_inner_ids.insert(id);
1007 h_and_id = Some((h, id));
1008 } else if s.source_done && s.active == 0 && !s.terminated {
1009 s.terminated = true;
1010 should_self_complete = true;
1011 h_and_id = None;
1012 } else {
1013 h_and_id = None;
1014 }
1015 }
1016
1017 if should_self_complete {
1018 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
1019 em.complete_or_defer(producer_id);
1020 return;
1021 }
1022
1023 let Some((outer_h, inner_id)) = h_and_id else {
1024 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
1025 return;
1026 };
1027
1028 // Spawn lock-released.
1029 let inner_node = binding.invoke_project(project_fn_id, outer_h);
1030 binding.release_handle(outer_h);
1031
1032 let on_complete = make_merge_on_complete(
1033 state.clone(),
1034 em.clone(),
1035 binding.clone(),
1036 producer_binding.clone(),
1037 producer_id,
1038 project_fn_id,
1039 inner_id,
1040 concurrency,
1041 );
1042 let on_error = make_merge_on_error(state.clone(), em.clone(), binding.clone(), producer_id);
1043 // F2 /qa: clone on_complete so the TornDown branch can
1044 // synthesize inner-Complete (closes the merge_map `s.active`
1045 // leak that left the producer never self-completing when a
1046 // projected inner was dead).
1047 let on_complete_for_dead = on_complete.clone();
1048 let inner_sink = build_inner_sink(
1049 em.clone(),
1050 producer_binding.clone(),
1051 producer_id,
1052 on_complete,
1053 on_error,
1054 );
1055 // D234: inner subscribe via `em.defer` (long-lived drain can't
1056 // hold `&Core`). The sync-completion detection still works: if
1057 // the inner pre-completes during `try_subscribe`'s handshake,
1058 // `on_complete` fires INSIDE this defer (owner-side) and removes
1059 // `inner_id` from `pending_inner_ids` before the post-subscribe
1060 // check below — so a now-dead sub is dropped, not installed.
1061 let state_sub = state.clone();
1062 let em_guard = em.clone();
1063 let _ = em.defer(move |c| {
1064 match c.try_subscribe(inner_node, inner_sink) {
1065 Ok(sub) => {
1066 let guard = SubGuard::new(inner_node, sub, em_guard);
1067 let to_drop = {
1068 let mut s = state_sub.lock().unwrap();
1069 if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
1070 Some(guard)
1071 } else {
1072 s.inner_subs.insert(inner_id, guard);
1073 None
1074 }
1075 };
1076 drop(to_drop);
1077 }
1078 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
1079 // R2.2.7.b / F2 /qa: synthesize inner-Complete so
1080 // `make_merge_on_complete` decrements `s.active`,
1081 // clears pending, and checks the self-Complete
1082 // trigger (Dead-inner `s.active` leak fix).
1083 on_complete_for_dead();
1084 }
1085 Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
1086 panic!(
1087 "merge_map inner subscribe: partition-order \
1088 violation inside em.defer — substrate invariant broken"
1089 );
1090 }
1091 }
1092 });
1093
1094 // Loop continues — pops next from buffer or returns.
1095 }
1096}
1097
1098fn make_merge_on_complete(
1099 state: Arc<Mutex<MergeMapState>>,
1100 em: ProducerEmitter,
1101 binding: Arc<dyn HigherOrderBinding>,
1102 producer_binding: Arc<dyn ProducerBinding>,
1103 producer_id: NodeId,
1104 project_fn_id: FnId,
1105 this_inner_id: u64,
1106 concurrency: Option<u32>,
1107) -> Arc<dyn Fn()> {
1108 Arc::new(move || {
1109 let removed_sub;
1110 {
1111 let mut s = state.lock().unwrap();
1112 if s.terminated {
1113 return;
1114 }
1115 s.active -= 1;
1116 // Two cases:
1117 // (a) Sync-completion during subscribe: `pending_inner_ids`
1118 // still contains us; `inner_subs` does not. Remove from
1119 // pending so the post-subscribe insert sees we're done
1120 // and skips installing the dead sub.
1121 // (b) Async completion: `inner_subs` contains us. Remove
1122 // and drop lock-released.
1123 s.pending_inner_ids.remove(&this_inner_id);
1124 removed_sub = s.inner_subs.remove(&this_inner_id);
1125 }
1126 drop(removed_sub); // SubGuard::Drop → deferred unsubscribe.
1127
1128 // Try to drain — if we're nested inside an outer drain loop
1129 // (sync-completion path), this is a no-op and the outer drain
1130 // continues.
1131 drain_merge_buffer(
1132 &state,
1133 &em,
1134 &binding,
1135 &producer_binding,
1136 producer_id,
1137 project_fn_id,
1138 concurrency,
1139 );
1140 })
1141}
1142
1143/// Inner-error path for merge_map: terminates the producer + drains
1144/// all inner subs (lock-released) + releases buffered DATA handles.
1145/// Captures `binding` so we can release buffered handles' retains
1146/// (taken on enqueue in the outer_sink's Data branch); without this,
1147/// inner-error before all buffered DATAs project would leak refcount
1148/// shares.
1149fn make_merge_on_error(
1150 state: Arc<Mutex<MergeMapState>>,
1151 em: ProducerEmitter,
1152 binding: Arc<dyn HigherOrderBinding>,
1153 producer_id: NodeId,
1154) -> Arc<dyn Fn(HandleId)> {
1155 Arc::new(move |h| {
1156 let removed_subs;
1157 let buffered_to_release;
1158 {
1159 let mut s = state.lock().unwrap();
1160 if s.terminated {
1161 return;
1162 }
1163 s.terminated = true;
1164 removed_subs = s.inner_subs.drain().map(|(_, sub)| sub).collect::<Vec<_>>();
1165 s.pending_inner_ids.clear();
1166 buffered_to_release = s.buffer.drain(..).collect::<Vec<_>>();
1167 }
1168 drop(removed_subs); // each SubGuard::Drop → deferred unsubscribe.
1169 for h_b in buffered_to_release {
1170 binding.release_handle(h_b);
1171 }
1172 em.error_or_defer(producer_id, h);
1173 })
1174}
1175
1176// =====================================================================
1177// Compile-time asserts (Slice E /qa; D248/D249-amended)
1178// =====================================================================
1179//
1180// D248/D249/S2c: `SwitchState`/`ExhaustState`/`MergeMapState` embed a
1181// `ProducerEmitter`, which now carries the owner-only `Rc<DeferQueue>`
1182// (the `!Send` `Defer` split off `CoreMailbox`). Under full
1183// single-owner these op-states are owner-thread-only and intentionally
1184// `!Send` — the prior `Send + Sync` assertions were shared-Core-era
1185// legacy and are deleted. `ProjectFn` is a pure projector (captures no
1186// `!Send`), so it stays `Send + Sync`.
1187
1188const _: fn() = || {
1189 fn assert_send_sync<T: Send + Sync>() {}
1190 assert_send_sync::<ProjectFn>();
1191};