pub struct BroadcastOutputStream<D = BestEffortDelivery, R = NoReplay>{ /* private fields */ }Expand description
The output stream from a process using a multi-consumer broadcast backend.
Broadcast streams support multiple consumers and can optionally retain replay history for consumers that attach after output has already arrived. Use this backend when the same stream needs concurrent fanout, such as logging plus readiness checks or logging plus collection. Delivery policy still determines whether slow active consumers observe gaps or apply backpressure.
Implementations§
Source§impl<D, R> BroadcastOutputStream<D, R>
impl<D, R> BroadcastOutputStream<D, R>
Sourcepub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
stream: S,
stream_name: &'static str,
options: StreamConfig<D, R>,
) -> Self
pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>( stream: S, stream_name: &'static str, options: StreamConfig<D, R>, ) -> Self
Creates a new broadcast output stream from an async read stream and typed stream config.
Source§impl<D> BroadcastOutputStream<D, ReplayEnabled>where
D: Delivery,
impl<D> BroadcastOutputStream<D, ReplayEnabled>where
D: Delivery,
Sourcepub fn seal_replay(&self)
pub fn seal_replay(&self)
Seals replay history for future subscribers.
This is a one-way, idempotent operation. Active subscribers keep their unread tail data according to the configured delivery policy.
§Panics
Panics if the internal state mutex is poisoned.
Sourcepub fn is_replay_sealed(&self) -> bool
pub fn is_replay_sealed(&self) -> bool
Returns true once replay history has been sealed.
§Panics
Panics if the internal state mutex is poisoned.
Source§impl<D, R> BroadcastOutputStream<D, R>
impl<D, R> BroadcastOutputStream<D, R>
Sourcepub fn consume_with<V>(&self, visitor: V) -> Consumer<V::Output>where
V: StreamVisitor,
pub fn consume_with<V>(&self, visitor: V) -> Consumer<V::Output>where
V: StreamVisitor,
Drives the provided synchronous StreamVisitor over this stream and returns a
Consumer that owns the spawned task.
All built-in inspect_*, collect_*, and wait_for_line factories construct a
built-in visitor and call this method internally; reach for consume_with when the
closure-shaped factories don’t fit and you need direct access to the chunk/gap/EOF
lifecycle. The returned Consumer’s wait yields whatever the
visitor produces from StreamVisitor::into_output.
Sourcepub fn consume_with_async<V>(&self, visitor: V) -> Consumer<V::Output>where
V: AsyncStreamVisitor,
pub fn consume_with_async<V>(&self, visitor: V) -> Consumer<V::Output>where
V: AsyncStreamVisitor,
Drives the provided asynchronous AsyncStreamVisitor over this stream and returns a
Consumer that owns the spawned task.
Use this when observing a chunk requires .await (for example, forwarding chunks to an
async writer or channel). See consume_with for the synchronous
variant.
Sourcepub fn inspect_chunks(
&self,
f: impl FnMut(Chunk) -> Next + Send + 'static,
) -> Consumer<()>
pub fn inspect_chunks( &self, f: impl FnMut(Chunk) -> Next + Send + 'static, ) -> Consumer<()>
Inspects chunks of output from the stream without storing them.
The provided closure is called for each chunk of data. Return Next::Continue to
keep processing or Next::Break to stop.
Sourcepub fn inspect_chunks_async<Fut>(
&self,
f: impl FnMut(Chunk) -> Fut + Send + 'static,
) -> Consumer<()>
pub fn inspect_chunks_async<Fut>( &self, f: impl FnMut(Chunk) -> Fut + Send + 'static, ) -> Consumer<()>
Inspects chunks of output from the stream without storing them, using an async closure.
The provided async closure is called for each chunk of data. Return
Next::Continue to keep processing or Next::Break to stop.
Sourcepub fn inspect_lines(
&self,
f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
options: LineParsingOptions,
) -> Consumer<()>
pub fn inspect_lines( &self, f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static, options: LineParsingOptions, ) -> Consumer<()>
Inspects lines of output from the stream without storing them.
The provided closure is called for each line. Return Next::Continue to keep
processing or Next::Break to stop.
§Panics
Panics if options.max_line_length is zero.
Sourcepub fn inspect_lines_async<Fut>(
&self,
f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
options: LineParsingOptions,
) -> Consumer<()>
pub fn inspect_lines_async<Fut>( &self, f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static, options: LineParsingOptions, ) -> Consumer<()>
Inspects lines of output from the stream without storing them, using an async closure.
The provided async closure is called for each line. Return Next::Continue to
keep processing or Next::Break to stop.
§Panics
Panics if options.max_line_length is zero.
Sourcepub fn collect_chunks<S: Sink>(
&self,
into: S,
collect: impl FnMut(Chunk, &mut S) + Send + 'static,
) -> Consumer<S>
pub fn collect_chunks<S: Sink>( &self, into: S, collect: impl FnMut(Chunk, &mut S) + Send + 'static, ) -> Consumer<S>
Collects chunks from the stream into a sink.
The provided closure is called for each chunk, with mutable access to the sink.
Sourcepub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Consumer<S>where
S: Sink,
C: AsyncChunkCollector<S>,
pub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Consumer<S>where
S: Sink,
C: AsyncChunkCollector<S>,
Collects chunks from the stream into a sink using an async collector.
The provided async collector is called for each chunk, with mutable access to the sink.
Sourcepub fn collect_lines<S: Sink>(
&self,
into: S,
collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static,
options: LineParsingOptions,
) -> Consumer<S>
pub fn collect_lines<S: Sink>( &self, into: S, collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static, options: LineParsingOptions, ) -> Consumer<S>
Collects lines from the stream into a sink.
The provided closure is called for each line, with mutable access to the sink.
Return Next::Continue to keep processing or Next::Break to stop.
§Panics
Panics if options.max_line_length is zero.
Sourcepub fn collect_lines_async<S, C>(
&self,
into: S,
collect: C,
options: LineParsingOptions,
) -> Consumer<S>where
S: Sink,
C: AsyncLineCollector<S>,
pub fn collect_lines_async<S, C>(
&self,
into: S,
collect: C,
options: LineParsingOptions,
) -> Consumer<S>where
S: Sink,
C: AsyncLineCollector<S>,
Collects lines from the stream into a sink using an async collector.
The provided async collector is called for each line, with mutable access to the
sink. Return Next::Continue to keep processing or Next::Break to stop.
§Panics
Panics if options.max_line_length is zero.
Sourcepub fn collect_chunks_into_vec(
&self,
options: RawCollectionOptions,
) -> Consumer<CollectedBytes>
pub fn collect_chunks_into_vec( &self, options: RawCollectionOptions, ) -> Consumer<CollectedBytes>
Convenience method to collect chunks into a bounded byte vector.
Sourcepub fn collect_lines_into_vec(
&self,
parsing_options: LineParsingOptions,
collection_options: LineCollectionOptions,
) -> Consumer<CollectedLines>
pub fn collect_lines_into_vec( &self, parsing_options: LineParsingOptions, collection_options: LineCollectionOptions, ) -> Consumer<CollectedLines>
Convenience method to collect lines into a line buffer.
parsing_options.max_line_length must be non-zero unless collection_options is
LineCollectionOptions::TrustedUnbounded.
§Panics
Panics if parsing_options.max_line_length is zero and bounded collection is used.
Sourcepub fn collect_chunks_into_write<W, H>(
&self,
write: W,
write_options: WriteCollectionOptions<H>,
) -> Consumer<Result<W, SinkWriteError>>
pub fn collect_chunks_into_write<W, H>( &self, write: W, write_options: WriteCollectionOptions<H>, ) -> Consumer<Result<W, SinkWriteError>>
Collects chunks into an async writer.
Sink write failures are handled according to write_options. Use
WriteCollectionOptions::fail_fast to stop collection and surface the
SinkWriteError as the inner Err of the resulting Result<W, SinkWriteError>,
WriteCollectionOptions::log_and_continue to log each failure and keep
collecting, or WriteCollectionOptions::with_error_handler to make a per-error
continue-or-stop decision.
Sourcepub fn collect_lines_into_write<W, H>(
&self,
write: W,
options: LineParsingOptions,
mode: LineWriteMode,
write_options: WriteCollectionOptions<H>,
) -> Consumer<Result<W, SinkWriteError>>
pub fn collect_lines_into_write<W, H>( &self, write: W, options: LineParsingOptions, mode: LineWriteMode, write_options: WriteCollectionOptions<H>, ) -> Consumer<Result<W, SinkWriteError>>
Collects lines into an async writer.
Parsed lines no longer include their trailing newline byte, so mode controls
whether a \n delimiter should be reintroduced for each emitted line.
Sink write failures are handled according to write_options — see
collect_chunks_into_write for the
failure-handling options.
Sourcepub fn collect_chunks_into_write_mapped<W, B, H>(
&self,
write: W,
mapper: impl Fn(Chunk) -> B + Send + Sync + 'static,
write_options: WriteCollectionOptions<H>,
) -> Consumer<Result<W, SinkWriteError>>
pub fn collect_chunks_into_write_mapped<W, B, H>( &self, write: W, mapper: impl Fn(Chunk) -> B + Send + Sync + 'static, write_options: WriteCollectionOptions<H>, ) -> Consumer<Result<W, SinkWriteError>>
Collects chunks into an async writer after mapping them with the provided function.
Sink write failures are handled according to write_options — see
collect_chunks_into_write for the
failure-handling options.
Sourcepub fn collect_lines_into_write_mapped<W, B, H>(
&self,
write: W,
mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
options: LineParsingOptions,
mode: LineWriteMode,
write_options: WriteCollectionOptions<H>,
) -> Consumer<Result<W, SinkWriteError>>
pub fn collect_lines_into_write_mapped<W, B, H>( &self, write: W, mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + 'static, options: LineParsingOptions, mode: LineWriteMode, write_options: WriteCollectionOptions<H>, ) -> Consumer<Result<W, SinkWriteError>>
Collects lines into an async writer after mapping them with the provided function.
mode applies after mapper: choose LineWriteMode::AsIs when the mapped
output already contains delimiters, or LineWriteMode::AppendLf to append \n
after each mapped line.
Sink write failures are handled according to write_options — see
collect_chunks_into_write for the
failure-handling options.
Sourcepub fn wait_for_line(
&self,
timeout: Duration,
predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
options: LineParsingOptions,
) -> impl Future<Output = Result<WaitForLineResult, StreamReadError>> + Send + 'static
pub fn wait_for_line( &self, timeout: Duration, predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static, options: LineParsingOptions, ) -> impl Future<Output = Result<WaitForLineResult, StreamReadError>> + Send + 'static
Waits for a line that matches the given predicate within timeout.
The returned future resolves to
Ok(WaitForLineResult::Matched) if a matching line is found,
Ok(WaitForLineResult::StreamClosed) if the stream ends first, or
Ok(WaitForLineResult::Timeout) if the timeout expires first.
The stream subscription is acquired synchronously inside this method, before the
returned future is polled, so output produced between this call and the first
.await cannot race ahead of the matcher.
The waiter starts at the earliest output currently available to new consumers. With replay enabled and unsealed, that can include retained past output; otherwise it starts at live output.
When chunks are dropped in DeliveryGuarantee::BestEffort mode, this waiter discards
any partial line in progress and resynchronizes at the next newline instead of matching
across the gap.
§Errors
Returns crate::StreamReadError if the underlying stream fails while being read.
§Panics
Panics if options.max_line_length is zero.
Trait Implementations§
Source§impl<D, R> Debug for BroadcastOutputStream<D, R>
impl<D, R> Debug for BroadcastOutputStream<D, R>
Source§impl<D, R> Drop for BroadcastOutputStream<D, R>
impl<D, R> Drop for BroadcastOutputStream<D, R>
Source§impl<D, R> OutputStream for BroadcastOutputStream<D, R>
impl<D, R> OutputStream for BroadcastOutputStream<D, R>
Source§fn read_chunk_size(&self) -> NumBytes
fn read_chunk_size(&self) -> NumBytes
stream_reader.