Struct async_broadcast::Receiver
source · pub struct Receiver<T> { /* private fields */ }
Expand description
The receiving side of a channel.
Receivers can be cloned and shared among threads. When all (active) receivers associated with a
channel are dropped, the channel becomes closed. You can deactivate a receiver using
`Receiver::deactivate` if you would like the channel to remain open without keeping active
receivers around.
Implementations§
source§impl<T> Receiver<T>
impl<T> Receiver<T>
sourcepub fn capacity(&self) -> usize
pub fn capacity(&self) -> usize
Returns the channel capacity.
Examples
use async_broadcast::broadcast;
let (_s, r) = broadcast::<i32>(5);
assert_eq!(r.capacity(), 5);
sourcepub fn set_capacity(&mut self, new_cap: usize)
pub fn set_capacity(&mut self, new_cap: usize)
Set the channel capacity.
There are times when you need to change the channel’s capacity after creating it. If
`new_cap` is less than the number of messages in the channel, the oldest messages will be
dropped to shrink the channel.
Examples
use async_broadcast::{broadcast, TrySendError, TryRecvError};
let (s, mut r) = broadcast::<i32>(3);
assert_eq!(r.capacity(), 3);
s.try_broadcast(1).unwrap();
s.try_broadcast(2).unwrap();
s.try_broadcast(3).unwrap();
r.set_capacity(1);
assert_eq!(r.capacity(), 1);
assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
assert_eq!(r.try_recv().unwrap(), 3);
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
s.try_broadcast(1).unwrap();
assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
r.set_capacity(2);
assert_eq!(r.capacity(), 2);
s.try_broadcast(2).unwrap();
assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
sourcepub fn overflow(&self) -> bool
pub fn overflow(&self) -> bool
If overflow mode is enabled on this channel.
Examples
use async_broadcast::broadcast;
let (_s, r) = broadcast::<i32>(5);
assert!(!r.overflow());
sourcepub fn set_overflow(&mut self, overflow: bool)
pub fn set_overflow(&mut self, overflow: bool)
Set overflow mode on the channel.
When overflow mode is set, broadcasting to the channel will succeed even if the channel is full. It achieves that by removing the oldest message from the channel.
Examples
use async_broadcast::{broadcast, TrySendError, TryRecvError};
let (s, mut r) = broadcast::<i32>(2);
s.try_broadcast(1).unwrap();
s.try_broadcast(2).unwrap();
assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
r.set_overflow(true);
assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
assert_eq!(r.try_recv().unwrap(), 3);
assert_eq!(r.try_recv().unwrap(), 4);
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
sourcepub fn await_active(&self) -> bool
pub fn await_active(&self) -> bool
sourcepub fn set_await_active(&mut self, await_active: bool)
pub fn set_await_active(&mut self, await_active: bool)
Specify if sender will wait for active receivers.
If set to `false`, `Send` will resolve immediately with a `SendError`. Defaults to `true`.
Examples
use async_broadcast::broadcast;
let (s, mut r) = broadcast::<i32>(2);
s.broadcast(1).await.unwrap();
r.set_await_active(false);
let _ = r.deactivate();
assert!(s.broadcast(2).await.is_err());
sourcepub fn close(&self) -> bool
pub fn close(&self) -> bool
Closes the channel.
Returns `true` if this call has closed the channel and it was not closed already.
The remaining messages can still be received.
Examples
use async_broadcast::{broadcast, RecvError};
let (s, mut r) = broadcast(1);
s.broadcast(1).await.unwrap();
assert!(s.close());
assert_eq!(r.recv().await.unwrap(), 1);
assert_eq!(r.recv().await, Err(RecvError::Closed));
sourcepub fn is_closed(&self) -> bool
pub fn is_closed(&self) -> bool
Returns `true` if the channel is closed.
Examples
use async_broadcast::{broadcast, RecvError};
let (s, r) = broadcast::<()>(1);
assert!(!s.is_closed());
drop(r);
assert!(s.is_closed());
sourcepub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
Returns `true` if the channel is empty.
Examples
use async_broadcast::broadcast;
let (s, r) = broadcast(1);
assert!(s.is_empty());
s.broadcast(1).await;
assert!(!s.is_empty());
sourcepub fn is_full(&self) -> bool
pub fn is_full(&self) -> bool
Returns `true` if the channel is full.
Examples
use async_broadcast::broadcast;
let (s, r) = broadcast(1);
assert!(!s.is_full());
s.broadcast(1).await;
assert!(s.is_full());
sourcepub fn len(&self) -> usize
pub fn len(&self) -> usize
Returns the number of messages in the channel.
Examples
use async_broadcast::broadcast;
let (s, r) = broadcast(2);
assert_eq!(s.len(), 0);
s.broadcast(1).await;
s.broadcast(2).await;
assert_eq!(s.len(), 2);
sourcepub fn receiver_count(&self) -> usize
pub fn receiver_count(&self) -> usize
Returns the number of receivers for the channel.
This does not include inactive receivers. Use `Receiver::inactive_receiver_count` if you
are interested in that.
Examples
use async_broadcast::broadcast;
let (s, r) = broadcast::<()>(1);
assert_eq!(s.receiver_count(), 1);
let r = r.deactivate();
assert_eq!(s.receiver_count(), 0);
let r2 = r.activate_cloned();
assert_eq!(r.receiver_count(), 1);
assert_eq!(r.inactive_receiver_count(), 1);
sourcepub fn inactive_receiver_count(&self) -> usize
pub fn inactive_receiver_count(&self) -> usize
Returns the number of inactive receivers for the channel.
Examples
use async_broadcast::broadcast;
let (s, r) = broadcast::<()>(1);
assert_eq!(s.receiver_count(), 1);
let r = r.deactivate();
assert_eq!(s.receiver_count(), 0);
let r2 = r.activate_cloned();
assert_eq!(r.receiver_count(), 1);
assert_eq!(r.inactive_receiver_count(), 1);
sourcepub fn sender_count(&self) -> usize
pub fn sender_count(&self) -> usize
Returns the number of senders for the channel.
Examples
use async_broadcast::broadcast;
let (s, r) = broadcast::<()>(1);
assert_eq!(s.sender_count(), 1);
let s2 = s.clone();
assert_eq!(s.sender_count(), 2);
sourcepub fn deactivate(self) -> InactiveReceiver<T>
pub fn deactivate(self) -> InactiveReceiver<T>
Downgrade to an `InactiveReceiver`.
An inactive receiver is one that cannot and does not receive any messages. Its only purpose
is to keep the associated channel open even when there are no (active) receivers. An inactive
receiver can be upgraded into a `Receiver` using `InactiveReceiver::activate` or
`InactiveReceiver::activate_cloned`.
`Sender::try_broadcast` will return `TrySendError::Inactive` if only inactive
receivers exist for the associated channel, and `Sender::broadcast` will wait until an
active receiver is available.
Examples
use async_broadcast::{broadcast, TrySendError};
let (s, r) = broadcast(1);
let inactive = r.deactivate();
assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
let mut r = inactive.activate();
assert_eq!(s.broadcast(10).await, Ok(None));
assert_eq!(r.recv().await, Ok(10));
source§impl<T: Clone> Receiver<T>
impl<T: Clone> Receiver<T>
sourcepub fn recv(&mut self) -> Recv<'_, T> ⓘ
pub fn recv(&mut self) -> Recv<'_, T> ⓘ
Receives a message from the channel.
If the channel is empty, this method waits until there is a message.
If the channel is closed, this method receives a message or returns an error if there are no more messages.
If this receiver has missed a message (only possible if overflow mode is enabled), then this method returns an error and readjusts its cursor to point to the first available message.
Examples
use async_broadcast::{broadcast, RecvError};
let (s, mut r1) = broadcast(1);
let mut r2 = r1.clone();
assert_eq!(s.broadcast(1).await, Ok(None));
drop(s);
assert_eq!(r1.recv().await, Ok(1));
assert_eq!(r1.recv().await, Err(RecvError::Closed));
assert_eq!(r2.recv().await, Ok(1));
assert_eq!(r2.recv().await, Err(RecvError::Closed));
sourcepub fn try_recv(&mut self) -> Result<T, TryRecvError>
pub fn try_recv(&mut self) -> Result<T, TryRecvError>
Attempts to receive a message from the channel.
If the channel is empty or closed, this method returns an error.
If this receiver has missed a message (only possible if overflow mode is enabled), then this method returns an error and readjusts its cursor to point to the first available message.
Examples
use async_broadcast::{broadcast, TryRecvError};
let (s, mut r1) = broadcast(1);
let mut r2 = r1.clone();
assert_eq!(s.broadcast(1).await, Ok(None));
assert_eq!(r1.try_recv(), Ok(1));
assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
assert_eq!(r2.try_recv(), Ok(1));
assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
drop(s);
assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
Examples found in repository?
(Source listing: lines 1294–1624.)
/// Polls this receiver for the next message.
///
/// Resolves to `Poll::Ready(Some(msg))` when `try_recv` yields a message and to
/// `Poll::Ready(None)` when `try_recv` reports `TryRecvError::Closed`. An
/// `Overflowed` error is not surfaced to the caller; the receive is simply retried.
/// When the channel is empty, a listener is registered on `recv_ops` and the receive
/// is attempted once more before that listener is polled.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// If this stream is listening for events, first wait for a notification.
// `ready!` returns early from `poll_next` while the listener is still pending.
if let Some(listener) = self.listener.as_mut() {
ready!(Pin::new(listener).poll(cx));
self.listener = None;
}
loop {
// Attempt to receive a message.
match self.try_recv() {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(None);
}
// Missed messages are skipped silently: retry the receive right away.
Err(TryRecvError::Overflowed(_)) => continue,
// Nothing to receive yet - fall through to the listener handling below.
Err(TryRecvError::Empty) => {}
}
// Receiving failed - now start listening for notifications or wait for one.
match self.listener.as_mut() {
None => {
// Start listening and then try receiving again.
// The value returned by `write()` lives only inside this block
// (presumably a lock guard, released once the listener is created).
self.listener = {
let inner = self.inner.write();
Some(inner.recv_ops.listen())
};
}
Some(_) => {
// Go back to the outer loop to poll the listener.
break;
}
}
}
}
}
}
impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
    /// Reports whether the stream is finished: the channel has to be closed *and*
    /// its queue has to be empty.
    fn is_terminated(&self) -> bool {
        let state = self.inner.read();
        if !state.is_closed {
            // An open channel can still produce messages.
            return false;
        }
        state.queue.is_empty()
    }
}
/// The error produced when awaiting [`Sender::broadcast()`] fails.
///
/// It is returned when the channel is closed, or when there were no active receivers while
/// `await_active` was set to `false` (see [`Sender::set_await_active`] for details).
///
/// The wrapped value is the message that could not be sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        let SendError(unsent) = self;
        unsent
    }
}

