Skip to main content

Crate atomr_streams

Crate atomr_streams 

Source
Expand description

atomr-streams.

Source/Flow/Sink DSL built on top of futures::Stream. The surface covers the linear operator set and the most common graph-DSL junctions from upstream:

The port delegates pipeline execution to futures_util::StreamExt; each combinator builds a boxed stream closure that mirrors the corresponding Operator.

Modules§

deciders
Conventional decider helpers.

Structs§

ActorMaterializer
BidiFlow
BroadcastHub
Fan one source to many dynamic consumers.
FileIO
Flow
Framing
GraphDsl
IncomingConnection
KillSwitch
MergeHub
Fan many dynamic producers into one consumer source.
OutgoingConnection
RestartSettings
RestartSource
RunnableGraph
Sink
SinkQueue
SinkRefHandle
Consumer-side advertisement of a Sink<T>. The producer attaches a source via SinkRefHandle::attach which then pumps into the receiver-owned stream.
Source
SourceQueue
SourceRefHandle
Producer-side advertisement of a Source<T>. The owner pumps elements; consumers subscribe via SourceRefHandle::take_source.
Tcp

Enums§

FramingError
OverflowStrategy
QueueOfferResult
SupervisionDirective
What a Decider returns for an error.

Functions§

balance
balance(n) — round-robin one source into n outputs.
broadcast
Cheap fan-out into two independent sources using cloned items and a bounded channel per downstream.
concat
Drain first source fully, then second.
conflate
conflate(seed, fold) — when downstream is slower than upstream, merge consecutive upstream elements into a running aggregate via fold. The aggregate is emitted whenever downstream pulls.
count_elements
count_elements(src) — convenience: returns the source unchanged plus an Arc<AtomicU64> that totals every element.
expand
expand(extrapolate) — when upstream is slower than downstream, repeatedly call extrapolate(last) between elements to keep downstream supplied. After the upstream completes, the iterator returned by extrapolate(last) continues to be drained until it itself is exhausted.
group_by
group_by(max_substreams, key_fn) — fan one source into N per-key substreams. Each new key yields a (key, Source<T>) pair on the returned outer source. Once max_substreams keys are open, additional keys’ elements are dropped.
grouped_within
grouped_within(n, dur) — emit Vec<T> chunks of up to n elements; flush early when dur elapses since the chunk’s first element.
idle_timeout
idle_timeout(d) — complete the stream early if no element arrives for d. We surface “completed early” so a downstream recover_with / Sink::collect_with_status can disambiguate.
initial_delay
initial_delay(d) — sleep d before forwarding the first element. Once the first element has been emitted, downstream sees the source as a normal pass-through.
keep_alive
keep_alive(idle, gen) — inject a synthetic element whenever the upstream is silent for longer than idle. The synthetic element is produced by gen() (typically a heartbeat). Source.KeepAlive(idle, () => element).
map_error
Map the error variant via f. Both Ok and Err continue downstream; only the Err payload type changes.
merge
(interleaving, order not guaranteed).
merge_all
with arbitrary fan-in.
merge_prioritized
Every input contributes elements in proportion to its weight when both have items pending, falling through to whichever side has work otherwise. Weights ≥ 1.
merge_sorted
Merge two already-sorted sources preserving total order. Both inputs must be ascending; output is ascending. Buffers one element per side via tokio mpsc.
monitor
monitor(src, on_each) — invoke on_each(&item) for every element flowing through, without consuming or transforming it. Useful for telemetry instrumentation.
partition
partition(n, f) — fan one source into n output sources; each element is sent to the output picked by f(item). Out-of-range outputs are dropped.
prefix_and_tail
prefix_and_tail(n) — return the first n elements as a Vec alongside a Source<T> carrying the rest.
recover
Replace any Err(e) with f(e) and continue, mapping the stream to T (success values are unwrapped). The first error stops the upstream — subsequent elements are dropped.
recover_with
Replace the upstream’s tail with replacement upon the first Err(_). Pre-error Ok(_) values flow through unchanged.
recover_with_retries
Replace the upstream’s tail with replacement_factory() on each error, capped at max_attempts total replacements. After max_attempts, subsequent errors propagate as terminations.
select_error
Alias for map_error matching naming. Keeping both names makes porting tests verbatim possible.
split_after
split_after(pred) — like split_when, except the splitting element stays with the previous substream and the next element starts a new one.
split_when
split_when(pred) — split the source into a sequence of substreams; a new substream begins when pred(item) returns true, with the splitting element going to the new substream.
unzip
unzip(src) — split a source of (A, B) pairs into two sources. The second is buffered if the first lags (no backpressure across the split — matches fan-out semantics).
watch_termination
watch_termination(src) returns the original source plus a oneshot::Receiver<()> that fires when upstream completes (whether by exhaustion or by the receiver being polled past the final element).
with_decider
Apply decider to each error in src, emitting only the surviving Ok payloads.
zip
Pair corresponding elements.
zip_with
Pair corresponding elements and apply f.
zip_with_index

Type Aliases§

Decider
A decider is a closure mapping &E → SupervisionDirective.
SinkRef
SourceRef