Module flow

Flow operators (Slice C-3, D024) — count / predicate / terminal-aware gates that bound which DATA reaches the downstream output.

Mirrors the TS shapes in ~/src/graphrefly-ts/packages/pure-ts/src/extra/operators/take.ts, driven by Core dispatch (graphrefly_core::OperatorOp::Take / Skip / TakeWhile / Last) instead of derived-fn factories.

  • take — emits the first count DATA values then self-completes. count == 0 is allowed (D027): self-completes on first fire with no Data.
  • skip — drops the first count DATA values; emits the rest.
  • take_while — emits while predicate holds; on the first false, emits any passes that preceded it in the same batch, then self-completes.
  • last / last_with_default — buffers the latest DATA; emits Data(latest) (or Data(default) if no DATA arrived and a default was registered) then Complete on upstream COMPLETE.
  • first — sugar for take(source, 1).
  • find — sugar for take(filter(source, predicate), 1).
  • element_at — sugar for take(skip(source, index), 1).
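The count and predicate gates above can be sketched as a toy model over a finite source. This is not the graphrefly_core API: `Msg`, `emit`, and every `*_model` function are hypothetical names introduced for illustration, the source is assumed to be a finite slice that completes after its last value, and protocol messages such as Start, Dirty, and Resolved are left out.

```rust
/// Downstream-visible messages in this toy model (hypothetical, not the Core types).
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Data(i32),
    Complete,
}

/// Wraps the surviving values as Data and appends the terminal Complete.
fn emit(values: impl Iterator<Item = i32>) -> Vec<Msg> {
    let mut out: Vec<Msg> = values.map(Msg::Data).collect();
    out.push(Msg::Complete);
    out
}

/// take: forward the first `count` values, then complete. `count == 0` yields only Complete.
fn take_model(source: &[i32], count: usize) -> Vec<Msg> {
    emit(source.iter().copied().take(count))
}

/// skip: drop the first `count` values, forward the rest.
fn skip_model(source: &[i32], count: usize) -> Vec<Msg> {
    emit(source.iter().copied().skip(count))
}

/// take_while: forward values until the predicate first returns false.
fn take_while_model(source: &[i32], pred: impl Fn(i32) -> bool) -> Vec<Msg> {
    emit(source.iter().copied().take_while(|&v| pred(v)))
}

/// first: sugar for take(source, 1).
fn first_model(source: &[i32]) -> Vec<Msg> {
    take_model(source, 1)
}

/// find: sugar for take(filter(source, predicate), 1).
fn find_model(source: &[i32], pred: impl Fn(i32) -> bool) -> Vec<Msg> {
    emit(source.iter().copied().filter(|&v| pred(v)).take(1))
}

/// element_at: sugar for take(skip(source, index), 1).
fn element_at_model(source: &[i32], index: usize) -> Vec<Msg> {
    emit(source.iter().copied().skip(index).take(1))
}
```

Under this model, `take_model(&[1, 2, 3], 2)` yields `[Data(1), Data(2), Complete]`, and `take_model(&[1, 2, 3], 0)` yields only `[Complete]`.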

take_until(source, notifier) is intentionally NOT in this slice — it requires the producer / subscription-managed pattern (D020 category B), out of scope for the Core-dispatch operator family.

§Refcount discipline

For last_with_default, ownership of the default handle transfers from the caller’s binding-side intern into Core’s LastState via a retain taken inside register_operator. A caller that needs to reference the default elsewhere is expected to retain its own share.
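As a loose analogy only, the retain described above behaves like cloning an `std::rc::Rc`: the operator state holds its own share, so the value survives after the caller drops theirs. `LastStateModel` and `register_last_with_default_model` are hypothetical stand-ins; the real LastState, register_operator, and handle types are not shown in this page and may differ.

```rust
use std::rc::Rc;

/// Hypothetical stand-in for the operator state that keeps the default alive.
struct LastStateModel {
    default: Rc<String>,
}

/// Hypothetical stand-in for registration: takes its own retain (an Rc clone)
/// on the default instead of borrowing the caller's share.
fn register_last_with_default_model(default: &Rc<String>) -> LastStateModel {
    LastStateModel {
        default: Rc::clone(default),
    }
}

/// Returns how many shares of the default are currently alive.
fn shares(state: &LastStateModel) -> usize {
    Rc::strong_count(&state.default)
}
```

In this sketch the share count is 2 right after registration (caller plus operator state) and falls to 1 once the caller drops its handle, with the default still readable through the state.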

Structs§

FlowRegistration
Registration output for flow operators that don’t carry a user closure (take, skip, last, last_with_default). Zero FFI on the fire path; only the node id matters.

Functions§

element_at
element_at(source, index) — emits the DATA at zero-based position index, then Complete. Sugar for take(skip(source, index), 1).
find
find(source, predicate) — emits the first DATA matching predicate then Complete. Sugar for take(filter(source, predicate), 1).
first
first(source) — emits the first DATA then Complete. Sugar for take(source, 1).
last
last(source) — buffers the latest DATA; emits Data(latest) then Complete on upstream COMPLETE. On empty stream (no DATA arrived), emits only Complete — subscribers see [Start, Complete]. For a fallback value on empty streams, use last_with_default.
last_with
last with explicit OperatorOpts.
last_with_default
last_with_default(source, default) — buffers the latest DATA; emits Data(latest) then Complete on upstream COMPLETE. On empty stream (no DATA arrived), emits Data(default) then Complete.
last_with_default_with
last_with_default with explicit OperatorOpts.
skip
skip(source, count) — drops the first count DATA values; once the threshold is crossed, subsequent DATAs pass through verbatim. On a wave where every input is still in the skip window, settles [Dirty, Resolved] (D018 pattern).
skip_with
skip with explicit OperatorOpts.
take
take(source, count) — emits the first count DATA values then self-completes via Core::complete. When upstream completes before count is reached, the standard auto-cascade propagates COMPLETE.
take_while
take_while(source, predicate) — emits while predicate(input) holds; on the first false, emits any preceding passes from the same batch then self-completes. Reuses BindingBoundary::predicate_each (D029).
take_while_with
take_while with explicit OperatorOpts.
take_with
take with explicit OperatorOpts.
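The empty-stream behaviour of last and last_with_default described above can be sketched with a small trace model. `Event` and `last_model` are hypothetical names, not part of this module. The sketch assumes a finite source that completes after its last value, and it assumes subscribers always see Start first (the page only confirms this for the empty case).

```rust
/// Messages a subscriber would observe in this toy model (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Start,
    Data(i32),
    Complete,
}

/// Models both variants: `default = None` behaves like last,
/// `default = Some(d)` behaves like last_with_default.
fn last_model(source: &[i32], default: Option<i32>) -> Vec<Event> {
    let mut out = vec![Event::Start];

    // Buffer only the most recent value; nothing is emitted while upstream is live.
    let mut latest: Option<i32> = None;
    for &value in source {
        latest = Some(value);
    }

    // On upstream COMPLETE: emit the buffered value, else the default if one exists.
    if let Some(value) = latest.or(default) {
        out.push(Event::Data(value));
    }
    out.push(Event::Complete);
    out
}
```

With no default, an empty source produces `[Start, Complete]`. With a default of 9 it produces `[Start, Data(9), Complete]`. A non-empty source ignores the default and emits its final value.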