impl<T> error::Error for SendError<T> {}

// Hand-written so that `T` does not need to implement `Debug`; the payload is elided.
impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
/// The error produced when [`Sender::try_broadcast()`] fails.
///
/// Every variant carries the message that could not be sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),
    /// The channel is closed.
    Closed(T),
    /// There are currently no active receivers, only inactive ones.
    Inactive(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        match self {
            Self::Full(unsent) | Self::Closed(unsent) | Self::Inactive(unsent) => unsent,
        }
    }

    /// Returns `true` if the channel is full but not closed.
    pub fn is_full(&self) -> bool {
        matches!(self, Self::Full(_))
    }

    /// Returns `true` if the channel is closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, Self::Closed(_))
    }

    /// Returns `true` if there are currently no active receivers, only inactive ones.
    pub fn is_disconnected(&self) -> bool {
        matches!(self, Self::Inactive(_))
    }
}

impl<T> error::Error for TrySendError<T> {}

// Hand-written so that `T` does not need to implement `Debug`; the payload is elided.
impl<T> fmt::Debug for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Full(_) => "Full(..)",
            Self::Closed(_) => "Closed(..)",
            Self::Inactive(_) => "Inactive(..)",
        };
        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::Full(_) => "sending into a full channel",
            Self::Closed(_) => "sending into a closed channel",
            Self::Inactive(_) => "sending into the void (no active receivers)",
        };
        f.write_str(text)
    }
}
/// The error produced when awaiting [`Receiver::recv()`] fails.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// The payload is the number of messages missed.
    Overflowed(u64),
    /// The channel is empty and closed.
    Closed,
}

