pub fn zip(
core: &Core,
binding: &Arc<dyn ProducerBinding>,
sources: Vec<NodeId>,
pack_fn_id: FnId,
) -> Result<NodeId, OperatorFactoryError>Expand description
zip(s1, s2, ..., sN) — collect one value from each source, emit a
tuple, repeat. Models RxJS / TS zip:
- Each upstream DATA pushes into that source’s per-source queue.
- When every queue has at least one entry, pop one from each,
pack into a tuple via
graphrefly_core::BindingBoundary::pack_tuple, and emit on the producer. - On any source’s COMPLETE: if its queue is empty, terminate the producer with COMPLETE. Otherwise continue draining; terminate when this source’s queue becomes empty (zip can’t produce a tuple without input from every source).
- On any source’s ERROR: terminate the producer with the same ERROR (first error wins, like merge per Slice C-2 D022).
Empty source list (n == 0) emits a single empty-tuple event then
completes. Single source (n == 1) is identity-passthrough.
§Refcount discipline
Each upstream DATA handle is retain_handle-bumped before being
pushed onto a queue (the inbound message’s payload retain belongs
to the wave-end-flush release path; we take our own share for the
queue). On pop, component handles are passed to pack_tuple which
must NOT consume or release them — the caller (zip) retains
ownership throughout the call and releases each component handle’s
queue share after pack_tuple returns. The returned tuple handle
has a pre-bumped retain (binding convention per D020 doc on
[BindingBoundary::pack_tuple]).
§Errors
Returns OperatorFactoryError::EmptySources when sources is empty
(R5.7.x — zip requires ≥1 source; vacuous-tuple semantics rejected).