Struct BroadcastOutputStream 

pub struct BroadcastOutputStream<D = BestEffortDelivery, R = NoReplay>
where D: Delivery, R: Replay,
{ /* private fields */ }

The output stream from a process using a multi-consumer broadcast backend.

Broadcast streams support multiple consumers and can optionally retain replay history for consumers that attach after output has already arrived. Use this backend when the same stream needs concurrent fanout, such as logging plus readiness checks or logging plus collection. Delivery policy still determines whether slow active consumers observe gaps or apply backpressure.
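As a rough illustration of the fanout and replay behavior described above, the following sketch models a broadcast backend with standard-library channels. It is not this crate's implementation: `Fanout`, `subscribe`, `publish`, and `drain` are hypothetical names, and the unbounded channels used here never drop chunks or apply backpressure, so delivery policy is out of scope.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a broadcast backend; not the crate's implementation.
struct Fanout {
    subscribers: Vec<Sender<Vec<u8>>>,
    /// Retained chunks for late subscribers; `None` models "no replay".
    replay: Option<Vec<Vec<u8>>>,
}

impl Fanout {
    fn new(replay_enabled: bool) -> Self {
        Fanout {
            subscribers: Vec::new(),
            replay: replay_enabled.then(Vec::new),
        }
    }

    /// Attaches a consumer; with replay enabled it first receives retained chunks.
    fn subscribe(&mut self) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        if let Some(history) = &self.replay {
            for chunk in history {
                let _ = tx.send(chunk.clone());
            }
        }
        self.subscribers.push(tx);
        rx
    }

    /// Delivers one chunk to every live consumer and records it for replay.
    fn publish(&mut self, chunk: &[u8]) {
        // Forget consumers whose receiving side has been dropped.
        self.subscribers.retain(|tx| tx.send(chunk.to_vec()).is_ok());
        if let Some(history) = &mut self.replay {
            history.push(chunk.to_vec());
        }
    }
}

/// Drains everything currently queued for one consumer.
fn drain(rx: &Receiver<Vec<u8>>) -> Vec<u8> {
    rx.try_iter().flatten().collect()
}
```

In this model, a consumer that subscribes after a chunk was published still receives it when replay is enabled; with replay disabled it only sees chunks published after it attached.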

Implementations§

impl<D, R> BroadcastOutputStream<D, R>
where D: Delivery, R: Replay,

pub fn from_stream<S: AsyncRead + Unpin + Send + 'static>(
    stream: S,
    stream_name: &'static str,
    options: StreamConfig<D, R>,
) -> Self

Creates a new broadcast output stream from an async read stream and typed stream config.

impl<D> BroadcastOutputStream<D, ReplayEnabled>
where D: Delivery,

pub fn seal_replay(&self)

Seals replay history for future subscribers.

This is a one-way, idempotent operation. Active subscribers keep their unread tail data according to the configured delivery policy.
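The one-way, idempotent nature of sealing can be sketched with a mutex-guarded flag. This is only a model under stated assumptions: `ReplayBuffer`, `ReplayState`, `record`, and `history_for_new_subscriber` are hypothetical names, and clearing the retained history on seal is an assumption about what "sealed for future subscribers" implies, not something the documentation above confirms.

```rust
use std::sync::Mutex;

/// Hypothetical replay state; the real type keeps its fields private.
struct ReplayState {
    history: Vec<String>,
    sealed: bool,
}

/// Sketch of a replay buffer that can be sealed exactly once.
struct ReplayBuffer {
    state: Mutex<ReplayState>,
}

impl ReplayBuffer {
    fn new() -> Self {
        ReplayBuffer {
            state: Mutex::new(ReplayState { history: Vec::new(), sealed: false }),
        }
    }

    /// Retains a line for future subscribers unless replay is already sealed.
    fn record(&self, line: &str) {
        // Like the documented methods, this panics if the mutex is poisoned.
        let mut state = self.state.lock().expect("state mutex poisoned");
        if !state.sealed {
            state.history.push(line.to_owned());
        }
    }

    /// One-way and idempotent: repeated calls leave the buffer sealed.
    /// Assumption: sealing also releases the retained history.
    fn seal(&self) {
        let mut state = self.state.lock().expect("state mutex poisoned");
        state.sealed = true;
        state.history.clear();
    }

    fn is_sealed(&self) -> bool {
        self.state.lock().expect("state mutex poisoned").sealed
    }

    /// History a newly attached subscriber would start from.
    fn history_for_new_subscriber(&self) -> Vec<String> {
        self.state.lock().expect("state mutex poisoned").history.clone()
    }
}
```

Calling `seal` twice has the same effect as calling it once, and nothing recorded afterwards becomes visible to new subscribers in this model.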

§Panics

Panics if the internal state mutex is poisoned.

pub fn is_replay_sealed(&self) -> bool

Returns true once replay history has been sealed.

§Panics

Panics if the internal state mutex is poisoned.

impl<D, R> BroadcastOutputStream<D, R>
where D: Delivery, R: Replay,

pub fn consume_with<V>(&self, visitor: V) -> Consumer<V::Output>
where V: StreamVisitor,

Drives the provided synchronous StreamVisitor over this stream and returns a Consumer that owns the spawned task.

All built-in inspect_*, collect_*, and wait_for_line factories construct a built-in visitor and call this method internally. Reach for consume_with when the closure-shaped factories don’t fit and you need direct access to the chunk/gap/EOF lifecycle. The returned Consumer’s wait yields whatever the visitor produces from StreamVisitor::into_output.

pub fn consume_with_async<V>(&self, visitor: V) -> Consumer<V::Output>

Drives the provided asynchronous AsyncStreamVisitor over this stream and returns a Consumer that owns the spawned task.

Use this when observing a chunk requires .await (for example, forwarding chunks to an async writer or channel). See consume_with for the synchronous variant.

pub fn inspect_chunks(
    &self,
    f: impl FnMut(Chunk) -> Next + Send + 'static,
) -> Consumer<()>

Inspects chunks of output from the stream without storing them.

The provided closure is called for each chunk of data. Return Next::Continue to keep processing or Next::Break to stop.
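The continue-or-break contract can be pictured with a small synchronous driver. This is a sketch, not the crate's code: the local `Next` enum only mirrors the two values named above, `inspect_chunks_sketch` is a hypothetical name, and plain byte slices stand in for `Chunk`.

```rust
/// Local mirror of the `Next` values named above (assumed shape).
enum Next {
    Continue,
    Break,
}

/// Hypothetical driver: hands each chunk to `f` until it returns `Next::Break`.
/// Returns how many chunks the closure saw.
fn inspect_chunks_sketch<'a>(
    chunks: impl IntoIterator<Item = &'a [u8]>,
    mut f: impl FnMut(&[u8]) -> Next,
) -> usize {
    let mut seen = 0;
    for chunk in chunks {
        seen += 1;
        if let Next::Break = f(chunk) {
            // Stop early; later chunks are never shown to the closure.
            break;
        }
    }
    seen
}
```

A closure that returns `Next::Break` on the second of three chunks is called exactly twice in this model.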

pub fn inspect_chunks_async<Fut>(
    &self,
    f: impl FnMut(Chunk) -> Fut + Send + 'static,
) -> Consumer<()>
where Fut: Future<Output = Next> + Send + 'static,

Inspects chunks of output from the stream without storing them, using an async closure.

The provided async closure is called for each chunk of data. Return Next::Continue to keep processing or Next::Break to stop.

pub fn inspect_lines(
    &self,
    f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static,
    options: LineParsingOptions,
) -> Consumer<()>

Inspects lines of output from the stream without storing them.

The provided closure is called for each line. Return Next::Continue to keep processing or Next::Break to stop.

§Panics

Panics if options.max_line_length is zero.

pub fn inspect_lines_async<Fut>(
    &self,
    f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static,
    options: LineParsingOptions,
) -> Consumer<()>
where Fut: Future<Output = Next> + Send + 'static,

Inspects lines of output from the stream without storing them, using an async closure.

The provided async closure is called for each line. Return Next::Continue to keep processing or Next::Break to stop.

§Panics

Panics if options.max_line_length is zero.

pub fn collect_chunks<S: Sink>(
    &self,
    into: S,
    collect: impl FnMut(Chunk, &mut S) + Send + 'static,
) -> Consumer<S>

Collects chunks from the stream into a sink.

The provided closure is called for each chunk, with mutable access to the sink.

pub fn collect_chunks_async<S, C>(&self, into: S, collect: C) -> Consumer<S>
where S: Sink, C: AsyncChunkCollector<S>,

Collects chunks from the stream into a sink using an async collector.

The provided async collector is called for each chunk, with mutable access to the sink.

pub fn collect_lines<S: Sink>(
    &self,
    into: S,
    collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static,
    options: LineParsingOptions,
) -> Consumer<S>

Collects lines from the stream into a sink.

The provided closure is called for each line, with mutable access to the sink. Return Next::Continue to keep processing or Next::Break to stop.
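The relationship between the sink, the closure, and the returned value can be sketched synchronously. The names here are hypothetical (`collect_lines_sketch`, a local `Next` mirror), the input is an in-memory string rather than a process stream, and line parsing options are ignored.

```rust
/// Local mirror of the `Next` values named above (assumed shape).
enum Next {
    Continue,
    Break,
}

/// Hypothetical synchronous driver: feeds each line of `input` to `collect`
/// together with the sink, then hands the sink back to the caller.
fn collect_lines_sketch<S>(
    input: &str,
    mut into: S,
    mut collect: impl FnMut(&str, &mut S) -> Next,
) -> S {
    for line in input.lines() {
        if let Next::Break = collect(line, &mut into) {
            break;
        }
    }
    into
}
```

The closure decides both what goes into the sink and when collection stops, and the caller gets the sink back once the driver finishes, much as `Consumer<S>` eventually yields the sink.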

§Panics

Panics if options.max_line_length is zero.

pub fn collect_lines_async<S, C>(
    &self,
    into: S,
    collect: C,
    options: LineParsingOptions,
) -> Consumer<S>
where S: Sink, C: AsyncLineCollector<S>,

Collects lines from the stream into a sink using an async collector.

The provided async collector is called for each line, with mutable access to the sink. Return Next::Continue to keep processing or Next::Break to stop.

§Panics

Panics if options.max_line_length is zero.

pub fn collect_chunks_into_vec(
    &self,
    options: RawCollectionOptions,
) -> Consumer<CollectedBytes>

Convenience method to collect chunks into a bounded byte vector.

pub fn collect_lines_into_vec(
    &self,
    parsing_options: LineParsingOptions,
    collection_options: LineCollectionOptions,
) -> Consumer<CollectedLines>

Convenience method to collect lines into a line buffer.

parsing_options.max_line_length must be non-zero unless collection_options is LineCollectionOptions::TrustedUnbounded.

§Panics

Panics if parsing_options.max_line_length is zero and bounded collection is used.

pub fn collect_chunks_into_write<W, H>(
    &self,
    write: W,
    write_options: WriteCollectionOptions<H>,
) -> Consumer<Result<W, SinkWriteError>>

Collects chunks into an async writer.

Sink write failures are handled according to write_options. Use WriteCollectionOptions::fail_fast to stop collection and surface the SinkWriteError as the inner Err of the resulting Result<W, SinkWriteError>, WriteCollectionOptions::log_and_continue to log each failure and keep collecting, or WriteCollectionOptions::with_error_handler to make a per-error continue-or-stop decision.
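The three failure-handling styles can be viewed as one per-error decision. The sketch below models that idea with a synchronous `std::io::Write` sink. `Decision`, `collect_into`, and `Picky` are hypothetical names, `std::io::Error` stands in for `SinkWriteError`, and nothing here reflects how `WriteCollectionOptions` is actually built.

```rust
use std::io::{self, Write};

/// Hypothetical decision a per-error handler can make.
enum Decision {
    Continue,
    Stop,
}

/// Writes each chunk; on failure asks `on_error` whether to keep going.
/// Returns the writer, or the error that stopped collection.
fn collect_into<W: Write>(
    mut out: W,
    chunks: &[&[u8]],
    mut on_error: impl FnMut(&io::Error) -> Decision,
) -> Result<W, io::Error> {
    for &chunk in chunks {
        if let Err(err) = out.write_all(chunk) {
            match on_error(&err) {
                // Comparable to logging the failure and continuing.
                Decision::Continue => continue,
                // Comparable to fail-fast: surface the error as the inner `Err`.
                Decision::Stop => return Err(err),
            }
        }
    }
    Ok(out)
}

/// Demo writer that rejects any buffer containing a zero byte.
struct Picky(Vec<u8>);

impl Write for Picky {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.contains(&0) {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "zero byte"));
        }
        self.0.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
```

A handler that always returns `Decision::Stop` behaves like fail-fast, one that always returns `Decision::Continue` skips failed chunks, and a handler that inspects the error can choose per failure.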

pub fn collect_lines_into_write<W, H>(
    &self,
    write: W,
    options: LineParsingOptions,
    mode: LineWriteMode,
    write_options: WriteCollectionOptions<H>,
) -> Consumer<Result<W, SinkWriteError>>

Collects lines into an async writer.

Parsed lines no longer include their trailing newline byte, so mode controls whether a \n delimiter should be reintroduced for each emitted line.

Sink write failures are handled according to write_options — see collect_chunks_into_write for the failure-handling options.

pub fn collect_chunks_into_write_mapped<W, B, H>(
    &self,
    write: W,
    mapper: impl Fn(Chunk) -> B + Send + Sync + 'static,
    write_options: WriteCollectionOptions<H>,
) -> Consumer<Result<W, SinkWriteError>>
where W: Sink + AsyncWriteExt + Unpin, B: AsRef<[u8]> + Send + 'static, H: SinkWriteErrorHandler,

Collects chunks into an async writer after mapping them with the provided function.

Sink write failures are handled according to write_options — see collect_chunks_into_write for the failure-handling options.

pub fn collect_lines_into_write_mapped<W, B, H>(
    &self,
    write: W,
    mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + 'static,
    options: LineParsingOptions,
    mode: LineWriteMode,
    write_options: WriteCollectionOptions<H>,
) -> Consumer<Result<W, SinkWriteError>>
where W: Sink + AsyncWriteExt + Unpin, B: AsRef<[u8]> + Send + 'static, H: SinkWriteErrorHandler,

Collects lines into an async writer after mapping them with the provided function.

mode applies after mapper: choose LineWriteMode::AsIs when the mapped output already contains delimiters, or LineWriteMode::AppendLf to append \n after each mapped line.
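The ordering of mapper and mode can be sketched with a synchronous writer. The local `LineWriteMode` enum mirrors only the two variants named above, `write_mapped_lines` is a hypothetical helper, and `std::io::Write` stands in for the async writer.

```rust
use std::io::{self, Write};

/// Local mirror of the two `LineWriteMode` variants named above.
#[derive(Clone, Copy)]
enum LineWriteMode {
    AsIs,
    AppendLf,
}

/// Hypothetical synchronous stand-in: maps each parsed line (which carries no
/// trailing newline), writes the result, then applies `mode`.
fn write_mapped_lines<W: Write>(
    mut out: W,
    lines: &[&str],
    mapper: impl Fn(&str) -> String,
    mode: LineWriteMode,
) -> io::Result<W> {
    for &line in lines {
        let mapped = mapper(line);
        out.write_all(mapped.as_bytes())?;
        // The delimiter is appended after mapping, never before.
        if let LineWriteMode::AppendLf = mode {
            out.write_all(b"\n")?;
        }
    }
    Ok(out)
}
```

With `AppendLf`, a mapper that prefixes each line yields one prefixed line per row; with `AsIs`, the mapper is responsible for any delimiter it wants in the output.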

Sink write failures are handled according to write_options — see collect_chunks_into_write for the failure-handling options.

pub fn wait_for_line(
    &self,
    timeout: Duration,
    predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static,
    options: LineParsingOptions,
) -> impl Future<Output = Result<WaitForLineResult, StreamReadError>> + Send + 'static

Waits for a line that matches the given predicate within timeout.

The returned future resolves to Ok(WaitForLineResult::Matched) if a matching line is found, Ok(WaitForLineResult::StreamClosed) if the stream ends first, or Ok(WaitForLineResult::Timeout) if the timeout expires first.
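The three non-error outcomes can be modeled with a blocking standard-library channel. This is a sketch under assumptions: `WaitOutcome` is a local enum whose variant names follow the ones listed above (whether the real `Matched` variant carries data is not stated here), `wait_for_line_sketch` is a hypothetical name, and a closed channel stands in for the end of the stream.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Local stand-in for the outcomes described above.
#[derive(Debug, PartialEq)]
enum WaitOutcome {
    Matched,
    StreamClosed,
    Timeout,
}

/// Hypothetical blocking analogue: reads lines until one matches, the sender
/// side closes, or the overall deadline passes.
fn wait_for_line_sketch(
    lines: &Receiver<String>,
    timeout: Duration,
    predicate: impl Fn(&str) -> bool,
) -> WaitOutcome {
    let deadline = Instant::now() + timeout;
    loop {
        // The timeout covers the whole wait, not each individual line.
        let remaining = deadline.saturating_duration_since(Instant::now());
        match lines.recv_timeout(remaining) {
            Ok(line) if predicate(line.as_str()) => return WaitOutcome::Matched,
            Ok(_) => continue,
            Err(RecvTimeoutError::Disconnected) => return WaitOutcome::StreamClosed,
            Err(RecvTimeoutError::Timeout) => return WaitOutcome::Timeout,
        }
    }
}
```

Callers typically branch on all three outcomes, for example treating a closed stream before the readiness line as a startup failure.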

The stream subscription is acquired synchronously inside this method, before the returned future is polled, so output produced between this call and the first .await cannot race ahead of the matcher.

The waiter starts at the earliest output currently available to new consumers. With replay enabled and unsealed, that can include retained past output; otherwise it starts at live output.

When chunks are dropped in DeliveryGuarantee::BestEffort mode, this waiter discards any partial line in progress and resynchronizes at the next newline instead of matching across the gap.
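Resynchronizing after a gap can be sketched as a small line splitter. The `Event` enum and `lines_across_gaps` are hypothetical and only model the idea: on a gap, drop the partial line and skip everything up to the next newline. This sketch is conservative and also skips a complete line when the gap happens to fall exactly on a line boundary; the crate may behave differently in that case.

```rust
/// Hypothetical event type: a chunk of bytes or a marker for dropped output.
enum Event<'a> {
    Chunk(&'a [u8]),
    Gap,
}

/// Splits events into complete lines, discarding any line touched by a gap.
fn lines_across_gaps(events: &[Event<'_>]) -> Vec<String> {
    let mut lines = Vec::new();
    let mut partial: Vec<u8> = Vec::new();
    // True while skipping the remainder of a line that a gap interrupted.
    let mut resyncing = false;
    for event in events {
        match event {
            Event::Gap => {
                partial.clear();
                resyncing = true;
            }
            Event::Chunk(bytes) => {
                for &b in bytes.iter() {
                    if b == b'\n' {
                        if !resyncing {
                            lines.push(String::from_utf8_lossy(&partial).into_owned());
                        }
                        partial.clear();
                        resyncing = false;
                    } else if !resyncing {
                        partial.push(b);
                    }
                }
            }
        }
    }
    lines
}
```

Without the resynchronization step, the fragments on either side of a gap would be joined into a line that never existed in the output, which a predicate could then match by accident.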

§Errors

Returns StreamReadError if the underlying stream fails while being read.

§Panics

Panics if options.max_line_length is zero.

Trait Implementations§

impl<D, R> Debug for BroadcastOutputStream<D, R>
where D: Delivery + Debug, R: Replay + Debug,

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<D, R> Drop for BroadcastOutputStream<D, R>
where D: Delivery, R: Replay,

fn drop(&mut self)

Executes the destructor for this type.

impl<D, R> OutputStream for BroadcastOutputStream<D, R>
where D: Delivery, R: Replay,

fn read_chunk_size(&self) -> NumBytes

The maximum size of every chunk read by the backing stream_reader.

fn max_buffered_chunks(&self) -> usize

The number of chunks held by the underlying async channel.

fn name(&self) -> &'static str

The name of the stream, typically “stdout” or “stderr”. The exact value is not guaranteed, so use it only for logging and debugging.

impl<D, R> TrySubscribable for BroadcastOutputStream<D, R>
where D: Delivery, R: Replay,

fn try_subscribe(&self) -> Result<impl Subscription, StreamConsumerError>

Creates a new subscription for a consumer, or returns why the consumer cannot be started.

Auto Trait Implementations§

impl<D, R> Freeze for BroadcastOutputStream<D, R>
where D: Freeze, R: Freeze,

impl<D, R> RefUnwindSafe for BroadcastOutputStream<D, R>

impl<D, R> Send for BroadcastOutputStream<D, R>

impl<D, R> Sync for BroadcastOutputStream<D, R>

impl<D, R> Unpin for BroadcastOutputStream<D, R>
where D: Unpin, R: Unpin,

impl<D, R> UnsafeUnpin for BroadcastOutputStream<D, R>
where D: UnsafeUnpin, R: UnsafeUnpin,

impl<D, R> UnwindSafe for BroadcastOutputStream<D, R>
where D: UnwindSafe, R: UnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Sink for T
where T: Send + 'static,