pub struct Receiver<T> { /* private fields */ }
Implementations§
impl<T> Receiver<T>
pub fn observe<F, R>(&self, f: F) -> R
Observes the latest value sent to the channel.
This method takes a closure and calls it with a reference to the value.
While the closure is running, send calls are blocked.
Because of this, the closure should run only as long as needed.
A common pattern is to copy or clone the value inside the closure, then
return and work with the copy outside.
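The clone-then-return pattern can be sketched with the standard library alone. The `Latest` type below is a hypothetical stand-in that keeps the value behind a `std::sync::RwLock`; it is an assumption made for illustration and not a description of how `async_observe` is implemented. `total_len` is likewise an invented helper.

```rust
use std::sync::RwLock;

/// Hypothetical stand-in for a receiver: the latest value behind a lock.
/// Illustration only; the real crate's internals may differ.
pub struct Latest<T> {
    value: RwLock<T>,
}

impl<T> Latest<T> {
    pub fn new(value: T) -> Self {
        Self { value: RwLock::new(value) }
    }

    /// Calls `f` with a reference to the value while a read lock is held.
    pub fn observe<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        let guard = self.value.read().unwrap();
        f(&*guard)
    }

    /// Replaces the value; has to wait while an `observe` closure is running.
    pub fn send(&self, value: T) {
        *self.value.write().unwrap() = value;
    }
}

/// Clones the value inside the closure, then works on the copy outside.
pub fn total_len(latest: &Latest<Vec<String>>) -> usize {
    // Only the clone happens while the lock is held.
    let snapshot = latest.observe(|words| words.clone());
    // The lock is released here, so a concurrent `send` is not held up
    // by the (potentially slow) work below.
    snapshot.iter().map(|w| w.len()).sum()
}
```

In this model, any work that does not need the shared reference belongs after `observe` returns, which keeps the time a sender has to wait as short as possible.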
You can observe the value at any time, but usually you want to wait
until it changes. For that, use the async changed method.
§Examples
let (tx, mut rx) = async_observe::channel(0);
// Send a new value
tx.send(1);
// Wait until the value changes
rx.changed().await?;
// Now we can read the new value
let n = rx.observe(|n| *n);
assert_eq!(n, 1);
If the value type implements Clone, you can use recv instead, which waits
for a change and returns the new value.
let (tx, mut rx) = async_observe::channel(0);
// Send a new value
tx.send(1);
// Wait until the value changes and read it
let n = rx.recv().await?;
assert_eq!(n, 1);
§Possible deadlock
Calling send inside the closure will deadlock:
let (tx, rx) = async_observe::channel(0);
rx.observe(|n| {
    _ = tx.send(n + 1);
});
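One way to avoid this is to compute the next value inside the closure and send it only after the closure has returned. The sketch below models the situation with a plain `std::sync::RwLock`, on the assumption that observing behaves like holding a read guard and sending behaves like taking a write lock; `increment_safely` is a hypothetical helper, not part of the crate.

```rust
use std::sync::RwLock;

/// Returns `(could a write start during observation?, value sent afterwards)`.
pub fn increment_safely(slot: &RwLock<i32>) -> (bool, i32) {
    // Stand-in for `observe`: this block runs while a read guard is held.
    let (write_possible_inside, next) = {
        let guard = slot.read().unwrap();
        // A blocking `write()` here would never return on this thread.
        // `try_write` makes the conflict visible without hanging.
        let write_possible = slot.try_write().is_ok();
        (write_possible, *guard + 1)
    };
    // Stand-in for `send`: the read guard is dropped, so the write succeeds.
    *slot.write().unwrap() = next;
    (write_possible_inside, next)
}
```

With the crate's own API, the same idea would read roughly as `let next = rx.observe(|n| n + 1);` followed by `_ = tx.send(next);`.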
pub async fn changed(&mut self) -> Result<(), RecvError>
Waits for the value to change.
Call observe to read the new value.
pub async fn recv(&mut self) -> Result<T, RecvError>
where
    T: Clone,
Waits for the value to change and then returns a clone of it.
§Examples
let (tx, mut rx) = async_observe::channel(0);
// Send a new value
tx.send(1);
// Wait until the value changes and read it
let n = rx.recv().await?;
assert_eq!(n, 1);
pub fn into_stream(self) -> impl Stream<Item = T>
where
    T: Clone,
Creates a stream from the receiver.
The stream ends when the Sender is dropped.
§Examples
use futures_lite::{StreamExt, future};
let (tx, rx) = async_observe::channel(0);
let producer = async move {
    //               ^^^^
    // Move tx into the future so it is dropped after the loop ends.
    // Dropping the sender is important so the receiver can
    // see it and stop the stream.
    for n in 1..10 {
        wait_long_time().await;
        _ = tx.send(n);
    }
};
// Create a stream from the receiver
let consumer = rx
    .into_stream()
    .for_each(|n| println!("{n}"));
future::zip(producer, consumer).await;
§Return type
Due to implementation details, the stream currently does not have a named type. For example, you cannot store this stream as a struct field (without a generic) or use it as an associated type in a trait implementation. As long as you don’t need that, just use the returned anonymous type directly.
However, if you need a named type, one solution is to box the stream:
use {
    async_observe::Receiver,
    futures_lite::{StreamExt, stream::Boxed},
};
fn named_stream(rx: Receiver<u32>) -> Boxed<u32> {
    rx.into_stream().boxed()
}