Skip to main content

Consumer

Struct Consumer 

Source
pub struct Consumer<S: Sink> { /* private fields */ }
Expand description

A handle for a tokio task that consumes a stream by driving a visitor over its events.

Consumers are produced by the inspect_*, collect_*, and wait_for_line factory methods on BroadcastOutputStream and SingleSubscriberOutputStream. The type parameter S is the visitor’s output (a sink, a writer, (), or another value the visitor returns when the stream ends).

For proper cleanup, call

  • wait(), which waits for the consumer task to complete.
  • cancel(timeout), which asks the consumer to stop, waits for cooperative completion, and aborts the task if the timeout elapses first.
  • abort(), which forcefully aborts the consumer task.

If not cleaned up, the termination signal will be sent when dropping this consumer, but the task will be aborted (forceful, not waiting for its regular completion).

Implementations§

Source§

impl<S: Sink> Consumer<S>

Source

pub fn is_finished(&self) -> bool

Returns whether the consumer task has finished.

This is a non-blocking task-state check. A finished consumer still owns its task result until wait, cancel, or abort consumes it.

Source

pub async fn wait(self) -> Result<S, ConsumerError>

Waits for the consumer to terminate naturally and returns its sink.

A consumer will automatically terminate when either:

  1. The underlying write-side of the stream is dropped.
  2. The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
  3. The first Next::Break is observed.

If none of these may occur in your case, this can hang forever. wait also waits for any in-flight async visitor callback or writer call to complete.

The stdout/stderr streams naturally close when the process is terminated, so waiting on a consumer after termination is fine:


let mut process = Process::new(cmd)
    .name(AutoName::program_only())
    .stdout_and_stderr(|stream| {
        stream
            .broadcast()
            .best_effort_delivery()
            .no_replay()
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
    })
    .spawn()
    .unwrap();
let consumer = process.stdout().collect_lines_into_vec(
    LineParsingOptions::default(),
    LineCollectionOptions::Bounded {
        max_bytes: 1.megabytes(),
        max_lines: 1024,
        overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
    },
);
process.terminate(Duration::from_secs(1), Duration::from_secs(1)).await.unwrap();
let collected = consumer.wait().await.unwrap(); // This will return immediately.
§Errors

Returns ConsumerError::TaskJoin if the consumer task cannot be joined, or ConsumerError::StreamRead if the underlying stream fails while being read. Visitor-specific outcomes (e.g. a writer-backed visitor’s sink failure) appear inside the returned S, not in ConsumerError.

§Panics

Panics if the consumer’s internal task has already been taken.

Source

pub async fn abort(self)

Forcefully aborts the consumer task.

This drops any pending async visitor callback or writer future, releases the stream subscription, and drops the sink/writer instead of returning it. It cannot preempt blocking synchronous code that never yields to the async runtime.

For single-subscriber streams, the consumer claim is released after the aborted task has been joined during this method.

Source

pub async fn cancel( self, timeout: Duration, ) -> Result<ConsumerCancelOutcome<S>, ConsumerError>

Cooperatively cancels the consumer, aborting it if timeout elapses first.

Returns ConsumerCancelOutcome::Cancelled with the sink when the consumer observes cancellation and exits normally before the timeout. Returns ConsumerCancelOutcome::Aborted when the timeout elapses; in that case the task is aborted, any pending callback/write future is dropped, and the sink/writer is not returned.

Cancellation is still cooperative until the timeout boundary: an in-flight async callback or writer call must finish before cancellation can be observed. For single-subscriber streams, the consumer claim is released before this method returns, both after successful cooperative cancellation and after timeout-driven abort.

§Errors

Returns ConsumerError::TaskJoin if the consumer task cannot be joined before the timeout, or ConsumerError::StreamRead if the underlying stream fails while being read before cancellation is observed. Visitor-specific outcomes appear inside the returned S (carried by ConsumerCancelOutcome::Cancelled).

§Panics

Panics if the consumer’s internal cancellation sender has already been taken.

Trait Implementations§

Source§

impl<S: Sink> Drop for Consumer<S>

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl<S> Freeze for Consumer<S>

§

impl<S> !RefUnwindSafe for Consumer<S>

§

impl<S> Send for Consumer<S>

§

impl<S> Sync for Consumer<S>

§

impl<S> Unpin for Consumer<S>

§

impl<S> UnsafeUnpin for Consumer<S>

§

impl<S> !UnwindSafe for Consumer<S>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> Sink for T
where T: Send + 'static,