impl error::Error for RecvError {}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            RecvError::Closed => f.write_str("receiving from an empty and closed channel"),
            RecvError::Overflowed(missed) => {
                write!(f, "receiving skipped {} messages", missed)
            }
        }
    }
}
/// The error produced when [`Receiver::try_recv()`] fails.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// The payload is the count reported by the `Display` impl as "lost messages".
    Overflowed(u64),
    /// The channel is empty but not closed.
    Empty,
    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Returns `true` if the channel is empty but not closed.
    pub fn is_empty(&self) -> bool {
        matches!(self, Self::Empty)
    }

    /// Returns `true` if the channel is empty and closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, Self::Closed)
    }

    /// Returns `true` if this error indicates the receiver missed messages.
    pub fn is_overflowed(&self) -> bool {
        matches!(self, Self::Overflowed(_))
    }
}

impl error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Overflowed(lost) => {
                write!(f, "receiving operation observed {} lost messages", lost)
            }
            Self::Empty => f.write_str("receiving from an empty channel"),
            Self::Closed => f.write_str("receiving from an empty and closed channel"),
        }
    }
}
/// A future returned by [`Sender::broadcast()`].
///
/// Polling it repeatedly attempts `Sender::try_broadcast` with the stored message and
/// waits on a `send_ops` listener between attempts.
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Send<'a, T> {
// The sender whose `try_broadcast` is invoked on every poll attempt.
sender: &'a Sender<T>,
// Listener obtained from `send_ops`; `Some` only while waiting for a notification.
listener: Option<EventListener>,
// The message to send. Taken at the start of each attempt and put back when the
// attempt has to be retried.
msg: Option<T>,
}
// Declared unconditionally (no `T: Unpin` bound), which lets `poll` use `Pin::new`
// on `self`.
impl<'a, T> Unpin for Send<'a, T> {}
/// Drives a broadcast to completion.
///
/// Each loop iteration tries `try_broadcast`. A `Full` error, or an `Inactive` error
/// while `await_active` is set, stores the message back and waits for a `send_ops`
/// notification before retrying. Every other outcome resolves the future.
impl<'a, T: Clone> Future for Send<'a, T> {
// `Ok` carries whatever `try_broadcast` returned on success; `Err` hands the unsent
// message back wrapped in `SendError`.
type Output = Result<Option<T>, SendError<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);
loop {
// Every retry path below stores the message back into `this.msg`, so this is
// `Some` unless the future is polled again after it has already resolved.
let msg = this.msg.take().unwrap();
let inner = &this.sender.inner;
// Attempt to send a message.
match this.sender.try_broadcast(msg) {
Ok(msg) => {
let inner = inner.write();
if inner.queue.len() < inner.capacity {
// Not full still, so notify the next awaiting sender.
inner.send_ops.notify(1);
}
return Poll::Ready(Ok(msg));
}
Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
// Channel is full: keep the message and wait for room.
Err(TrySendError::Full(m)) => this.msg = Some(m),
// No active receivers: wait only when `await_active` is set, otherwise fail.
Err(TrySendError::Inactive(m)) if inner.read().await_active => this.msg = Some(m),
Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
}
// Sending failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try sending again.
let inner = inner.write();
this.listener = Some(inner.send_ops.listen());
}
Some(l) => {
// Wait for a notification.
// `ready!` returns early from `poll` while the listener is still pending.
ready!(Pin::new(l).poll(cx));
this.listener = None;
}
}
}
}
}
/// A future returned by [`Receiver::recv()`].
///
/// Polling it repeatedly attempts `Receiver::try_recv` and waits on a `recv_ops`
/// listener while the channel is empty.
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
// The receiver whose `try_recv` is invoked on every poll attempt.
receiver: &'a mut Receiver<T>,
// Listener obtained from `recv_ops`; `Some` only while waiting for a notification.
listener: Option<EventListener>,
}
// Declared unconditionally (no `T: Unpin` bound), which lets `poll` use `Pin::new`
// on `self`.
impl<'a, T> Unpin for Recv<'a, T> {}
impl<'a, T: Clone> Future for Recv<'a, T> {
type Output = Result<T, RecvError>;
/// Polls the receive operation.
///
/// Resolves with the message when `try_recv` succeeds, with `RecvError::Closed` or
/// `RecvError::Overflowed(n)` when `try_recv` reports the matching error, and keeps
/// waiting on a `recv_ops` listener while the channel is empty.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = Pin::new(self);
loop {
// Attempt to receive a message.
match this.receiver.try_recv() {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
// The overflow count is forwarded to the caller unchanged.
Err(TryRecvError::Overflowed(n)) => {
return Poll::Ready(Err(RecvError::Overflowed(n)))
}
// Nothing to receive yet - fall through to the listener handling below.
Err(TryRecvError::Empty) => {}
}
// Receiving failed - now start listening for notifications or wait for one.
match &mut this.listener {
None => {
// Start listening and then try receiving again.
// The value returned by `write()` lives only inside this block
// (presumably a lock guard, released once the listener is created).
this.listener = {
let inner = this.receiver.inner.write();
Some(inner.recv_ops.listen())
};
}
Some(l) => {
// Wait for a notification.
// `ready!` returns early from `poll` while the listener is still pending.
ready!(Pin::new(l).poll(cx));
this.listener = None;
}
}
}
}
sourcepub fn new_sender(&self) -> Sender<T>
pub fn new_sender(&self) -> Sender<T>
Produce a new Sender for this channel.
This will not re-open the channel if it was closed due to all senders being dropped.
Examples
use async_broadcast::{broadcast, RecvError};
let (s1, mut r) = broadcast(2);
assert_eq!(s1.broadcast(1).await, Ok(None));
let mut s2 = r.new_sender();
assert_eq!(s2.broadcast(2).await, Ok(None));
drop(s1);
drop(s2);
assert_eq!(r.recv().await, Ok(1));
assert_eq!(r.recv().await, Ok(2));
assert_eq!(r.recv().await, Err(RecvError::Closed));
sourcepub fn new_receiver(&self) -> Self
pub fn new_receiver(&self) -> Self
Produce a new Receiver for this channel.
Unlike `Receiver::clone`, this method creates a new receiver that starts with zero
messages available. This is slightly faster than a real clone.
Examples
use async_broadcast::{broadcast, RecvError};
let (s, mut r1) = broadcast(2);
assert_eq!(s.broadcast(1).await, Ok(None));
let mut r2 = r1.new_receiver();
assert_eq!(s.broadcast(2).await, Ok(None));
drop(s);
assert_eq!(r1.recv().await, Ok(1));
assert_eq!(r1.recv().await, Ok(2));
assert_eq!(r1.recv().await, Err(RecvError::Closed));
assert_eq!(r2.recv().await, Ok(2));
assert_eq!(r2.recv().await, Err(RecvError::Closed));
Trait Implementations§
source§impl<T> Clone for Receiver<T>
impl<T> Clone for Receiver<T>
source§fn clone(&self) -> Self
fn clone(&self) -> Self
Produce a clone of this Receiver that has the same messages queued.
Examples
use async_broadcast::{broadcast, RecvError};
let (s, mut r1) = broadcast(1);
assert_eq!(s.broadcast(1).await, Ok(None));
drop(s);
let mut r2 = r1.clone();
assert_eq!(r1.recv().await, Ok(1));
assert_eq!(r1.recv().await, Err(RecvError::Closed));
assert_eq!(r2.recv().await, Ok(1));
assert_eq!(r2.recv().await, Err(RecvError::Closed));
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from `source`. Read more
source§impl<T: Clone> FusedStream for Receiver<T>
impl<T: Clone> FusedStream for Receiver<T>
source§fn is_terminated(&self) -> bool
fn is_terminated(&self) -> bool
Returns `true` if the stream should no longer be polled.