Skip to main content

SingleSubscriberOutputStream

Struct SingleSubscriberOutputStream 

Source
pub struct SingleSubscriberOutputStream<D = BestEffortDelivery, R = NoReplay>
where D: Delivery, R: Replay,
{ /* private fields */ }
Expand description

The output stream from a process. Either representing stdout or stderr.

This is the single-subscriber variant, allowing exactly one active consumer at a time. It is useful when one inspector, collector, or line waiter should own the stream and accidental concurrent fanout should be rejected early. It can reduce coordination overhead in some single-consumer paths, but it is not a categorical throughput replacement for broadcast. If multiple concurrent consumers are required, use the output_stream::backend::broadcast::BroadcastOutputStream.

Implementations§

Source§

impl<D, R> SingleSubscriberOutputStream<D, R>
where D: Delivery, R: Replay,

Source

pub fn from_stream<S>( stream: S, stream_name: &'static str, options: StreamConfig<D, R>, ) -> Self
where S: AsyncRead + Unpin + Send + 'static,

Creates a new single-subscriber output stream from an async read stream and typed stream config.

Source

pub fn replay_enabled(&self) -> bool

Returns whether replay-specific APIs are enabled for this stream.

Source

pub fn replay_retention(&self) -> Option<ReplayRetention>

Returns the configured replay retention.

Source§

impl<D> SingleSubscriberOutputStream<D, ReplayEnabled>
where D: Delivery,

Source

pub fn seal_replay(&self)

Seals replay history for future subscribers.

This is a one-way, idempotent operation.

§Panics

Panics if the internal state mutex is poisoned.

Source

pub fn is_replay_sealed(&self) -> bool

Returns true once replay history has been sealed.

§Panics

Panics if the internal state mutex is poisoned.

Source§

impl<D, R> SingleSubscriberOutputStream<D, R>
where D: Delivery, R: Replay,

Source

pub fn consume_with<V>( &self, visitor: V, ) -> Result<Consumer<V::Output>, StreamConsumerError>
where V: StreamVisitor,

Tries to drive the provided synchronous StreamVisitor over this stream.

Returns a Consumer that owns the spawned task driving the visitor. All built-in inspect_*, collect_*, and wait_for_line factories construct a built-in visitor and call this method internally.

§Errors

Returns StreamConsumerError if the backend rejects the consumer (single-subscriber streams allow only one active consumer or line waiter at a time).

Source

pub fn consume_with_async<V>( &self, visitor: V, ) -> Result<Consumer<V::Output>, StreamConsumerError>

Tries to drive the provided asynchronous AsyncStreamVisitor over this stream.

§Errors

Returns StreamConsumerError if the backend rejects the consumer.

Source

pub fn inspect_chunks( &self, f: impl FnMut(Chunk) -> Next + Send + 'static, ) -> Result<Consumer<()>, StreamConsumerError>

Inspects chunks of output from the stream without storing them.

The provided closure is called for each chunk of data. Return Next::Continue to keep processing or Next::Break to stop.

Source

pub fn inspect_chunks_async<Fut>( &self, f: impl FnMut(Chunk) -> Fut + Send + 'static, ) -> Result<Consumer<()>, StreamConsumerError>
where Fut: Future<Output = Next> + Send + 'static,

Inspects chunks of output from the stream without storing them, using an async closure.

The provided async closure is called for each chunk of data. Return Next::Continue to keep processing or Next::Break to stop.

Source

pub fn inspect_lines( &self, f: impl FnMut(Cow<'_, str>) -> Next + Send + 'static, options: LineParsingOptions, ) -> Result<Consumer<()>, StreamConsumerError>

Inspects lines of output from the stream without storing them.

The provided closure is called for each line. Return Next::Continue to keep processing or Next::Break to stop.

§Panics

Panics if options.max_line_length is zero.

Source

pub fn inspect_lines_async<Fut>( &self, f: impl FnMut(Cow<'_, str>) -> Fut + Send + 'static, options: LineParsingOptions, ) -> Result<Consumer<()>, StreamConsumerError>
where Fut: Future<Output = Next> + Send + 'static,

Inspects lines of output from the stream without storing them, using an async closure.

The provided async closure is called for each line. Return Next::Continue to keep processing or Next::Break to stop.

§Panics

Panics if options.max_line_length is zero.

Source

pub fn collect_chunks<S: Sink>( &self, into: S, collect: impl FnMut(Chunk, &mut S) + Send + 'static, ) -> Result<Consumer<S>, StreamConsumerError>

Collects chunks from the stream into a sink.

The provided closure is called for each chunk, with mutable access to the sink.

Source

pub fn collect_chunks_async<S, C>( &self, into: S, collect: C, ) -> Result<Consumer<S>, StreamConsumerError>
where S: Sink, C: AsyncChunkCollector<S>,

Collects chunks from the stream into a sink using an async collector.

The provided async collector is called for each chunk, with mutable access to the sink.

Source

pub fn collect_lines<S: Sink>( &self, into: S, collect: impl FnMut(Cow<'_, str>, &mut S) -> Next + Send + 'static, options: LineParsingOptions, ) -> Result<Consumer<S>, StreamConsumerError>

Collects lines from the stream into a sink.

The provided closure is called for each line, with mutable access to the sink. Return Next::Continue to keep processing or Next::Break to stop.

§Panics

Panics if options.max_line_length is zero.

Source

pub fn collect_lines_async<S, C>( &self, into: S, collect: C, options: LineParsingOptions, ) -> Result<Consumer<S>, StreamConsumerError>
where S: Sink, C: AsyncLineCollector<S>,

Collects lines from the stream into a sink using an async collector.

The provided async collector is called for each line, with mutable access to the sink. Return Next::Continue to keep processing or Next::Break to stop.

§Panics

Panics if options.max_line_length is zero.

Source

pub fn collect_chunks_into_vec( &self, options: RawCollectionOptions, ) -> Result<Consumer<CollectedBytes>, StreamConsumerError>

Convenience method to collect chunks into a bounded byte vector.

Source

pub fn collect_lines_into_vec( &self, parsing_options: LineParsingOptions, collection_options: LineCollectionOptions, ) -> Result<Consumer<CollectedLines>, StreamConsumerError>

Convenience method to collect lines into a line buffer.

parsing_options.max_line_length must be non-zero unless collection_options is LineCollectionOptions::TrustedUnbounded.

§Panics

Panics if parsing_options.max_line_length is zero and bounded collection is used.

Source

pub fn collect_chunks_into_write<W, H>( &self, write: W, write_options: WriteCollectionOptions<H>, ) -> Result<Consumer<Result<W, SinkWriteError>>, StreamConsumerError>

Collects chunks into an async writer.

Sink write failures are handled according to write_options. Use WriteCollectionOptions::fail_fast to stop collection and surface the SinkWriteError as the inner Err of the resulting Result<W, SinkWriteError>, WriteCollectionOptions::log_and_continue to log each failure and keep collecting, or WriteCollectionOptions::with_error_handler to make a per-error continue-or-stop decision.

Source

pub fn collect_lines_into_write<W, H>( &self, write: W, options: LineParsingOptions, mode: LineWriteMode, write_options: WriteCollectionOptions<H>, ) -> Result<Consumer<Result<W, SinkWriteError>>, StreamConsumerError>

Collects lines into an async writer.

Parsed lines no longer include their trailing newline byte, so mode controls whether a \n delimiter should be reintroduced for each emitted line.

Sink write failures are handled according to write_options — see collect_chunks_into_write for the failure-handling options.

Source

pub fn collect_chunks_into_write_mapped<W, B, H>( &self, write: W, mapper: impl Fn(Chunk) -> B + Send + Sync + 'static, write_options: WriteCollectionOptions<H>, ) -> Result<Consumer<Result<W, SinkWriteError>>, StreamConsumerError>
where W: Sink + AsyncWriteExt + Unpin, B: AsRef<[u8]> + Send + 'static, H: SinkWriteErrorHandler,

Collects chunks into an async writer after mapping them with the provided function.

Sink write failures are handled according to write_options — see collect_chunks_into_write for the failure-handling options.

Source

pub fn collect_lines_into_write_mapped<W, B, H>( &self, write: W, mapper: impl Fn(Cow<'_, str>) -> B + Send + Sync + 'static, options: LineParsingOptions, mode: LineWriteMode, write_options: WriteCollectionOptions<H>, ) -> Result<Consumer<Result<W, SinkWriteError>>, StreamConsumerError>
where W: Sink + AsyncWriteExt + Unpin, B: AsRef<[u8]> + Send + 'static, H: SinkWriteErrorHandler,

Collects lines into an async writer after mapping them with the provided function.

mode applies after mapper: choose LineWriteMode::AsIs when the mapped output already contains delimiters, or LineWriteMode::AppendLf to append \n after each mapped line.

Sink write failures are handled according to write_options — see collect_chunks_into_write for the failure-handling options.

Source

pub fn wait_for_line( &self, timeout: Duration, predicate: impl Fn(Cow<'_, str>) -> bool + Send + Sync + 'static, options: LineParsingOptions, ) -> Result<impl Future<Output = Result<WaitForLineResult, StreamReadError>> + Send + 'static, StreamConsumerError>

Tries to wait for a line that matches the given predicate within timeout.

§Errors

Returns StreamConsumerError if the backend rejects the line waiter.

§Panics

Panics if options.max_line_length is zero.

Trait Implementations§

Source§

impl<D, R> Debug for SingleSubscriberOutputStream<D, R>
where D: Delivery + Debug, R: Replay + Debug,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<D, R> Drop for SingleSubscriberOutputStream<D, R>
where D: Delivery, R: Replay,

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

impl<D, R> OutputStream for SingleSubscriberOutputStream<D, R>
where D: Delivery, R: Replay,

Source§

fn read_chunk_size(&self) -> NumBytes

The maximum size of every chunk read by the backing stream_reader.
Source§

fn max_buffered_chunks(&self) -> usize

The number of chunks held by the underlying async channel.
Source§

fn name(&self) -> &'static str

Type of stream. Can be “stdout” or “stderr”. But we do not guarantee this output. It should only be used for logging/debugging.
Source§

impl<D, R> TrySubscribable for SingleSubscriberOutputStream<D, R>
where D: Delivery, R: Replay,

Source§

fn try_subscribe(&self) -> Result<impl Subscription, StreamConsumerError>

Creates a new subscription for a consumer, or returns why the consumer cannot be started. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> Sink for T
where T: Send + 'static,