graphrefly_operators/higher_order.rs
1//! Higher-order operators (Slice E, D044) — operators whose project fn
2//! returns an inner [`NodeId`] for each outer DATA. Mirrors TS legacy
3//! `extra/operators/higher-order.ts` (`switchMap` / `exhaustMap` /
4//! `concatMap` / `mergeMap`).
5//!
6//! All four are producer-pattern nodes (no declared deps; subscribe to
7//! the outer source from inside the build closure; emit on themselves
8//! via [`Core::emit`]). This mirrors the [`super::ops_impl`] family
9//! (zip / concat / race / takeUntil); the producer substrate handles
10//! auto-cleanup of upstream + inner subscriptions on producer
11//! deactivation (D031–D038).
12//!
13//! The four flavors differ in how they handle a new outer DATA while a
14//! prior inner is still active:
15//!
16//! - [`switch_map`] — cancel the prior inner (Rx-style `switchMap`).
17//! - [`exhaust_map`] — drop the new value (Rx-style `exhaustMap`).
18//! - [`concat_map`] — enqueue; process sequentially. (Equivalent to
19//! [`merge_map_with_concurrency`] with `Some(1)`.)
20//! - [`merge_map`] / [`merge_map_with_concurrency`] — spawn in parallel
21//! up to `concurrency`. `None` = unbounded.
22//!
23//! # Inner-sub tracking (Slice E /qa refactor)
24//!
25//! Each operator owns its inner [`Subscription`]s **inside its state
26//! `Mutex`** (not in [`super::producer::ProducerNodeState::subs`]).
27//! `producer_storage[producer_id].subs` holds only the OUTER source
28//! subscription (one entry, no positional concerns). switch_map /
29//! exhaust_map keep `Option<Subscription>` (single active inner); merge
30//! / concat keep `HashMap<u64, Subscription>` keyed by per-op
31//! `next_inner_id`. This avoids two bugs the original positional design
32//! exposed: (a) cached-outer source firing handshake before
33//! `subscribe_to` pushed the outer sub, reordering `subs[0]`; (b)
34//! merge/concat completed-inner subs accumulating in `subs` indefinitely.
35//! Inner sub cleanup is per-op now: switch/exhaust take + drop on inner
36//! Complete; merge/concat remove specific id on inner Complete.
37//!
38//! # Drain discipline (iterative spawn)
39//!
40//! `merge_map` could spawn the next buffered DATA from inside an
41//! inner's `on_complete` callback. For pathological pre-completed
42//! inners (synchronous Complete during the subscribe handshake),
43//! recursive spawn would grow the stack proportionally to the buffer
44//! depth. The thread-local [`MERGE_DRAIN_ACTIVE`] flag breaks the
45//! recursion: the outermost drain owns the loop; nested `on_complete`
46//! invocations only decrement state and return.
47//!
48//! # Project closure (D044)
49//!
50//! Each operator takes a `project: Fn(HandleId) -> NodeId` closure
51//! registered through [`HigherOrderBinding::register_project`]. Bindings
52//! (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
53//! WASM callbacks into this Rust shape; Rust-side users register a
54//! closure directly.
55
56#![allow(clippy::collapsible_if, clippy::collapsible_match)]
57#![allow(clippy::too_many_arguments, clippy::too_many_lines)]
58
59use std::cell::Cell;
60use std::collections::VecDeque;
61use std::sync::{Arc, Mutex, Weak};
62
63use ahash::AHashMap;
64use graphrefly_core::{Core, FnId, HandleId, Message, NodeId, Sink, Subscription};
65use smallvec::SmallVec;
66
67use super::producer::{ProducerBinding, ProducerCtx};
68
69// =====================================================================
70// HigherOrderBinding — closure registration for project: T -> Node<R>
71// =====================================================================
72
73/// Project closure: takes an outer DATA handle, returns the
74/// [`NodeId`] of an inner node to subscribe to. Closure may register
75/// new state/derived nodes on the fly via captured [`Core`].
76pub type ProjectFn = Box<dyn Fn(HandleId) -> NodeId + Send + Sync>;
77
78/// Closure-registration interface for higher-order operators.
79///
80/// Extends [`ProducerBinding`] with one method that bindings shipping
81/// higher-order operators must implement.
82pub trait HigherOrderBinding: ProducerBinding {
83 /// Register a project closure. The returned [`FnId`] is captured by
84 /// the operator's build closure and looked up via
85 /// [`Self::invoke_project`] on each outer DATA fire.
86 fn register_project(&self, project: ProjectFn) -> FnId;
87
88 /// Invoke a registered project closure with the given outer DATA
89 /// handle. Returns the inner node's [`NodeId`].
90 ///
91 /// # Panics
92 ///
93 /// Implementations panic if `fn_id` is not a registered project
94 /// closure.
95 fn invoke_project(&self, fn_id: FnId, value: HandleId) -> NodeId;
96}
97
98// =====================================================================
99// build_inner_sink — shared inner-subscription handler
100// =====================================================================
101//
102// Each higher-order op subscribes to the inner node returned by its
103// project closure. The inner sink dispatches by `Message::tier()`
104// (R1.3.7.b — central message-tier utility, never hardcode variant
105// checks for forwarding gating per CLAUDE.md design invariant 4):
106//
107// - Tier 0 (Start) — drop.
108// - Tier 1 (Dirty) — drop. (DIRTY forwarding from inner is an
109// acknowledged divergence from TS legacy; Rust producer relies on
110// Core's wave engine to generate DIRTY on the producer's own emits.)
111// - Tier 2 (Pause/Resume) — drop. Backpressure is per-stream; not
112// propagated through higher-order ops by design.
113// - Tier 3 (Data/Resolved) — `Data(h)` forwards via `Core::emit(producer_id, h)`;
114// `Resolved` drops (same wave-engine rationale as Dirty).
115// - Tier 4 (Invalidate) — forwards via `Core::invalidate(producer_id)`
116// per R1.2.7. Inner cache invalidation surfaces to producer
117// subscribers.
118// - Tier 5 (Complete/Error) — semantic dispatch via op-specific
119// callbacks (`on_inner_complete` / `on_inner_error`). Differs
120// per-op (switch_map clears active inner; merge_map decrements
121// active count; etc.).
122// - Tier 6 (Teardown) — forwards via `Core::teardown(producer_id)`
123// per R2.6.4. Inner permanent destruction propagates.
124
125fn build_inner_sink(
126 core: Core,
127 producer_binding: Arc<dyn ProducerBinding>,
128 producer_id: NodeId,
129 on_inner_complete: Arc<dyn Fn() + Send + Sync>,
130 on_inner_error: Arc<dyn Fn(HandleId) + Send + Sync>,
131) -> Sink {
132 Arc::new(move |msgs: &[Message]| {
133 enum Action {
134 Emit(HandleId),
135 Complete,
136 Error(HandleId),
137 Invalidate,
138 Teardown,
139 }
140 let mut actions: SmallVec<[Action; 4]> = SmallVec::new();
141 for m in msgs {
142 match m.tier() {
143 3 => {
144 // Tier 3: Data/Resolved. Only Data carries a payload
145 // to forward; Resolved is dropped per the
146 // wave-engine divergence above.
147 if let Some(h) = m.payload_handle() {
148 producer_binding.retain_handle(h);
149 actions.push(Action::Emit(h));
150 }
151 }
152 4 => {
153 // Tier 4: Invalidate. Forward to producer.
154 actions.push(Action::Invalidate);
155 }
156 5 => {
157 // Tier 5: Complete or Error. Semantic dispatch via
158 // op-specific callbacks. `payload_handle()` is the
159 // canonical Error-vs-Complete discriminator at this
160 // tier.
161 if let Some(h) = m.payload_handle() {
162 producer_binding.retain_handle(h);
163 actions.push(Action::Error(h));
164 } else {
165 actions.push(Action::Complete);
166 }
167 }
168 6 => {
169 // Tier 6: Teardown. Forward to producer.
170 actions.push(Action::Teardown);
171 }
172 // Tiers 0 (Start), 1 (Dirty), 2 (Pause/Resume): drop.
173 _ => {}
174 }
175 }
176 for action in actions {
177 match action {
178 Action::Emit(h) => core.emit_or_defer(producer_id, h),
179 Action::Complete => on_inner_complete(),
180 Action::Error(h) => on_inner_error(h),
181 Action::Invalidate => core.invalidate_or_defer(producer_id),
182 Action::Teardown => core.teardown_or_defer(producer_id),
183 }
184 }
185 })
186}
187
188// =====================================================================
189// switch_map — cancel previous inner on each new outer DATA
190// =====================================================================
191
192struct SwitchState {
193 /// Currently-active inner subscription (if any). Cancelling the
194 /// prior inner is `Option::take` + lock-released drop.
195 inner_sub: Option<Subscription>,
196 source_done: bool,
197 terminated: bool,
198}
199
200impl SwitchState {
201 fn new() -> Self {
202 Self {
203 inner_sub: None,
204 source_done: false,
205 terminated: false,
206 }
207 }
208}
209
210/// `switch_map(source, project)` — for each outer DATA, cancel the
211/// previous inner subscription and subscribe to the inner node returned
212/// by `project(value)`. Inner DATA flows through to downstream; inner
213/// COMPLETE clears the active slot; outer COMPLETE (with no active
214/// inner) self-completes the operator.
215#[must_use]
216pub fn switch_map(
217 core: &Core,
218 binding: &Arc<dyn HigherOrderBinding>,
219 source: NodeId,
220 project: ProjectFn,
221) -> NodeId {
222 let project_fn_id = binding.register_project(project);
223 // Weak captures break the producer-build Arc cycle (see
224 // `graphrefly_operators::ops_impl::zip` doc).
225 let core_weak = core.weak_handle();
226 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
227 let producer_binding_weak: Weak<dyn ProducerBinding> =
228 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
229
230 let build = Box::new(move |ctx: ProducerCtx<'_>| {
231 let producer_id = ctx.node_id();
232 let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
233 core_weak.upgrade(),
234 binding_weak.upgrade(),
235 producer_binding_weak.upgrade(),
236 ) else {
237 return;
238 };
239 let state: Arc<Mutex<SwitchState>> = Arc::new(Mutex::new(SwitchState::new()));
240
241 let state_for_outer = state.clone();
242 let core_for_outer = core_clone.clone();
243 let binding_for_outer = binding_clone.clone();
244 let producer_binding_for_outer = producer_binding.clone();
245
246 let outer_sink: Sink = Arc::new(move |msgs| {
247 // Phase 1: classify under state lock. Track whether we
248 // performed a retain so phase 2 can safely release it
249 // without underflow on `[Data(_), Error(_)]` same-batch
250 // (P1 /qa fix).
251 #[derive(Default)]
252 struct Plan {
253 latest_outer_h: Option<HandleId>,
254 latest_retained: bool,
255 self_complete: bool,
256 self_error: Option<HandleId>,
257 }
258 let mut plan = Plan::default();
259 {
260 let mut s = state_for_outer.lock().unwrap();
261 if s.terminated {
262 return;
263 }
264 // Tier-based dispatch per `feedback_use_tier_for_signal_routing.md`
265 // and canonical §4.2 ("Always use the provided tier utilities
266 // rather than hardcoding type checks"). Tier 3 carries
267 // DATA/RESOLVED — only DATA has a payload_handle, so the
268 // `payload_handle().is_some()` test discriminates within
269 // the tier without referring to specific variants. Tier 5
270 // carries COMPLETE/ERROR — same shape (Error has handle,
271 // Complete doesn't).
272 for m in msgs {
273 match m.tier() {
274 3 => {
275 if let Some(h) = m.payload_handle() {
276 // switchMap: only the LATEST outer DATA in the
277 // batch matters (TS legacy "skip to last in
278 // the batch to avoid creating + immediately
279 // discarding N-1 inners"). Track the handle;
280 // we'll project + subscribe once after the
281 // lock drops. Skipped handles need no retain.
282 plan.latest_outer_h = Some(h);
283 }
284 // else: Resolved on outer source — no action.
285 }
286 5 => {
287 if let Some(h) = m.payload_handle() {
288 // Error
289 if !s.terminated {
290 s.terminated = true;
291 binding_for_outer.retain_handle(h);
292 plan.self_error = Some(h);
293 }
294 } else {
295 // Complete
296 s.source_done = true;
297 if s.inner_sub.is_none()
298 && plan.latest_outer_h.is_none()
299 && !s.terminated
300 {
301 s.terminated = true;
302 plan.self_complete = true;
303 }
304 }
305 }
306 _ => {} // Tiers 0/1/2/4/6 — no action on outer source.
307 }
308 }
309 if let Some(h) = plan.latest_outer_h {
310 if !s.terminated {
311 // Retain ONE share for the chosen handle —
312 // released by phase 2 after invoke_project.
313 binding_for_outer.retain_handle(h);
314 plan.latest_retained = true;
315 }
316 }
317 }
318
319 // Phase 2: cancel prior inner sub, project, subscribe.
320 // Gated on `latest_retained` so that an `Error` arriving in
321 // the same batch (which sets terminated and skips the
322 // retain) does NOT trigger an unbalanced release.
323 if plan.latest_retained {
324 let outer_h = plan
325 .latest_outer_h
326 .expect("latest_retained implies latest_outer_h is Some");
327
328 // Cancel prior inner sub (if any) lock-released.
329 let prev_inner = {
330 let mut s = state_for_outer.lock().unwrap();
331 s.inner_sub.take()
332 };
333 drop(prev_inner);
334
335 // invoke_project lock-released. Releases our retain after.
336 let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
337 binding_for_outer.release_handle(outer_h);
338
339 let on_complete = make_switch_on_complete(
340 state_for_outer.clone(),
341 core_for_outer.clone(),
342 producer_id,
343 );
344 let on_error = make_switch_on_error(
345 state_for_outer.clone(),
346 core_for_outer.clone(),
347 producer_id,
348 );
349 // F2 /qa (2026-05-10): clone `on_complete` so the Dead
350 // branch can invoke it as an immediate "inner completed
351 // before any emit" signal — closes the bug where a
352 // batched `[outer.Data, outer.Complete]` projected to a
353 // dead inner wedged the producer (source_done=true,
354 // inner_sub=None, no future trigger).
355 let on_complete_for_dead = on_complete.clone();
356 let inner_sink = build_inner_sink(
357 core_for_outer.clone(),
358 producer_binding_for_outer.clone(),
359 producer_id,
360 on_complete,
361 on_error,
362 );
363 // Phase H+ STRICT: try_subscribe + defer for inner source.
364 // F2 /qa: TornDown synthesizes inner-Complete via
365 // on_complete (so the operator's self-Complete trigger
366 // can fire).
367 let inner_sink_for_defer = inner_sink.clone();
368 match core_for_outer.try_subscribe(inner_node, inner_sink) {
369 Ok(inner_sub) => {
370 let to_drop = {
371 let mut s = state_for_outer.lock().unwrap();
372 if s.terminated {
373 Some(inner_sub)
374 } else {
375 s.inner_sub.replace(inner_sub)
376 }
377 };
378 drop(to_drop);
379 }
380 Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
381 let core_cb = core_for_outer.clone();
382 let state_cb = state_for_outer.clone();
383 core_for_outer.push_deferred_producer_op(
384 graphrefly_core::DeferredProducerOp::Callback(Box::new(move || {
385 let inner_sub = core_cb.subscribe(inner_node, inner_sink_for_defer);
386 let to_drop = {
387 let mut s = state_cb.lock().unwrap();
388 if s.terminated {
389 Some(inner_sub)
390 } else {
391 s.inner_sub.replace(inner_sub)
392 }
393 };
394 drop(to_drop);
395 })),
396 );
397 }
398 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
399 // R2.2.7.b: inner is dead. Synthesize
400 // inner-Complete so switch's state machine
401 // clears inner_sub and checks the self-Complete
402 // trigger (closes the [Data,Complete] batched
403 // wedge bug).
404 on_complete_for_dead();
405 }
406 }
407 }
408
409 if plan.self_complete {
410 core_for_outer.complete_or_defer(producer_id);
411 } else if let Some(h) = plan.self_error {
412 core_for_outer.error_or_defer(producer_id, h);
413 }
414 });
415
416 ctx.subscribe_to(source, outer_sink);
417 });
418
419 let fn_id = binding.register_producer_build(build);
420 core.register_producer(fn_id)
421 .expect("invariant: register_producer has no deps; no error variants reachable")
422}
423
424fn make_switch_on_complete(
425 state: Arc<Mutex<SwitchState>>,
426 core: Core,
427 producer_id: NodeId,
428) -> Arc<dyn Fn() + Send + Sync> {
429 Arc::new(move || {
430 let prev_inner;
431 let mut should_complete = false;
432 {
433 let mut s = state.lock().unwrap();
434 if s.terminated {
435 return;
436 }
437 prev_inner = s.inner_sub.take();
438 if s.source_done && !s.terminated {
439 s.terminated = true;
440 should_complete = true;
441 }
442 }
443 drop(prev_inner);
444 if should_complete {
445 core.complete_or_defer(producer_id);
446 }
447 })
448}
449
450fn make_switch_on_error(
451 state: Arc<Mutex<SwitchState>>,
452 core: Core,
453 producer_id: NodeId,
454) -> Arc<dyn Fn(HandleId) + Send + Sync> {
455 Arc::new(move |h| {
456 let prev_inner;
457 {
458 let mut s = state.lock().unwrap();
459 if s.terminated {
460 return;
461 }
462 s.terminated = true;
463 prev_inner = s.inner_sub.take();
464 }
465 drop(prev_inner);
466 core.error_or_defer(producer_id, h);
467 })
468}
469
470// =====================================================================
471// exhaust_map — ignore outer DATA while inner is active
472// =====================================================================
473
474struct ExhaustState {
475 inner_sub: Option<Subscription>,
476 source_done: bool,
477 terminated: bool,
478}
479
480impl ExhaustState {
481 fn new() -> Self {
482 Self {
483 inner_sub: None,
484 source_done: false,
485 terminated: false,
486 }
487 }
488}
489
490/// `exhaust_map(source, project)` — like [`switch_map`] but DROPS new
491/// outer DATA while an inner subscription is active. First outer DATA
492/// per "active window" wins; subsequent DATAs are discarded until the
493/// inner completes.
494#[must_use]
495pub fn exhaust_map(
496 core: &Core,
497 binding: &Arc<dyn HigherOrderBinding>,
498 source: NodeId,
499 project: ProjectFn,
500) -> NodeId {
501 let project_fn_id = binding.register_project(project);
502 // Weak captures break the producer-build Arc cycle (see `switch_map` doc).
503 let core_weak = core.weak_handle();
504 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
505 let producer_binding_weak: Weak<dyn ProducerBinding> =
506 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
507
508 let build = Box::new(move |ctx: ProducerCtx<'_>| {
509 let producer_id = ctx.node_id();
510 let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
511 core_weak.upgrade(),
512 binding_weak.upgrade(),
513 producer_binding_weak.upgrade(),
514 ) else {
515 return;
516 };
517 let state: Arc<Mutex<ExhaustState>> = Arc::new(Mutex::new(ExhaustState::new()));
518
519 let state_for_outer = state.clone();
520 let core_for_outer = core_clone.clone();
521 let binding_for_outer = binding_clone.clone();
522 let producer_binding_for_outer = producer_binding.clone();
523
524 let outer_sink: Sink = Arc::new(move |msgs| {
525 #[derive(Default)]
526 struct Plan {
527 first_outer_h: Option<HandleId>,
528 first_retained: bool,
529 self_complete: bool,
530 self_error: Option<HandleId>,
531 }
532 let mut plan = Plan::default();
533 {
534 let mut s = state_for_outer.lock().unwrap();
535 if s.terminated {
536 return;
537 }
538 // Tier-based dispatch (canonical §4.2; see
539 // `feedback_use_tier_for_signal_routing.md`).
540 for m in msgs {
541 match m.tier() {
542 3 => {
543 if let Some(h) = m.payload_handle() {
544 // First DATA per active window wins.
545 // Remember the first one we accept; subsequent
546 // batch entries (or DATAs after) drop.
547 if s.inner_sub.is_none() && plan.first_outer_h.is_none() {
548 binding_for_outer.retain_handle(h);
549 plan.first_outer_h = Some(h);
550 plan.first_retained = true;
551 }
552 }
553 // else: Resolved on outer source — no action.
554 }
555 5 => {
556 if let Some(h) = m.payload_handle() {
557 // Error
558 if !s.terminated {
559 s.terminated = true;
560 // Release any retain we took for
561 // first_outer_h — we won't be projecting it.
562 if plan.first_retained {
563 if let Some(h0) = plan.first_outer_h.take() {
564 binding_for_outer.release_handle(h0);
565 plan.first_retained = false;
566 }
567 }
568 binding_for_outer.retain_handle(h);
569 plan.self_error = Some(h);
570 }
571 } else {
572 // Complete
573 s.source_done = true;
574 if s.inner_sub.is_none()
575 && plan.first_outer_h.is_none()
576 && !s.terminated
577 {
578 s.terminated = true;
579 plan.self_complete = true;
580 }
581 }
582 }
583 _ => {} // Tiers 0/1/2/4/6 — no action.
584 }
585 }
586 }
587
588 if plan.first_retained {
589 let outer_h = plan
590 .first_outer_h
591 .expect("first_retained implies first_outer_h is Some");
592 let inner_node = binding_for_outer.invoke_project(project_fn_id, outer_h);
593 binding_for_outer.release_handle(outer_h);
594
595 let on_complete = make_exhaust_on_complete(
596 state_for_outer.clone(),
597 core_for_outer.clone(),
598 producer_id,
599 );
600 let on_error = make_exhaust_on_error(
601 state_for_outer.clone(),
602 core_for_outer.clone(),
603 producer_id,
604 );
605 // F2 /qa: clone for TornDown synthesis (mirrors switch_map).
606 let on_complete_for_dead = on_complete.clone();
607 let inner_sink = build_inner_sink(
608 core_for_outer.clone(),
609 producer_binding_for_outer.clone(),
610 producer_id,
611 on_complete,
612 on_error,
613 );
614 // If inner already pre-completed during the handshake,
615 // on_complete already cleared `inner_sub`. We `replace`
616 // either way; in the synchronous-completion path,
617 // `inner_sub` was None so our just-subscribed (and
618 // already-dead) sub is dropped on the next iteration.
619 // Phase H+ STRICT: try_subscribe + defer for inner source.
620 // R2.2.7.b: TornDown means the inner source is non-resubscribable
621 // and terminal — skip silently.
622 let inner_sink_for_defer = inner_sink.clone();
623 match core_for_outer.try_subscribe(inner_node, inner_sink) {
624 Ok(inner_sub) => {
625 let to_drop = {
626 let mut s = state_for_outer.lock().unwrap();
627 if s.terminated {
628 Some(inner_sub)
629 } else {
630 s.inner_sub.replace(inner_sub)
631 }
632 };
633 drop(to_drop);
634 }
635 Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
636 let core_cb = core_for_outer.clone();
637 let state_cb = state_for_outer.clone();
638 core_for_outer.push_deferred_producer_op(
639 graphrefly_core::DeferredProducerOp::Callback(Box::new(move || {
640 let inner_sub = core_cb.subscribe(inner_node, inner_sink_for_defer);
641 let to_drop = {
642 let mut s = state_cb.lock().unwrap();
643 if s.terminated {
644 Some(inner_sub)
645 } else {
646 s.inner_sub.replace(inner_sub)
647 }
648 };
649 drop(to_drop);
650 })),
651 );
652 }
653 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
654 // R2.2.7.b / F2 /qa: synthesize inner-Complete
655 // so exhaust's `s.inner_sub` clears and the
656 // next outer DATA can re-project (previously
657 // the TornDown branch was a silent no-op which
658 // could leave outer spawning more dead
659 // projections indefinitely without ever
660 // advancing the state machine).
661 on_complete_for_dead();
662 }
663 }
664 }
665
666 if plan.self_complete {
667 core_for_outer.complete_or_defer(producer_id);
668 } else if let Some(h) = plan.self_error {
669 core_for_outer.error_or_defer(producer_id, h);
670 }
671 });
672
673 ctx.subscribe_to(source, outer_sink);
674 });
675
676 let fn_id = binding.register_producer_build(build);
677 core.register_producer(fn_id)
678 .expect("invariant: register_producer has no deps; no error variants reachable")
679}
680
681fn make_exhaust_on_complete(
682 state: Arc<Mutex<ExhaustState>>,
683 core: Core,
684 producer_id: NodeId,
685) -> Arc<dyn Fn() + Send + Sync> {
686 Arc::new(move || {
687 let prev_inner;
688 let mut should_complete = false;
689 {
690 let mut s = state.lock().unwrap();
691 if s.terminated {
692 return;
693 }
694 prev_inner = s.inner_sub.take();
695 if s.source_done && !s.terminated {
696 s.terminated = true;
697 should_complete = true;
698 }
699 }
700 drop(prev_inner);
701 if should_complete {
702 core.complete_or_defer(producer_id);
703 }
704 })
705}
706
707fn make_exhaust_on_error(
708 state: Arc<Mutex<ExhaustState>>,
709 core: Core,
710 producer_id: NodeId,
711) -> Arc<dyn Fn(HandleId) + Send + Sync> {
712 Arc::new(move |h| {
713 let prev_inner;
714 {
715 let mut s = state.lock().unwrap();
716 if s.terminated {
717 return;
718 }
719 s.terminated = true;
720 prev_inner = s.inner_sub.take();
721 }
722 drop(prev_inner);
723 core.error_or_defer(producer_id, h);
724 })
725}
726
727// =====================================================================
728// merge_map — parallel inners up to `concurrency` cap
729// concat_map — wrapper for concurrency = Some(1)
730// =====================================================================
731
thread_local! {
    /// Re-entrancy latch for the `MergeMapState` drain loop, one per thread.
    ///
    /// An inner that is already complete can fire `on_complete`
    /// synchronously inside the `Core::subscribe` handshake. That nested
    /// callback must not start a second drain loop. It only decrements the
    /// active count, removes its own subscription, and returns. The
    /// outermost drain, which owns this latch, sees the freed slot on its
    /// next iteration.
    static MERGE_DRAIN_ACTIVE: Cell<bool> = const { Cell::new(false) };
}
741
742struct MergeMapState {
743 /// Number of currently-active inner subscriptions (spawned but
744 /// not yet completed/errored).
745 active: u32,
746 /// Outer DATAs waiting because `active >= concurrency`. Each
747 /// handle has one retain share (taken on enqueue, released on
748 /// dequeue + project).
749 buffer: VecDeque<HandleId>,
750 /// Per-inner `Subscription`s, keyed by `next_inner_id`. Each
751 /// inner's `on_complete` removes its entry by id (lock-released
752 /// drop).
753 inner_subs: AHashMap<u64, Subscription>,
754 /// Pending inner ids (between `subscribe` call and
755 /// `inner_subs.insert`). Used to detect synchronous-completion:
756 /// if `on_complete` runs during `subscribe`, it removes from
757 /// `pending_inner_ids`; the post-subscribe code checks the set
758 /// and skips inserting the now-dead sub.
759 pending_inner_ids: ahash::AHashSet<u64>,
760 next_inner_id: u64,
761 source_done: bool,
762 terminated: bool,
763}
764
765impl MergeMapState {
766 fn new() -> Self {
767 Self {
768 active: 0,
769 buffer: VecDeque::new(),
770 inner_subs: AHashMap::new(),
771 pending_inner_ids: ahash::AHashSet::new(),
772 next_inner_id: 0,
773 source_done: false,
774 terminated: false,
775 }
776 }
777}
778
779/// `merge_map(source, project)` — unbounded concurrency variant.
780/// Equivalent to [`merge_map_with_concurrency`] with `None`.
781#[must_use]
782pub fn merge_map(
783 core: &Core,
784 binding: &Arc<dyn HigherOrderBinding>,
785 source: NodeId,
786 project: ProjectFn,
787) -> NodeId {
788 merge_map_with_concurrency(core, binding, source, project, None)
789}
790
791/// `concat_map(source, project)` — sequential queue variant.
792/// Equivalent to [`merge_map_with_concurrency`] with `Some(1)`. Each
793/// outer DATA is enqueued and processed one-at-a-time.
794#[must_use]
795pub fn concat_map(
796 core: &Core,
797 binding: &Arc<dyn HigherOrderBinding>,
798 source: NodeId,
799 project: ProjectFn,
800) -> NodeId {
801 merge_map_with_concurrency(core, binding, source, project, Some(1))
802}
803
804/// `merge_map_with_concurrency(source, project, concurrency)` — projects
805/// each outer DATA to an inner Node and subscribes in parallel.
806///
807/// `concurrency`:
808/// - `None` → unbounded (every outer DATA spawns immediately).
809/// - `Some(n)` → at most `n` concurrent inners; excess outer DATAs
810/// buffer until an active inner completes.
811///
812/// Per D043 / D040, this matches the
813/// [`Core::set_pause_buffer_cap`](graphrefly_core::Core::set_pause_buffer_cap)
814/// `Option<usize>` precedent (None = unbounded). `Some(0)` is degenerate
815/// (would buffer everything indefinitely without ever spawning) but
816/// accepted at the type level.
817#[must_use]
818pub fn merge_map_with_concurrency(
819 core: &Core,
820 binding: &Arc<dyn HigherOrderBinding>,
821 source: NodeId,
822 project: ProjectFn,
823 concurrency: Option<u32>,
824) -> NodeId {
825 let project_fn_id = binding.register_project(project);
826 // Weak captures break the producer-build Arc cycle (see `switch_map` doc).
827 let core_weak = core.weak_handle();
828 let binding_weak: Weak<dyn HigherOrderBinding> = Arc::downgrade(binding);
829 let producer_binding_weak: Weak<dyn ProducerBinding> =
830 Arc::downgrade(&(binding.clone() as Arc<dyn ProducerBinding>));
831
832 let build = Box::new(move |ctx: ProducerCtx<'_>| {
833 let producer_id = ctx.node_id();
834 let (Some(core_clone), Some(binding_clone), Some(producer_binding)) = (
835 core_weak.upgrade(),
836 binding_weak.upgrade(),
837 producer_binding_weak.upgrade(),
838 ) else {
839 return;
840 };
841 let state: Arc<Mutex<MergeMapState>> = Arc::new(Mutex::new(MergeMapState::new()));
842
843 let state_for_outer = state.clone();
844 let core_for_outer = core_clone.clone();
845 let binding_for_outer = binding_clone.clone();
846 let producer_binding_for_outer = producer_binding.clone();
847
848 let outer_sink: Sink = Arc::new(move |msgs| {
849 // Phase 1: enqueue DATAs into the buffer (always — drain
850 // loop dequeues + spawns up to cap), classify terminal
851 // signals.
852 let mut error_action: Option<HandleId> = None;
853 let mut self_complete_now = false;
854 {
855 let mut s = state_for_outer.lock().unwrap();
856 if s.terminated {
857 return;
858 }
859 // Tier-based dispatch (canonical §4.2; see
860 // `feedback_use_tier_for_signal_routing.md`).
861 for m in msgs {
862 match m.tier() {
863 3 => {
864 if let Some(h) = m.payload_handle() {
865 // Retain on enqueue — released by drain
866 // after invoke_project.
867 binding_for_outer.retain_handle(h);
868 s.buffer.push_back(h);
869 }
870 // else: Resolved on outer source — no action.
871 }
872 5 => {
873 if let Some(h) = m.payload_handle() {
874 // Error
875 if !s.terminated {
876 s.terminated = true;
877 binding_for_outer.retain_handle(h);
878 while let Some(q) = s.buffer.pop_front() {
879 binding_for_outer.release_handle(q);
880 }
881 error_action = Some(h);
882 }
883 } else {
884 // Complete
885 s.source_done = true;
886 if s.active == 0 && s.buffer.is_empty() && !s.terminated {
887 s.terminated = true;
888 self_complete_now = true;
889 }
890 }
891 }
892 _ => {} // Tiers 0/1/2/4/6 — no action.
893 }
894 }
895 }
896
897 if let Some(h) = error_action {
898 core_for_outer.error_or_defer(producer_id, h);
899 return;
900 }
901 if self_complete_now {
902 core_for_outer.complete_or_defer(producer_id);
903 return;
904 }
905
906 // Phase 2: drain buffer iteratively up to concurrency cap.
907 drain_merge_buffer(
908 &state_for_outer,
909 &core_for_outer,
910 &binding_for_outer,
911 &producer_binding_for_outer,
912 producer_id,
913 project_fn_id,
914 concurrency,
915 );
916 });
917
918 ctx.subscribe_to(source, outer_sink);
919 });
920
921 let fn_id = binding.register_producer_build(build);
922 core.register_producer(fn_id)
923 .expect("invariant: register_producer has no deps; no error variants reachable")
924}
925
926/// Iteratively pop from `buffer` and spawn inners until cap is reached
927/// or buffer is empty. Re-entrance from a nested `on_complete` is
928/// short-circuited via [`MERGE_DRAIN_ACTIVE`]; the outermost call owns
929/// the drain loop and picks up cap-frees on subsequent iterations.
930fn drain_merge_buffer(
931 state: &Arc<Mutex<MergeMapState>>,
932 core: &Core,
933 binding: &Arc<dyn HigherOrderBinding>,
934 producer_binding: &Arc<dyn ProducerBinding>,
935 producer_id: NodeId,
936 project_fn_id: FnId,
937 concurrency: Option<u32>,
938) {
939 if MERGE_DRAIN_ACTIVE.with(|f| f.replace(true)) {
940 // Already draining on this thread; outer loop will drain
941 // remaining buffer.
942 return;
943 }
944
945 loop {
946 let h_and_id;
947 let mut should_self_complete = false;
948 {
949 let mut s = state.lock().unwrap();
950 if s.terminated {
951 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
952 return;
953 }
954 let allowed = match concurrency {
955 None => true,
956 Some(n) => s.active < n,
957 };
958 if !allowed {
959 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
960 return;
961 }
962 if let Some(h) = s.buffer.pop_front() {
963 s.active += 1;
964 let id = s.next_inner_id;
965 s.next_inner_id += 1;
966 s.pending_inner_ids.insert(id);
967 h_and_id = Some((h, id));
968 } else if s.source_done && s.active == 0 && !s.terminated {
969 s.terminated = true;
970 should_self_complete = true;
971 h_and_id = None;
972 } else {
973 h_and_id = None;
974 }
975 }
976
977 if should_self_complete {
978 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
979 core.complete_or_defer(producer_id);
980 return;
981 }
982
983 let Some((outer_h, inner_id)) = h_and_id else {
984 MERGE_DRAIN_ACTIVE.with(|f| f.set(false));
985 return;
986 };
987
988 // Spawn lock-released.
989 let inner_node = binding.invoke_project(project_fn_id, outer_h);
990 binding.release_handle(outer_h);
991
992 let on_complete = make_merge_on_complete(
993 state.clone(),
994 core.clone(),
995 binding.clone(),
996 producer_binding.clone(),
997 producer_id,
998 project_fn_id,
999 inner_id,
1000 concurrency,
1001 );
1002 let on_error =
1003 make_merge_on_error(state.clone(), core.clone(), binding.clone(), producer_id);
1004 // F2 /qa: clone on_complete so the TornDown branch can
1005 // synthesize inner-Complete (closes the merge_map `s.active`
1006 // leak that left the producer never self-completing when a
1007 // projected inner was dead).
1008 let on_complete_for_dead = on_complete.clone();
1009 let inner_sink = build_inner_sink(
1010 core.clone(),
1011 producer_binding.clone(),
1012 producer_id,
1013 on_complete,
1014 on_error,
1015 );
1016 // Phase H+ STRICT: try_subscribe + defer for inner source.
1017 // R2.2.7.b: TornDown means the inner source is non-resubscribable
1018 // and terminal — skip silently and remove from pending so the
1019 // overall lifecycle isn't wedged waiting on a dead inner.
1020 let inner_sink_for_defer = inner_sink.clone();
1021 match core.try_subscribe(inner_node, inner_sink) {
1022 Ok(inner_sub) => {
1023 // Decide whether to install the sub: if `on_complete` fired
1024 // synchronously inside `subscribe` (pre-completed inner), it
1025 // already removed `inner_id` from `pending_inner_ids`.
1026 let to_drop = {
1027 let mut s = state.lock().unwrap();
1028 if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
1029 Some(inner_sub)
1030 } else {
1031 s.inner_subs.insert(inner_id, inner_sub);
1032 None
1033 }
1034 };
1035 drop(to_drop);
1036 }
1037 Err(graphrefly_core::SubscribeError::PartitionOrderViolation(_)) => {
1038 let core_cb = core.clone();
1039 let state_cb = state.clone();
1040 core.push_deferred_producer_op(graphrefly_core::DeferredProducerOp::Callback(
1041 Box::new(move || {
1042 let inner_sub = core_cb.subscribe(inner_node, inner_sink_for_defer);
1043 let to_drop = {
1044 let mut s = state_cb.lock().unwrap();
1045 if s.terminated || !s.pending_inner_ids.remove(&inner_id) {
1046 Some(inner_sub)
1047 } else {
1048 s.inner_subs.insert(inner_id, inner_sub);
1049 None
1050 }
1051 };
1052 drop(to_drop);
1053 }),
1054 ));
1055 }
1056 Err(graphrefly_core::SubscribeError::TornDown { .. }) => {
1057 // R2.2.7.b / F2 /qa: synthesize inner-Complete. This
1058 // delegates to `make_merge_on_complete`'s state-machine
1059 // which decrements `s.active`, removes inner_id from
1060 // pending, and checks the self-Complete trigger
1061 // (source_done && active == 0 && buffer.empty()) —
1062 // closing the wedge bug where a Dead inner left
1063 // `s.active` permanently inflated and merge_map
1064 // never self-completed even after source completed.
1065 on_complete_for_dead();
1066 }
1067 }
1068
1069 // Loop continues — pops next from buffer or returns.
1070 }
1071}
1072
1073fn make_merge_on_complete(
1074 state: Arc<Mutex<MergeMapState>>,
1075 core: Core,
1076 binding: Arc<dyn HigherOrderBinding>,
1077 producer_binding: Arc<dyn ProducerBinding>,
1078 producer_id: NodeId,
1079 project_fn_id: FnId,
1080 this_inner_id: u64,
1081 concurrency: Option<u32>,
1082) -> Arc<dyn Fn() + Send + Sync> {
1083 Arc::new(move || {
1084 let removed_sub;
1085 {
1086 let mut s = state.lock().unwrap();
1087 if s.terminated {
1088 return;
1089 }
1090 s.active -= 1;
1091 // Two cases:
1092 // (a) Sync-completion during subscribe: `pending_inner_ids`
1093 // still contains us; `inner_subs` does not. Remove from
1094 // pending so the post-subscribe insert sees we're done
1095 // and skips installing the dead sub.
1096 // (b) Async completion: `inner_subs` contains us. Remove
1097 // and drop lock-released.
1098 s.pending_inner_ids.remove(&this_inner_id);
1099 removed_sub = s.inner_subs.remove(&this_inner_id);
1100 }
1101 drop(removed_sub);
1102
1103 // Try to drain — if we're nested inside an outer drain loop
1104 // (sync-completion path), this is a no-op and the outer drain
1105 // continues.
1106 drain_merge_buffer(
1107 &state,
1108 &core,
1109 &binding,
1110 &producer_binding,
1111 producer_id,
1112 project_fn_id,
1113 concurrency,
1114 );
1115 })
1116}
1117
1118/// Inner-error path for merge_map: terminates the producer + drains
1119/// all inner subs (lock-released) + releases buffered DATA handles.
1120/// Captures `binding` so we can release buffered handles' retains
1121/// (taken on enqueue in the outer_sink's Data branch); without this,
1122/// inner-error before all buffered DATAs project would leak refcount
1123/// shares.
1124fn make_merge_on_error(
1125 state: Arc<Mutex<MergeMapState>>,
1126 core: Core,
1127 binding: Arc<dyn HigherOrderBinding>,
1128 producer_id: NodeId,
1129) -> Arc<dyn Fn(HandleId) + Send + Sync> {
1130 Arc::new(move |h| {
1131 let removed_subs;
1132 let buffered_to_release;
1133 {
1134 let mut s = state.lock().unwrap();
1135 if s.terminated {
1136 return;
1137 }
1138 s.terminated = true;
1139 removed_subs = s.inner_subs.drain().map(|(_, sub)| sub).collect::<Vec<_>>();
1140 s.pending_inner_ids.clear();
1141 buffered_to_release = s.buffer.drain(..).collect::<Vec<_>>();
1142 }
1143 drop(removed_subs);
1144 for h_b in buffered_to_release {
1145 binding.release_handle(h_b);
1146 }
1147 core.error_or_defer(producer_id, h);
1148 })
1149}
1150
1151// =====================================================================
1152// Send + Sync compile-time asserts (Slice E /qa)
1153// =====================================================================
1154
1155const _: fn() = || {
1156 fn assert_send_sync<T: Send + Sync>() {}
1157 assert_send_sync::<SwitchState>();
1158 assert_send_sync::<ExhaustState>();
1159 assert_send_sync::<MergeMapState>();
1160 assert_send_sync::<ProjectFn>();
1161};