Module higher_order

Higher-order operators (Slice E, D044) — operators whose project fn returns an inner NodeId for each outer DATA. Mirrors TS legacy extra/operators/higher-order.ts (switchMap / exhaustMap / concatMap / mergeMap).

All four are producer-pattern nodes (no declared deps; subscribe to the outer source from inside the build closure; emit on themselves via Core::emit). This mirrors the super::ops_impl family (zip / concat / race / takeUntil); the producer substrate handles auto-cleanup of upstream + inner subscriptions on producer deactivation (D031–D038).

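The four flavors differ in how they handle a new outer DATA while a prior inner is still active:

- switch_map: cancels the previous inner subscription and subscribes to the new inner.
- exhaust_map: drops the new outer DATA until the active inner completes.
- concat_map: enqueues the new outer DATA and processes the queue one at a time.
- merge_map: subscribes to the new inner in parallel with those already active (optionally capped via merge_map_with_concurrency).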
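As a rough illustration of how the flavors diverge, the decision each one makes on a new outer DATA can be written as a small lookup. This is a toy model only: `Flavor`, `Action`, and `on_outer_data` are hypothetical names invented for the sketch and are not part of this module.

```rust
/// Hypothetical tag for the four operators (not a type from this module).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Flavor {
    Switch,
    Exhaust,
    Concat,
    Merge,
}

/// What the operator does with a freshly arrived outer DATA.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    /// Subscribe to the new inner, alongside any inner already running.
    Subscribe,
    /// Drop the previous inner subscription, then subscribe to the new one.
    CancelPreviousThenSubscribe,
    /// Discard the outer DATA entirely.
    Discard,
    /// Buffer the outer DATA until the running inner completes.
    Enqueue,
}

/// Decide how a new outer DATA is handled, given whether an inner is active.
fn on_outer_data(flavor: Flavor, inner_active: bool) -> Action {
    match (flavor, inner_active) {
        // With no active inner, every flavor simply subscribes.
        (_, false) => Action::Subscribe,
        (Flavor::Switch, true) => Action::CancelPreviousThenSubscribe,
        (Flavor::Exhaust, true) => Action::Discard,
        (Flavor::Concat, true) => Action::Enqueue,
        // Unbounded merge: inners run in parallel.
        (Flavor::Merge, true) => Action::Subscribe,
    }
}
```

The model ignores the bounded case of merge_map_with_concurrency, where a full set of active inners would lead to buffering rather than an immediate subscribe.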

§Inner-sub tracking (Slice E /qa refactor)

Each operator owns its inner [Subscription]s inside its own state Mutex, not in super::producer::ProducerNodeState::subs. producer_storage[producer_id].subs holds only the OUTER source subscription (one entry, so no positional concerns). switch_map / exhaust_map keep an Option<Subscription> (a single active inner); merge_map / concat_map keep a HashMap<u64, Subscription> keyed by a per-op next_inner_id.

This avoids two bugs that the original positional design exposed: (a) a cached outer source firing its handshake before subscribe_to had pushed the outer sub, which reordered subs[0]; (b) completed-inner subs from merge_map / concat_map accumulating in subs indefinitely. Inner-sub cleanup is now per-op: switch_map / exhaust_map take and drop the slot on inner Complete; merge_map / concat_map remove the specific id on inner Complete.
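The two state shapes described above might look roughly like the following. This is a minimal sketch under stated assumptions: `Subscription` here is an empty stand-in for the real handle, and `SingleInnerState`, `MultiInnerState`, `track`, and `on_inner_complete` are hypothetical names, not the module's actual fields or methods.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Stand-in for the real subscription handle; dropping it would unsubscribe.
struct Subscription;

/// Assumed state shape for the single-inner operators (switch_map / exhaust_map).
#[derive(Default)]
struct SingleInnerState {
    inner: Option<Subscription>,
}

impl SingleInnerState {
    /// Take and drop the active inner; returns whether a slot was occupied.
    fn on_inner_complete(&mut self) -> bool {
        self.inner.take().is_some()
    }
}

/// Assumed state shape for the multi-inner operators (merge_map / concat_map).
#[derive(Default)]
struct MultiInnerState {
    next_inner_id: u64,
    inners: HashMap<u64, Subscription>,
}

impl MultiInnerState {
    /// Store a new inner subscription under a fresh id and return that id.
    fn track(&mut self, sub: Subscription) -> u64 {
        let id = self.next_inner_id;
        self.next_inner_id += 1;
        self.inners.insert(id, sub);
        id
    }

    /// Remove exactly the completed inner, so finished subs never accumulate.
    fn on_inner_complete(&mut self, id: u64) -> bool {
        self.inners.remove(&id).is_some()
    }
}

/// Track three inners, complete two, and report how many remain stored.
fn remaining_after_two_completions() -> usize {
    let state = Mutex::new(MultiInnerState::default());
    let mut guard = state.lock().unwrap();
    let a = guard.track(Subscription);
    let _b = guard.track(Subscription);
    let c = guard.track(Subscription);
    guard.on_inner_complete(a);
    guard.on_inner_complete(c);
    guard.inners.len()
}
```

Keying by id rather than by position means removal never depends on the order in which subscriptions were pushed, which is the property the positional design lacked.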

§Drain discipline (iterative spawn)

merge_map can spawn the next buffered DATA from inside an inner’s on_complete callback. For pathological pre-completed inners (a synchronous Complete during the subscribe handshake), a recursive spawn would grow the stack in proportion to the buffer depth. The thread-local [MERGE_DRAIN_ACTIVE] flag breaks the recursion: the outermost drain owns the loop, and nested on_complete invocations only decrement state and return.
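The re-entrancy guard can be sketched in isolation. The code below is a simplified model, not the module's implementation: `DRAIN_ACTIVE`, `Merge`, and `run` are hypothetical names, inners are modelled as pre-completed (Complete arrives synchronously inside `spawn`), and a depth counter is added purely to observe nesting.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;

thread_local! {
    /// Set while the outermost drain loop is running on this thread.
    static DRAIN_ACTIVE: Cell<bool> = Cell::new(false);
}

struct Merge {
    buffer: RefCell<VecDeque<u32>>,
    active: Cell<usize>,
    limit: usize,
    spawned: RefCell<Vec<u32>>,
    depth: Cell<usize>,
    max_depth: Cell<usize>,
}

impl Merge {
    fn new(limit: usize, buffered: impl IntoIterator<Item = u32>) -> Self {
        Merge {
            buffer: RefCell::new(buffered.into_iter().collect()),
            active: Cell::new(0),
            limit,
            spawned: RefCell::new(Vec::new()),
            depth: Cell::new(0),
            max_depth: Cell::new(0),
        }
    }

    /// Subscribe to a pre-completed inner: Complete fires before we return.
    fn spawn(&self, value: u32) {
        self.depth.set(self.depth.get() + 1);
        self.max_depth.set(self.max_depth.get().max(self.depth.get()));
        self.spawned.borrow_mut().push(value);
        self.active.set(self.active.get() + 1);
        self.on_inner_complete();
        self.depth.set(self.depth.get() - 1);
    }

    fn on_inner_complete(&self) {
        self.active.set(self.active.get() - 1);
        self.drain();
    }

    fn drain(&self) {
        // `replace` returns the previous flag value; `true` means we are nested.
        if DRAIN_ACTIVE.with(|f| f.replace(true)) {
            return; // the outermost drain owns the loop
        }
        while self.active.get() < self.limit {
            let next = self.buffer.borrow_mut().pop_front();
            match next {
                Some(v) => self.spawn(v),
                None => break,
            }
        }
        DRAIN_ACTIVE.with(|f| f.set(false));
    }
}

/// Drain `n` buffered values; returns (number spawned, max spawn nesting).
fn run(limit: usize, n: u32) -> (usize, usize) {
    let m = Merge::new(limit, 1..=n);
    m.drain();
    let spawned = m.spawned.borrow().len();
    (spawned, m.max_depth.get())
}
```

In this model, `run(1, 1000)` spawns all 1000 values while the spawn nesting never exceeds one level, because each nested `drain` call returns immediately and the outer loop picks up the next buffered value.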

§Project closure (D044)

Each operator takes a project: Fn(HandleId) -> NodeId closure registered through HigherOrderBinding::register_project. Bindings (napi-rs / pyo3 / wasm-bindgen) marshal user-supplied JS / Python / WASM callbacks into this Rust shape; Rust-side users register a closure directly.
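For Rust-side users, registering a closure of the Fn(HandleId) -> NodeId shape might look something like this. The sketch is illustrative only: the `HandleId` and `NodeId` newtypes, the `Arc<dyn Fn ...>` alias, and the `ProjectRegistry` type with its `u64` return id are all assumptions made for the example; the real HigherOrderBinding::register_project signature is not shown on this page.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-ins for the crate's id types; the real definitions may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct HandleId(u64);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct NodeId(u64);

/// Assumed alias shape: a shareable closure from outer handle to inner node.
type ProjectFn = Arc<dyn Fn(HandleId) -> NodeId + Send + Sync>;

/// Hypothetical registry that stores project closures under numeric ids.
#[derive(Default)]
struct ProjectRegistry {
    next_id: u64,
    projects: HashMap<u64, ProjectFn>,
}

impl ProjectRegistry {
    /// Store the closure and return the id it can later be invoked by.
    fn register_project(&mut self, project: ProjectFn) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.projects.insert(id, project);
        id
    }

    /// Invoke a registered closure for one outer DATA handle.
    fn call(&self, id: u64, handle: HandleId) -> Option<NodeId> {
        self.projects.get(&id).map(|p| p(handle))
    }
}
```

A language binding would presumably wrap the foreign callback in a closure of this same shape before registering it, so the operators only ever see the Rust-side signature.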

Traits§

HigherOrderBinding
Closure-registration interface for higher-order operators.

Functions§

concat_map
concat_map(source, project) — sequential queue variant. Equivalent to merge_map_with_concurrency with Some(1). Each outer DATA is enqueued and processed one-at-a-time.
exhaust_map
exhaust_map(source, project) — like switch_map but DROPS new outer DATA while an inner subscription is active. First outer DATA per “active window” wins; subsequent DATAs are discarded until the inner completes.
merge_map
merge_map(source, project) — unbounded concurrency variant. Equivalent to merge_map_with_concurrency with None.
merge_map_with_concurrency
merge_map_with_concurrency(source, project, concurrency) — projects each outer DATA to an inner Node and subscribes in parallel.
switch_map
switch_map(source, project) — for each outer DATA, cancel the previous inner subscription and subscribe to the inner node returned by project(value). Inner DATA flows through to downstream; inner COMPLETE clears the active slot; outer COMPLETE (with no active inner) self-completes the operator.
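The relationship between concat_map, merge_map, and the concurrency argument can be modelled with a small scheduler. This is a sketch under assumptions: `Scheduler` and its methods are hypothetical, inners are identified by the outer value that produced them, and only the bookkeeping (active versus buffered) is modelled, not actual subscriptions.

```rust
use std::collections::VecDeque;

/// Toy model of the concurrency cap: `None` is unbounded, `Some(n)` allows n inners.
struct Scheduler {
    concurrency: Option<usize>,
    active: Vec<u32>,
    buffer: VecDeque<u32>,
}

impl Scheduler {
    fn new(concurrency: Option<usize>) -> Self {
        Scheduler { concurrency, active: Vec::new(), buffer: VecDeque::new() }
    }

    /// True when another inner may be started right now.
    fn has_capacity(&self) -> bool {
        self.concurrency.map_or(true, |cap| self.active.len() < cap)
    }

    /// A new outer DATA either starts an inner or waits in the buffer.
    fn on_outer_data(&mut self, value: u32) {
        if self.has_capacity() {
            self.active.push(value);
        } else {
            self.buffer.push_back(value);
        }
    }

    /// A completed inner frees a slot; buffered values fill it in FIFO order.
    fn on_inner_complete(&mut self, value: u32) {
        self.active.retain(|v| *v != value);
        while self.has_capacity() {
            match self.buffer.pop_front() {
                Some(next) => self.active.push(next),
                None => break,
            }
        }
    }
}
```

With `Some(1)` the model behaves like a sequential queue (one active inner, the rest buffered), matching the concat_map description; with `None` nothing is ever buffered, matching merge_map.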

Type Aliases§

ProjectFn
Project closure: takes an outer DATA handle, returns the NodeId of an inner node to subscribe to. Closure may register new state/derived nodes on the fly via captured Core.