atomr-streams.
Source/Flow/Sink DSL built on top of futures::Stream. The surface
covers the linear operator set and the most common graph-DSL
junctions from upstream:
- Source, Flow, Sink: core linear elements.
- GraphDsl / merge, broadcast, zip, concat() junctions.
- Framing: delimiter / length-field byte framing.
- FileIO, Tcp: I/O adapters.
- KillSwitch: external termination.
- RestartSource / RestartSettings: automatic resubscription.
- SourceQueue / Sink::queue: explicit backpressure handles.
- OverflowStrategy: bounded-buffer policies.
- BidiFlow: bidirectional composition.
The port delegates pipeline execution to futures_util::StreamExt; each
combinator builds a boxed stream closure that mirrors the corresponding
operator from upstream.
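The "boxed closure per combinator" idea can be sketched without any dependencies. The following is a synchronous analogy only: it wraps std::iter::Iterator rather than futures::Stream, and MiniSource, from_items, and run_collect are hypothetical names chosen for illustration, not the crate's API.

```rust
// Synchronous analogy: the crate is described as building on futures::Stream,
// while this sketch uses std iterators so that it runs with std alone.
// `MiniSource` and its methods are hypothetical, not the crate's API.
pub struct MiniSource<T> {
    // Boxed closure that builds the pipeline only when the source is run.
    make: Box<dyn FnOnce() -> Box<dyn Iterator<Item = T>>>,
}

impl<T: 'static> MiniSource<T> {
    /// Wrap any owned collection as a source.
    pub fn from_items<I>(items: I) -> Self
    where
        I: IntoIterator<Item = T> + 'static,
    {
        MiniSource {
            make: Box::new(move || Box::new(items.into_iter()) as Box<dyn Iterator<Item = T>>),
        }
    }

    /// Each combinator wraps the previous closure in a new boxed closure.
    pub fn map<U: 'static>(self, f: impl FnMut(T) -> U + 'static) -> MiniSource<U> {
        let make = self.make;
        MiniSource {
            make: Box::new(move || Box::new(make().map(f)) as Box<dyn Iterator<Item = U>>),
        }
    }

    pub fn filter(self, pred: impl FnMut(&T) -> bool + 'static) -> MiniSource<T> {
        let make = self.make;
        MiniSource {
            make: Box::new(move || Box::new(make().filter(pred)) as Box<dyn Iterator<Item = T>>),
        }
    }

    /// Materialize the pipeline and collect every element.
    pub fn run_collect(self) -> Vec<T> {
        (self.make)().collect()
    }
}
```

Nothing runs until run_collect is called; chaining map and filter only nests closures, which is the property the paragraph above attributes to the combinators.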
Modules
- deciders
- Conventional decider helpers.
Structs
- ActorMaterializer
- BidiFlow
- BroadcastHub
- Fan one source to many dynamic consumers.
- FileIO
- Flow
- Framing
- GraphDsl
- IncomingConnection
- KillSwitch
- MergeHub
- Fan many dynamic producers into one consumer source.
- OutgoingConnection
- RestartSettings
- RestartSource
- RunnableGraph
- Sink
- SinkQueue
- SinkRefHandle
- Consumer-side advertisement of a Sink<T>. The producer attaches a source via SinkRefHandle::attach, which then pumps into the receiver-owned stream.
- Source
- SourceQueue
- SourceRefHandle
- Producer-side advertisement of a Source<T>. The owner pumps elements; consumers subscribe via SourceRefHandle::take_source.
- Tcp
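Framing is summarized in the crate description as delimiter / length-field byte framing. As a rough illustration of those two styles, here is a std-only sketch. The function names are hypothetical, and the 4-byte big-endian length prefix is an assumed layout; neither is taken from the crate's Framing API.

```rust
// Std-only sketch of the two framing styles. Function names and the
// 4-byte big-endian length prefix are assumptions for illustration.

/// Split `buf` on `delim`; returns the complete frames and the unconsumed tail.
pub fn split_delimited(buf: &[u8], delim: u8) -> (Vec<Vec<u8>>, Vec<u8>) {
    let mut frames = Vec::new();
    let mut start = 0;
    for (i, &b) in buf.iter().enumerate() {
        if b == delim {
            frames.push(buf[start..i].to_vec());
            start = i + 1;
        }
    }
    (frames, buf[start..].to_vec())
}

/// Decode frames that are prefixed by a 4-byte big-endian length.
/// Returns the complete frames and the unconsumed tail.
pub fn split_length_prefixed(buf: &[u8]) -> (Vec<Vec<u8>>, Vec<u8>) {
    let mut frames = Vec::new();
    let mut pos = 0;
    while buf.len() - pos >= 4 {
        let len =
            u32::from_be_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]) as usize;
        if buf.len() - pos - 4 < len {
            break; // the frame body has not fully arrived yet
        }
        frames.push(buf[pos + 4..pos + 4 + len].to_vec());
        pos += 4 + len;
    }
    (frames, buf[pos..].to_vec())
}
```

Both helpers return the leftover bytes so that a caller can prepend them to the next chunk, which is the usual way a framing stage carries partial frames across reads.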
Enums
- FramingError
- OverflowStrategy
- QueueOfferResult
- SupervisionDirective
- What a Decider returns for an error.
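SupervisionDirective is described as what a Decider returns for an error, and with_decider is described as emitting only the surviving Ok payloads. A minimal std-only model of that interaction might look like the following. The Directive enum, its two variants, and apply_decider are hypothetical stand-ins; the crate's actual variants and signatures may differ.

```rust
/// Hypothetical stand-in for SupervisionDirective; the real variants may differ.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Directive {
    /// Drop the failed element and keep processing.
    Resume,
    /// End the stream at the failed element.
    Stop,
}

/// Model of "apply a decider to each error, keep only Ok payloads".
/// Works on any iterator of results instead of an async source.
pub fn apply_decider<T, E>(
    items: impl IntoIterator<Item = Result<T, E>>,
    decider: impl Fn(&E) -> Directive,
) -> Vec<T> {
    let mut out = Vec::new();
    for item in items {
        match item {
            Ok(v) => out.push(v),
            Err(e) => match decider(&e) {
                Directive::Resume => continue,
                Directive::Stop => break,
            },
        }
    }
    out
}
```

With a decider that resumes on every error except a fatal one, elements before the fatal error survive and everything after it is discarded.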
Functions
- balance
- balance(n): round-robin one source into n outputs.
- broadcast
- Cheap fan-out into two independent sources using cloned items and a bounded channel per downstream.
- concat
- Drain first source fully, then second.
- conflate
- conflate(seed, fold): when downstream is slower than upstream, merge consecutive upstream elements into a running aggregate via fold. The aggregate is emitted whenever downstream pulls.
- count_elements
- count_elements(src): convenience helper that returns the source unchanged plus an Arc<AtomicU64> that totals every element.
- expand
- expand(extrapolate): when upstream is slower than downstream, repeatedly call extrapolate(last) between elements to keep downstream supplied. After the upstream completes, the iterator returned by extrapolate(last) continues to be drained until it is itself exhausted.
- group_by
- group_by(max_substreams, key_fn): fan one source into N per-key substreams. Each new key yields a (key, Source<T>) pair on the returned outer source. Once max_substreams keys are open, additional keys’ elements are dropped.
- grouped_within
- grouped_within(n, dur): emit Vec<T> chunks of up to n elements; flush early when dur elapses since the chunk’s first element.
- idle_timeout
- idle_timeout(d): complete the stream early if no element arrives for d. The early completion is surfaced so that a downstream recover_with / Sink::collect_with_status can distinguish it from normal completion.
- initial_delay
- initial_delay(d): sleep d before forwarding the first element. Once the first element has been emitted, downstream sees the source as a normal pass-through.
- keep_alive
- keep_alive(idle, gen): inject a synthetic element whenever the upstream is silent for longer than idle. The synthetic element is produced by gen() (typically a heartbeat). Counterpart in the upstream project: Source.KeepAlive(idle, () => element).
- map_error
- Map the error variant via f. Both Ok and Err continue downstream; only the Err payload type changes.
- merge
- Merge (interleaving, order not guaranteed).
- merge_all
- Merge with arbitrary fan-in.
- merge_prioritized
- Each input contributes elements in proportion to its weight when both have items pending, falling through to whichever side has work otherwise. Weights must be ≥ 1.
- merge_sorted
- Merge two already-sorted sources, preserving total order. Both inputs must be ascending; the output is ascending. Buffers one element per side via tokio mpsc.
- monitor
- monitor(src, on_each): invoke on_each(&item) for every element flowing through, without consuming or transforming it. Useful for telemetry instrumentation.
- partition
- partition(n, f): fan one source into n output sources; each element is sent to the output picked by f(item). Elements for which f returns an out-of-range index are dropped.
- prefix_and_tail
- prefix_and_tail(n): return the first n elements as a Vec alongside a Source<T> carrying the rest.
- recover
- Replace any Err(e) with f(e) and continue, mapping the stream to T (success values are unwrapped). The first error stops the upstream, so subsequent elements are dropped.
- recover_with
- Replace the upstream’s tail with replacement upon the first Err(_). Pre-error Ok(_) values flow through unchanged.
- recover_with_retries
- Replace the upstream’s tail with replacement_factory() on each error, capped at max_attempts total replacements. After max_attempts, subsequent errors propagate as terminations.
- select_error
- Alias for map_error, matching the upstream project's naming. Keeping both names makes it possible to port tests verbatim.
- split_after
- split_after(pred): like split_when, except the splitting element stays with the previous substream and the next element starts a new one.
- split_when
- split_when(pred): split the source into a sequence of substreams; a new substream begins when pred(item) returns true, with the splitting element going to the new substream.
- unzip
- unzip(src): split a source of (A, B) pairs into two sources. The second is buffered if the first lags (no backpressure across the split; this matches fan-out semantics).
- watch_termination
- watch_termination(src): returns the original source plus a oneshot::Receiver<()> that fires when upstream completes (whether by exhaustion or by the receiver being polled past the final element).
- with_decider
- Apply decider to each error in src, emitting only the surviving Ok payloads.
- zip
- Pair corresponding elements.
- zip_with
- Pair corresponding elements and apply f.
- zip_with_index
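The difference between split_when and split_after in the list above is easiest to see on a concrete sequence. The sketch below models both on a plain Vec with std only; the signatures are hypothetical and return Vec<Vec<T>> rather than nested sources, and the handling of a splitting first element is an assumption.

```rust
// Std-only model of the two splitting rules. These are not the crate's
// signatures: the real functions are described as returning substreams.

/// A new group starts at each element for which `pred` is true;
/// that element becomes the first item of the new group.
pub fn split_when<T>(items: Vec<T>, pred: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut groups: Vec<Vec<T>> = Vec::new();
    for item in items {
        // Assumption: a splitting first element does not create an empty group.
        if groups.is_empty() || pred(&item) {
            groups.push(Vec::new());
        }
        groups.last_mut().unwrap().push(item);
    }
    groups
}

/// The element for which `pred` is true closes the current group;
/// the next element, if any, opens a new one.
pub fn split_after<T>(items: Vec<T>, pred: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut groups: Vec<Vec<T>> = Vec::new();
    let mut start_new = true;
    for item in items {
        if start_new {
            groups.push(Vec::new());
        }
        start_new = pred(&item);
        groups.last_mut().unwrap().push(item);
    }
    groups
}
```

For the input 1, 2, 0, 3, 0, 4 with a predicate that matches 0, split_when groups the elements as [1, 2], [0, 3], [0, 4], while split_after groups them as [1, 2, 0], [3, 0], [4].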