Expand description
Higher-order operators (Slice E, D044) — operators whose project fn
returns an inner NodeId for each outer DATA. Mirrors TS legacy
extra/operators/higher-order.ts (switchMap / exhaustMap /
concatMap / mergeMap).
All four are producer-pattern nodes (no declared deps; subscribe to
the outer source from inside the build closure; emit on themselves
via Core::emit). This mirrors the super::ops_impl family
(zip / concat / race / takeUntil); the producer substrate handles
auto-cleanup of upstream + inner subscriptions on producer
deactivation (D031–D038).
The four flavors differ in how they handle a new outer DATA while a prior inner is still active:
switch_map— cancel the prior inner (Rx-styleswitchMap).exhaust_map— drop the new value (Rx-styleexhaustMap).concat_map— enqueue; process sequentially. (Equivalent tomerge_map_with_concurrencywithSome(1).)merge_map/merge_map_with_concurrency— spawn in parallel up toconcurrency.None= unbounded.
§Inner-sub tracking (Slice E /qa refactor)
Each operator owns its inner [Subscription]s inside its state
Mutex (not in super::producer::ProducerNodeState::subs).
producer_storage[producer_id].subs holds only the OUTER source
subscription (one entry, no positional concerns). switch_map /
exhaust_map keep Option<Subscription> (single active inner); merge
/ concat keep HashMap<u64, Subscription> keyed by per-op
next_inner_id. This avoids two bugs the original positional design
exposed: (a) cached-outer source firing handshake before
subscribe_to pushed the outer sub, reordering subs[0]; (b)
merge/concat completed-inner subs accumulating in subs indefinitely.
Inner sub cleanup is per-op now: switch/exhaust take + drop on inner
Complete; merge/concat remove specific id on inner Complete.
§Drain discipline (iterative spawn)
merge_map could spawn the next buffered DATA from inside an
inner’s on_complete callback. For pathological pre-completed
inners (synchronous Complete during the subscribe handshake),
recursive spawn would grow the stack proportionally to the buffer
depth. The thread-local [MERGE_DRAIN_ACTIVE] flag breaks the
recursion: the outermost drain owns the loop; nested on_complete
invocations only decrement state and return.
§Project closure (D044)
Each operator takes a project: Fn(HandleId) -> NodeId closure
registered through HigherOrderBinding::register_project. Bindings
(napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python /
WASM callbacks into this Rust shape; Rust-side users register a
closure directly.
Traits§
- Higher
Order Binding - Closure-registration interface for higher-order operators.
Functions§
- concat_
map concat_map(source, project)— sequential queue variant. Equivalent tomerge_map_with_concurrencywithSome(1). Each outer DATA is enqueued and processed one-at-a-time.- exhaust_
map exhaust_map(source, project)— likeswitch_mapbut DROPS new outer DATA while an inner subscription is active. First outer DATA per “active window” wins; subsequent DATAs are discarded until the inner completes.- merge_
map merge_map(source, project)— unbounded concurrency variant. Equivalent tomerge_map_with_concurrencywithNone.- merge_
map_ with_ concurrency merge_map_with_concurrency(source, project, concurrency)— projects each outer DATA to an inner Node and subscribes in parallel.- switch_
map switch_map(source, project)— for each outer DATA, cancel the previous inner subscription and subscribe to the inner node returned byproject(value). Inner DATA flows through to downstream; inner COMPLETE clears the active slot; outer COMPLETE (with no active inner) self-completes the operator.