Trait corona::prelude::CoroutineStream
[−]
[src]
pub trait CoroutineStream: Sized { type Item; type Error; fn iter_cleanup(self) -> CleanupIterator<Self>; fn extractor(&mut self) -> StreamExtractor<Self>; fn coro_next_cleanup(
&mut self
) -> Result<Result<Option<Self::Item>, Self::Error>, Dropped>; fn iter_ok(self) -> OkIterator<CleanupIterator<Self>> { ... } fn iter_result(self) -> ResultIterator<CleanupIterator<Self>> { ... } fn coro_next(&mut self) -> Result<Option<Self::Item>, Self::Error> { ... } }
An extension trait for Stream
s.
This is auto-implemented for Stream
s and adds some convenient coroutine-aware methods to
them.
Associated Types
Required Methods
fn iter_cleanup(self) -> CleanupIterator<Self>
Produces an iterator that doesn't panic on reactor drop.
This acts like iter_result
. However, the produced items are
wrapped inside another level of Result
and it returns Err(Dropped)
if the reactor is
dropped while iterating instead of panicking. This allows manual coroutine cleanup when
needed, but is probably less convenient for casual use.
Panics
If called outside of a coroutine.
fn extractor(&mut self) -> StreamExtractor<Self>
A future that pulls one item out of the stream.
This is like Stream::into_future
, but it doesn't consume and re-produce the stream.
Instead it borrows the stream mutably. Such thing is usable with coroutines, since
coroutines can easily wait on futures that are not 'static
.
Unlike the other methods here, this only builds the future, doesn't run it.
Examples
let mut core = Core::new().unwrap(); let (sender, mut receiver) = mpsc::unbounded(); sender.unbounded_send(21); // The second item is unused sender.unbounded_send(21); drop(sender); let coro = Coroutine::with_defaults(core.handle(), move || { receiver.extractor() .coro_wait() // Block until the item actually falls out .unwrap() // Unwrap the outer result .unwrap() // Unwrap the option, since it gives `Option<T>` }); assert_eq!(21, core.run(coro).unwrap());
fn coro_next_cleanup(
&mut self
) -> Result<Result<Option<Self::Item>, Self::Error>, Dropped>
&mut self
) -> Result<Result<Option<Self::Item>, Self::Error>, Dropped>
Provided Methods
fn iter_ok(self) -> OkIterator<CleanupIterator<Self>>
Produces an iterator through the successful items of the stream.
This allows iterating comfortably through the stream. It produces only the successful
items and stops when the stream terminates or when it reaches the first error. The error is
thrown away (you may want to use iter_result
if you care about the
errors).
When it waites for another item to come out of the stream, the coroutine suspends and switches to others if there are some ready.
Panics
If the reactor is dropped during the iteration, this method panics to clean up the coroutine.
It also panics when called from outside of a coroutine.
Examples
let mut core = Core::new().unwrap(); let (sender, receiver) = mpsc::unbounded(); sender.unbounded_send(21); sender.unbounded_send(21); // Make sure the channel is terminated, or it would wait forever. drop(sender); let coro = Coroutine::with_defaults(core.handle(), move || { let mut sum = 0; for num in receiver.iter_ok() { sum += num; } sum }); assert_eq!(42, core.run(coro).unwrap());
fn iter_result(self) -> ResultIterator<CleanupIterator<Self>>
Produces an iterator through results.
This is similar to iter_ok
. However, instead of terminating on errors,
the items produced by the iterator are complete Result
s. The iterator always runs to the
end of the stream (or breaked out of the for
).
Notes
In general, streams don't guarantee to be usable past their first error. So, when working
with an unknown stream, it is reasonable to break the for
on the first error. This is
similar to iter_ok
, but allows inspecting the error.
However, there are some specific streams that are usable past errors. Such example is
TcpListener::incoming
, which may signal an error accepting one connection, but then keeps
trying.
Panics
This panics when the reactor the current coroutine runs on is dropped while iterating.
It panics when called outside of a coroutine.
Examples
let mut core = Core::new().unwrap(); let (sender, receiver) = mpsc::unbounded(); sender.unbounded_send(21); sender.unbounded_send(21); // Make sure the channel is terminated, or it would wait forever. drop(sender); let coro = Coroutine::with_defaults(core.handle(), move || { let mut sum = 0; for num in receiver.iter_result() { sum += num.expect("MPSC should not error"); } sum }); assert_eq!(42, core.run(coro).unwrap());
fn coro_next(&mut self) -> Result<Option<Self::Item>, Self::Error>
Pulls one item out of the stream.
This extracts one item out of the stream, returning either the streams error or the item or
None
on end of the stream.
It blocks the current coroutine when waiting for the item to appear.
Panics
It panics when the reactor is dropped while waiting for the item.
It also panics when called outside of a coroutine.
Examples
let mut core = Core::new().unwrap(); let (sender, mut receiver) = mpsc::unbounded(); sender.unbounded_send(21); sender.unbounded_send(21); drop(sender); let coro = Coroutine::with_defaults(core.handle(), move || { let mut sum = 0; while let Some(num) = receiver.coro_next().unwrap() { sum += num; } sum }); assert_eq!(42, core.run(coro).unwrap());