pub struct Consumer<S: Sink> { /* private fields */ }
A handle for a tokio task that consumes a stream by driving a visitor over its events.
Consumers are produced by the inspect_*, collect_*, and wait_for_line factory methods on
BroadcastOutputStream and
SingleSubscriberOutputStream. The type parameter S
is the visitor’s output (a sink, a writer, (), or another value the visitor returns when the
stream ends).
For proper cleanup, call one of:
- wait(), which waits for the consumer task to complete.
- cancel(timeout), which asks the consumer to stop, waits for cooperative completion, and aborts the task if the timeout elapses first.
- abort(), which forcefully aborts the consumer task.
If the consumer is dropped without being cleaned up, the termination signal is still sent on drop, but the task is then aborted forcefully rather than given time to complete normally.
Implementations§
impl<S: Sink> Consumer<S>
pub fn is_finished(&self) -> bool
pub async fn wait(self) -> Result<S, ConsumerError>
Waits for the consumer to terminate naturally and returns its sink.
A consumer will automatically terminate when either:
- The underlying write-side of the stream is dropped.
- The underlying stream is closed (by receiving an EOF / final read of 0 bytes).
- The first Next::Break is observed.
If none of these conditions can occur in your case, wait can hang forever. It also waits for any
in-flight async visitor callback or writer call to complete.
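The three termination conditions can be pictured with a small standard-library model. This is only a sketch, not the crate's implementation: it uses a blocking `std::sync::mpsc` channel instead of a tokio stream, and `StopReason`, `drive`, and the local `Next` enum are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc::Receiver;

/// Local stand-in for the visitor's control-flow signal (modeled on `Next::Break`).
#[derive(Debug, PartialEq)]
enum Next {
    Continue,
    Break,
}

/// Hypothetical: why the model loop stopped.
#[derive(Debug, PartialEq)]
enum StopReason {
    WriteSideDropped,
    Eof,
    VisitorBroke,
}

/// Drives `visit` over incoming chunks until one of the three conditions holds.
/// An empty chunk models a final read of 0 bytes (EOF).
fn drive<F>(rx: Receiver<Vec<u8>>, mut visit: F) -> StopReason
where
    F: FnMut(&[u8]) -> Next,
{
    loop {
        match rx.recv() {
            // Every sender is gone: the write side of the stream was dropped.
            Err(_) => return StopReason::WriteSideDropped,
            // Zero-length read: the stream was closed.
            Ok(chunk) if chunk.is_empty() => return StopReason::Eof,
            // Regular chunk: let the visitor decide whether to keep going.
            Ok(chunk) => {
                if visit(&chunk) == Next::Break {
                    return StopReason::VisitorBroke;
                }
            }
        }
    }
}
```

In this model, a sender that stays alive, never sends an empty chunk, and feeds a visitor that always returns `Next::Continue` leaves `drive` blocked indefinitely, which is the hang described above.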
The stdout/stderr streams naturally close when the process is terminated, so waiting
on a consumer after termination is fine:
let mut process = Process::new(cmd)
    .name(AutoName::program_only())
    .stdout_and_stderr(|stream| {
        stream
            .broadcast()
            .best_effort_delivery()
            .no_replay()
            .read_chunk_size(DEFAULT_READ_CHUNK_SIZE)
            .max_buffered_chunks(DEFAULT_MAX_BUFFERED_CHUNKS)
    })
    .spawn()
    .unwrap();
let consumer = process.stdout().collect_lines_into_vec(
    LineParsingOptions::default(),
    LineCollectionOptions::Bounded {
        max_bytes: 1.megabytes(),
        max_lines: 1024,
        overflow_behavior: CollectionOverflowBehavior::DropAdditionalData,
    },
);
process.terminate(Duration::from_secs(1), Duration::from_secs(1)).await.unwrap();
let collected = consumer.wait().await.unwrap(); // This will return immediately.
§Errors
Returns ConsumerError::TaskJoin if the consumer task cannot be joined, or
ConsumerError::StreamRead if the underlying stream fails while being read.
Visitor-specific outcomes (e.g. a writer-backed visitor’s sink failure) appear inside
the returned S, not in ConsumerError.
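The split between the two failure layers can be illustrated with stand-in types. This is a sketch under stated assumptions: `ModelConsumerError` is a simplified local copy of the two documented variants (the real `ConsumerError` variants may carry payloads), and `WriterOutcome` is a hypothetical choice of `S` for a writer-backed visitor.

```rust
use std::io;

/// Simplified stand-in for the crate's `ConsumerError`; payloads are omitted.
#[derive(Debug)]
enum ModelConsumerError {
    TaskJoin,
    StreamRead,
}

/// Hypothetical sink type `S`: bytes written, or the writer's own failure.
type WriterOutcome = Result<usize, io::Error>;

/// Separates consumer-level failures (outer `Err`) from visitor-level
/// outcomes, which travel inside the successfully returned `S`.
fn describe(result: Result<WriterOutcome, ModelConsumerError>) -> String {
    match result {
        Err(ModelConsumerError::TaskJoin) => "consumer task could not be joined".to_string(),
        Err(ModelConsumerError::StreamRead) => "stream failed while being read".to_string(),
        // The consumer itself finished cleanly; only the sink reported a problem.
        Ok(Err(e)) => format!("consumer finished, but the sink failed: {}", e),
        Ok(Ok(n)) => format!("consumer finished, {} bytes written", n),
    }
}
```

A caller that only checks the outer `Result` would miss the `Ok(Err(..))` case, so both layers need to be inspected.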
§Panics
Panics if the consumer’s internal task has already been taken.
pub async fn abort(self)
Forcefully aborts the consumer task.
This drops any pending async visitor callback or writer future, releases the stream subscription, and drops the sink/writer instead of returning it. It cannot preempt blocking synchronous code that never yields to the async runtime.
For single-subscriber streams, the consumer claim is released after the aborted task has been joined during this method.
pub async fn cancel(
    self,
    timeout: Duration,
) -> Result<ConsumerCancelOutcome<S>, ConsumerError>
Cooperatively cancels the consumer, aborting it if timeout elapses first.
Returns ConsumerCancelOutcome::Cancelled with the sink when the consumer observes
cancellation and exits normally before the timeout. Returns
ConsumerCancelOutcome::Aborted when the timeout elapses; in that case the task is
aborted, any pending callback/write future is dropped, and the sink/writer is not returned.
Cancellation is still cooperative until the timeout boundary: an in-flight async callback or writer call must finish before cancellation can be observed. For single-subscriber streams, the consumer claim is released before this method returns, both after successful cooperative cancellation and after timeout-driven abort.
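The cooperative-then-forceful behavior can be modeled with standard-library threads. This is a rough analogy, not the crate's code: `CancelOutcome` and `cancel_with_timeout` are hypothetical names, a sleeping thread stands in for an in-flight async callback, and because std threads cannot be aborted, the "abort" path here merely abandons the worker and its sink.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Local stand-in modeled on `ConsumerCancelOutcome`.
#[derive(Debug, PartialEq)]
enum CancelOutcome<S> {
    Cancelled(S),
    Aborted,
}

/// Starts a worker whose steps each take `step`, requests cancellation while
/// the first step is in flight, and waits at most `timeout` for the sink.
fn cancel_with_timeout(step: Duration, timeout: Duration) -> CancelOutcome<Vec<u32>> {
    let cancel = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&cancel);
    let (started_tx, started_rx) = mpsc::channel::<()>();
    let (done_tx, done_rx) = mpsc::channel();

    thread::spawn(move || {
        let mut sink: Vec<u32> = Vec::new();
        let mut n: u32 = 0;
        let _ = started_tx.send(());
        loop {
            // An in-flight step cannot be interrupted; it must finish first.
            thread::sleep(step);
            sink.push(n);
            n += 1;
            // Cancellation is only observed between steps.
            if flag.load(Ordering::SeqCst) {
                break;
            }
        }
        // The receiver may already be gone if the caller gave up.
        let _ = done_tx.send(sink);
    });

    // Wait until the worker is inside its first step, then request cancellation.
    started_rx.recv().expect("worker should start");
    cancel.store(true, Ordering::SeqCst);

    match done_rx.recv_timeout(timeout) {
        // The worker observed cancellation in time and handed back its sink.
        Ok(sink) => CancelOutcome::Cancelled(sink),
        // Timeout elapsed: give up on the worker; the sink is not returned.
        Err(_) => CancelOutcome::Aborted,
    }
}
```

With a short step and a generous timeout the model yields `Cancelled` with whatever the worker had collected; when a single step outlasts the timeout it yields `Aborted` and the sink is lost, mirroring the two outcomes described above.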
§Errors
Returns ConsumerError::TaskJoin if the consumer task cannot be joined before the
timeout, or ConsumerError::StreamRead if the underlying stream fails while being read
before cancellation is observed. Visitor-specific outcomes appear inside the returned
S (carried by ConsumerCancelOutcome::Cancelled).
§Panics
Panics if the consumer’s internal cancellation sender has already been taken.