pub struct Receiver<T> { /* private fields */ }
§Implementations
impl<T> Receiver<T>
pub fn observe<F, R>(&self, f: F) -> R
Observes the latest value sent to the channel.
This method takes a closure and calls it with a reference to the value.
While the closure is running, calls to send are blocked.
Because of this, the closure should run only as long as needed.
A common pattern is to copy or clone the value inside the closure, then
return and work with the copy outside.
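The copy-then-return pattern can be sketched with a simplified model. The `Slot` type and `longest_word` function below are hypothetical, and a plain `std::sync::Mutex` stands in for whatever synchronization the crate uses internally. The `FnOnce(&T) -> R` bound is also an assumption, since the signature above does not show it.

```rust
use std::sync::Mutex;

// Hypothetical stand-in for the channel's shared value. A Mutex only
// models the blocking behaviour; it is not the crate's implementation.
struct Slot<T> {
    value: Mutex<T>,
}

impl<T> Slot<T> {
    // Same shape as `observe`: run `f` with a reference to the value while
    // the lock is held, and release the lock as soon as `f` returns.
    fn observe<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        let guard = self.value.lock().unwrap();
        f(&*guard)
    }
}

// Clone inside the closure, then do the slower work on the copy.
fn longest_word(slot: &Slot<String>) -> usize {
    // The lock is held only for the duration of the clone.
    let text = slot.observe(|s| s.clone());
    // By this point the lock has been released.
    text.split_whitespace().map(str::len).max().unwrap_or(0)
}
```

In this model a writer waits only for the clone, not for the word scan that follows it.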
You can observe the value at any time, but usually you want to wait
until it changes. To do that, use the async changed method.
§Examples
let (tx, mut rx) = async_observe::channel(0);
// Send a new value
tx.send(1);
// Wait until the value changes
rx.changed().await?;
// Now we can read the new value
let n = rx.observe(|n| *n);
assert_eq!(n, 1);
If the value type implements Clone, you can use
recv instead, which waits for a change and returns
the new value.
let (tx, mut rx) = async_observe::channel(0);
// Send a new value
tx.send(1);
// Wait until the value changes and read it
let n = rx.recv().await?;
assert_eq!(n, 1);
§Possible deadlock
Calling send inside the closure will deadlock:
let (tx, rx) = async_observe::channel(0);
rx.observe(|n| {
    _ = tx.send(n + 1);
});
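One way to avoid the deadlock is to compute the next value inside the closure and send it only after the closure has returned. The sketch below illustrates this with a hypothetical `Slot` model built on `std::sync::Mutex`. It does not use the crate's own types, and `try_send` is an invented non-blocking stand-in for send so that the blocked case can be shown without hanging.

```rust
use std::sync::Mutex;

// Hypothetical model: one Mutex guards the value, standing in for the
// synchronization the real channel uses.
struct Slot {
    value: Mutex<i32>,
}

impl Slot {
    // Runs `f` while the lock is held, like `observe`.
    fn observe<R>(&self, f: impl FnOnce(&i32) -> R) -> R {
        let guard = self.value.lock().unwrap();
        f(&*guard)
    }

    // Non-blocking stand-in for `send`: returns `false` in the situation
    // where a blocking send would wait for the lock.
    fn try_send(&self, n: i32) -> bool {
        match self.value.try_lock() {
            Ok(mut guard) => {
                *guard = n;
                true
            }
            Err(_) => false,
        }
    }
}

// Sending from inside the closure: the lock is still held, so the send
// cannot proceed. A blocking send would wait here forever.
fn send_inside(slot: &Slot) -> bool {
    slot.observe(|n| slot.try_send(n + 1))
}

// Compute the next value inside the closure and send it afterwards.
fn send_outside(slot: &Slot) -> bool {
    let next = slot.observe(|n| n + 1);
    slot.try_send(next)
}
```

With the real channel, the second shape would correspond to `let next = rx.observe(|n| n + 1);` followed by `tx.send(next)` outside the closure.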
pub async fn changed(&mut self) -> Result<(), RecvError>
Waits for the value to change.
Call observe to read the new value.
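Watch-style channels often detect a change by comparing version counters. Whether this crate works that way is an assumption, not something stated here. The sketch below shows that technique with hypothetical `Shared` and `Watcher` types and a non-blocking `poll_changed` in place of the async method.

```rust
// Hypothetical sender-side state: the latest value plus a counter that
// increases on every send. Not taken from the crate's source.
struct Shared<T> {
    version: u64,
    value: T,
}

// Hypothetical receiver-side state: the last version this receiver saw.
struct Watcher {
    seen: u64,
}

impl<T> Shared<T> {
    // Overwrites the value and bumps the version.
    fn send(&mut self, value: T) {
        self.value = value;
        self.version += 1;
    }
}

impl Watcher {
    // Non-blocking analogue of `changed`: reports whether a send happened
    // since the last check and marks the current version as seen.
    fn poll_changed<T>(&mut self, shared: &Shared<T>) -> bool {
        let changed = shared.version != self.seen;
        self.seen = shared.version;
        changed
    }
}
```

In this model, several sends between two checks count as a single change, and only the latest value is left to read.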
pub async fn recv(&mut self) -> Result<T, RecvError>
where
    T: Clone,
Waits for the value to change and then returns a clone of it.
§Examples
let (tx, mut rx) = async_observe::channel(0);
// Send a new value
tx.send(1);
// Wait until the value changes and read it
let n = rx.recv().await?;
assert_eq!(n, 1);
pub fn into_stream<'item>(self) -> Updates<'item, T>
Creates a stream from the receiver.
The stream ends when the Sender is dropped.
§Examples
use futures_lite::{StreamExt, future};
let (tx, rx) = async_observe::channel(0);
let producer = async move {
    //               ^^^^
    // Move tx into the future so it is dropped after the loop ends.
    // Dropping the sender is important so the receiver can
    // see it and stop the stream.
    for n in 1..10 {
        wait_long_time().await;
        _ = tx.send(n);
    }
};
// Create a stream from the receiver
let consumer = rx
    .into_stream()
    .for_each(|n| println!("{n}"));
future::zip(producer, consumer).await;
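The same "consumer stops when the sender is dropped" behaviour can be tried with only the standard library. The sketch below is an analogy using `std::sync::mpsc` and a thread, not this crate's API, and `collect_until_sender_dropped` is a hypothetical helper name. One difference to keep in mind: `mpsc` queues every value, while this channel is described above as exposing the latest value.

```rust
use std::sync::mpsc;
use std::thread;

// Collects values until the sending side is dropped.
fn collect_until_sender_dropped() -> Vec<i32> {
    let (tx, rx) = mpsc::channel();

    // `move` transfers `tx` into the thread, so it is dropped when the
    // closure returns, after the loop has finished.
    let producer = thread::spawn(move || {
        for n in 1..10 {
            // Ignore the error that a dropped receiver would cause.
            let _ = tx.send(n);
        }
    });

    // `iter` yields values until every sender has been dropped.
    let received: Vec<i32> = rx.iter().collect();
    producer.join().unwrap();
    received
}
```

If `tx` were kept alive outside the producer, `rx.iter()` would never finish, which is the same reason the example above moves tx into the future.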