Struct corona::Producer
pub struct Producer<'a, I: 'static> { /* fields omitted */ }
This is an extended Await with the ability to produce items.
Just like an ordinary coroutine, a generator produces a single return value when it finishes and can suspend its execution using the Await parameter. In addition, it can produce a series of items of a given type through this parameter.
See Coroutine::generator.
Methods
impl<'a, I: 'static> Producer<'a, I>
fn new(
await: &'a Await<'a>,
sink: ChannelSender<Result<I, Box<Any + Send + 'static>>>
) -> Self
Creates a new producer.
While the usual way to get a producer is through Coroutine::generator, it is also possible to create one manually, from an Await and a channel sender of the right type.
fn produce(&self, item: I)
Pushes another value through the internal channel, effectively sending it to another coroutine.
This takes a value and pushes it through a channel to another coroutine. It may suspend the execution of the current coroutine and yield to another one.
The same notes and panics as with the future method apply.
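The channel-based hand-off described for produce can be illustrated with a minimal, std-only sketch. It is not corona's implementation: threads stand in for coroutines, a rendezvous channel from std::sync::mpsc stands in for the internal channel, and the names MiniProducer, Sinked and collect_squares are hypothetical. The item type mirrors the Result<I, Box<Any + Send + 'static>> shape of the sink shown in the signature of new.

```rust
use std::any::Any;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

// Hypothetical alias mirroring the sink's item type: either a produced item
// or the payload of a panic.
type Sinked<I> = Result<I, Box<dyn Any + Send + 'static>>;

// Hypothetical miniature producer; NOT corona's `Producer`.
struct MiniProducer<I> {
    sink: SyncSender<Sinked<I>>,
}

impl<I> MiniProducer<I> {
    // Pushes one item. With a zero-capacity channel the call blocks until the
    // consumer takes the item, loosely mirroring how `produce` may suspend.
    fn produce(&self, item: I) {
        // A send error only means the consumer went away; ignore it here.
        let _ = self.sink.send(Ok(item));
    }
}

// Runs a generator body on its own thread and collects everything it produces.
fn collect_squares(n: u32) -> Vec<u32> {
    let (sink, source) = sync_channel::<Sinked<u32>>(0);
    let worker = thread::spawn(move || {
        let producer = MiniProducer { sink };
        for i in 1..=n {
            producer.produce(i * i);
        }
        // Dropping the producer closes the channel and ends the iteration.
    });
    let items: Vec<u32> = source.iter().filter_map(|r| r.ok()).collect();
    worker.join().unwrap();
    items
}

fn main() {
    println!("{:?}", collect_squares(4));
}
```

The consumer side simply iterates until the sending half is dropped, which is the same observable behavior a generator has when its body finishes.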
Methods from Deref<Target = Await<'a>>
fn handle(&self) -> &Handle
Accesses the handle to the corresponding reactor core.
This is simply a convenience method, since it is possible to get the handle explicitly into every place where this can be used. But it is convenient not to have to pass another variable, and the Await and the handle are usually used together.
fn future<I, E, Fut>(&self, fut: Fut) -> Result<I, E> where
I: 'static,
E: 'static,
Fut: Future<Item = I, Error = E> + 'static,
Blocks the current coroutine until the future resolves.
This blocks or parks the current coroutine (and lets other coroutines run) until the provided future completes. The result of the future is returned.
Notes
For the switching between coroutines to work, the reactor must be running.
Panics
This may panic if the reactor core is dropped before the waited-for future resolves. The panic is meant to unwind the coroutine's stack so all the memory can be cleaned up.
Examples
use std::time::Duration;
use corona::Coroutine;
use futures::Future;
use tokio_core::reactor::{Core, Timeout};

let mut core = Core::new().unwrap();
let coroutine = Coroutine::with_defaults(core.handle(), |await| {
    let timeout = Timeout::new(Duration::from_millis(100), await.handle()).unwrap();
    await.future(timeout).unwrap();
});
core.run(coroutine).unwrap();
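The Panics note relies on a general Rust property: when a panic unwinds a stack, the destructors of values living on that stack still run. The following std-only sketch shows that property in isolation. It does not use corona; Guard and cleaned_up_by_unwinding are hypothetical names, and it assumes the program is built with the default panic = "unwind" strategy.

```rust
use std::cell::Cell;
use std::panic::{self, AssertUnwindSafe};
use std::rc::Rc;

// Hypothetical resource owned by a stack frame; it records when it is dropped.
struct Guard(Rc<Cell<bool>>);

impl Drop for Guard {
    fn drop(&mut self) {
        self.0.set(true);
    }
}

// Returns true if the panic was caught AND the guard's destructor ran
// while the stack was being unwound.
fn cleaned_up_by_unwinding() -> bool {
    let released = Rc::new(Cell::new(false));
    let flag = Rc::clone(&released);
    let outcome = panic::catch_unwind(AssertUnwindSafe(move || {
        let _guard = Guard(flag);
        // Stand-in for the panic raised when the reactor core goes away.
        panic!("reactor dropped");
    }));
    outcome.is_err() && released.get()
}

fn main() {
    println!("cleaned up: {}", cleaned_up_by_unwinding());
}
```

With panic = "abort" no unwinding takes place, so this kind of cleanup cannot be relied upon in that configuration.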
fn future_cleanup<I, E, Fut>(&self, fut: Fut) -> Result<Result<I, E>, Dropped> where
I: 'static,
E: 'static,
Fut: Future<Item = I, Error = E> + 'static,
Blocks the current coroutine, just like future, but doesn't panic.
This works similarly to the future method. However, it signals that the reactor Core has been destroyed by returning Err(Dropped) instead of panicking. This can be used to manually clean up the coroutine instead of letting a panic do that.
This is especially important in cases when a clean shutdown is needed even when the Core in the main coroutine is destroyed during a panic, since the future method either causes a double panic (making the program abort) or doesn't do any cleanup at all, depending on the configuration.
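The nested return type Result<Result<I, E>, Dropped> distinguishes three situations: the future succeeded, the future failed, or the reactor went away. A caller might separate them as in the sketch below. It is std-only: the Dropped struct here is a hypothetical stand-in for corona's marker type, and Outcome and classify are illustrative names, not part of the library.

```rust
// Hypothetical stand-in for corona's `Dropped` marker type.
#[derive(Debug, PartialEq)]
struct Dropped;

// Hypothetical summary of what the caller decides to do next.
#[derive(Debug, PartialEq)]
enum Outcome {
    Value(u32),
    Failed(String),
    ShutDown,
}

// Maps the nested result shape onto the three situations the caller must handle.
fn classify(result: Result<Result<u32, String>, Dropped>) -> Outcome {
    match result {
        // The future resolved successfully.
        Ok(Ok(value)) => Outcome::Value(value),
        // The future itself resolved to an error.
        Ok(Err(error)) => Outcome::Failed(error),
        // The reactor was dropped: release resources and return from the
        // coroutine manually instead of relying on a panic.
        Err(Dropped) => Outcome::ShutDown,
    }
}

fn main() {
    println!("{:?}", classify(Ok(Ok(7))));
    println!("{:?}", classify(Ok(Err("timed out".to_string()))));
    println!("{:?}", classify(Err(Dropped)));
}
```

Keeping the outer Err(Dropped) separate from the inner error makes it harder to mistake a shutdown for an ordinary failure of the awaited operation.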
fn stream<I, E, S>(&self, stream: S) -> StreamIterator<I, E, S> where
S: Stream<Item = I, Error = E> + 'static,
I: 'static,
E: 'static,
Blocks the current coroutine to get each element of the stream.
This acts in a very similar way to the future method. The difference is that it acts on a stream and produces an iterator instead of a single result. Therefore it may (and usually will) switch the coroutines more than once.
The same notes as with the future method apply.
Panics
The same ones as with the future method.
Examples
use corona::Coroutine;
use futures::stream;
use tokio_core::reactor::Core;

let mut core = Core::new().unwrap();
let coroutine = Coroutine::with_defaults(core.handle(), |await| {
    let stream = stream::empty::<(), ()>();
    for item in await.stream(stream) {
        // Streams can contain errors, so it iterates over `Result`s.
        let item = item.unwrap();
        // Process them here
    }
});
core.run(coroutine).unwrap();
fn stream_cleanup<I, E, S>(&self, stream: S) -> StreamCleanupIterator<I, E, S> where
S: Stream<Item = I, Error = E> + 'static,
I: 'static,
E: 'static,
Blocks the current coroutine to get each element of the stream.
This is the same as the stream method, but it doesn't panic when the core is dropped and the coroutine's stack needs to be cleaned up. Instead it returns Err(Dropped) and leaves the cleanup to the caller.
The advantage is that this works even in case the reactor core is dropped during a panic. See Coroutine::leak_on_panic for more details.
fn yield_now(&self)
Switches to another coroutine.
This allows another coroutine to run. However, if there's no other coroutine ready to run, this may return right away and continue execution. Also, it does not guarantee getting more external events: the other coroutines can do work on the data that is already received, for example, but network events will probably arrive only after the coroutine really waits on something or terminates. Therefore, doing CPU-intensive work in a coroutine and repeatedly calling yield_now is not guaranteed to work well. Use a separate thread or something like the futures-cpupool crate to off-load the heavy work.
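The advice to off-load heavy work to a separate thread can be sketched with the standard library alone. The functions heavy_sum and offload are hypothetical, and the blocking recv call is only a placeholder: inside a real coroutine, the result would presumably be awaited through a future so that the reactor thread stays free.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical CPU-bound job.
fn heavy_sum(limit: u64) -> u64 {
    (1..=limit).sum()
}

// Runs the job on a worker thread and hands the result back over a channel.
fn offload(limit: u64) -> u64 {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // A send error only means the receiver is gone; nothing to do then.
        let _ = tx.send(heavy_sum(limit));
    });
    // Placeholder for waiting on the result; this call blocks the caller.
    rx.recv().expect("worker thread terminated without a result")
}

fn main() {
    println!("{}", offload(1_000));
}
```

The point of the design is that the computation never runs on the thread that drives the coroutines, so other coroutines and external events are not starved while it is in progress.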
fn yield_now_cleanup(&self) -> Result<(), Dropped>
Switches to another coroutine.
This is the same as yield_now, but instead of panicking when the reactor core is dropped, it returns Err(Dropped).