Skip to main content

zip

Function zip 

Source
pub fn zip(
    core: &Core,
    binding: &Arc<dyn ProducerBinding>,
    sources: Vec<NodeId>,
    pack_fn_id: FnId,
) -> Result<NodeId, OperatorFactoryError>
Expand description

zip(s1, s2, ..., sN) — collect one value from each source, emit a tuple, repeat. Models RxJS / TS zip:

  • Each upstream DATA pushes into that source’s per-source queue.
  • When every queue has at least one entry, pop one from each, pack into a tuple via graphrefly_core::BindingBoundary::pack_tuple, and emit on the producer.
  • On any source’s COMPLETE: if its queue is empty, terminate the producer with COMPLETE. Otherwise continue draining; terminate when this source’s queue becomes empty (zip can’t produce a tuple without input from every source).
  • On any source’s ERROR: terminate the producer with the same ERROR (first error wins, like merge per Slice C-2 D022).

Empty source list (n == 0) emits a single empty-tuple event then completes. Single source (n == 1) is identity-passthrough.

§Refcount discipline

Each upstream DATA handle is retain_handle-bumped before being pushed onto a queue (the inbound message’s payload retain belongs to the wave-end-flush release path; we take our own share for the queue). On pop, component handles are passed to pack_tuple which must NOT consume or release them — the caller (zip) retains ownership throughout the call and releases each component handle’s queue share after pack_tuple returns. The returned tuple handle has a pre-bumped retain (binding convention per D020 doc on [BindingBoundary::pack_tuple]).

§Errors

Returns OperatorFactoryError::EmptySources when sources is empty (R5.7.x — zip requires ≥1 source; vacuous-tuple semantics rejected).