pub struct AsyncReceiver<T> { /* private fields */ }
AsyncReceiver is the receiving side of the channel in async mode.
Receivers can be cloned, and a receiver can produce clones that operate in
either sync or async mode.
§Examples
let (_s, receiver) = kanal_plus::bounded_async::<u64>(0);
let sync_receiver = receiver.clone_sync();
Implementations§
impl<T> AsyncReceiver<T>
pub fn recv(&self) -> ReceiveFuture<'_, T>
Returns a ReceiveFuture to receive data from the channel
asynchronously.
§Cancellation and Polling Considerations
Due to current limitations in Rust’s handling of future cancellation, if
a ReceiveFuture is dropped exactly when new data is written to the
channel, the received value may be lost. This behavior, although
memory-safe, stems from the fact that Rust does not provide a built-in,
correct mechanism for cancelling futures.
Additionally, it is important to note that constructs such as
tokio::select! are not correct to use with kanal async channels.
Kanal’s design does not rely on the conventional poll mechanism to
read messages. Because of its internal optimizations, the future may
complete without receiving the final poll, which prevents proper
handling of the message.
As a result, once the ReceiveFuture is polled for the first time
(which registers the request to receive data), the programmer must
commit to completing the polling process. This ensures that messages
are correctly delivered and avoids potential race conditions associated
with cancellation.
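The hazard described above can be pictured with a small toy model. This is only a sketch under stated assumptions: `ToyChannel` and `PendingRecv` are hypothetical names, the model uses plain std types with no futures, and it is not kanal's implementation. It assumes only what the text states, namely that a sender may hand a value to an already registered receive request before that request is polled again.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for a receive request that has been registered
/// with the channel (that is, polled once).
struct PendingRecv {
    slot: Rc<RefCell<Option<u32>>>,
}

/// Hypothetical stand-in for the channel's wait list (one waiter at most).
struct ToyChannel {
    waiting: Option<Rc<RefCell<Option<u32>>>>,
}

impl ToyChannel {
    /// Models the first poll: the request registers a slot to be filled.
    fn register(&mut self) -> PendingRecv {
        let slot = Rc::new(RefCell::new(None));
        self.waiting = Some(Rc::clone(&slot));
        PendingRecv { slot }
    }

    /// Models a send that writes straight into the waiting request's slot.
    /// Returns true if a registered request was there to take the value.
    fn send(&mut self, value: u32) -> bool {
        match self.waiting.take() {
            Some(slot) => {
                *slot.borrow_mut() = Some(value);
                true
            }
            None => false,
        }
    }
}

impl PendingRecv {
    /// Models the final poll that picks up the delivered value.
    fn complete(self) -> Option<u32> {
        let value = self.slot.borrow_mut().take();
        value
    }
}

/// The request is driven to completion, so the value is observed.
fn committed() -> Option<u32> {
    let mut ch = ToyChannel { waiting: None };
    let pending = ch.register();
    assert!(ch.send(7));
    pending.complete()
}

/// The request is dropped after the value was written into its slot
/// (as a losing select! branch would be), so the value is gone.
fn cancelled() -> Option<u32> {
    let mut ch = ToyChannel { waiting: None };
    let pending = ch.register();
    assert!(ch.send(7));
    drop(pending);
    // A fresh request finds nothing: the earlier value went to the dropped slot.
    ch.register().complete()
}
```

In this model `committed()` yields the value while `cancelled()` yields nothing, which is why a registered receive should be polled to completion rather than raced against other branches.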
§Examples
let name = r.recv().await?;
println!("Hello {}", name);
pub fn stream(&self) -> ReceiveStream<'_, T>
Creates an asynchronous stream for receiving messages from the channel.
ReceiveStream borrows the AsyncReceiver; after the stream is dropped, the
receiver is available and usable again.
§Examples
// import to be able to use stream.next() function
use futures::stream::StreamExt;
// import to be able to use stream.is_terminated() function
use futures::stream::FusedStream;
let (s, r) = kanal_plus::unbounded_async();
co(async move {
    for i in 0..100 {
        s.send(i).await.unwrap();
    }
});
let mut stream = r.stream();
assert!(!stream.is_terminated());
for i in 0..100 {
    assert_eq!(stream.next().await, Some(i));
}
// Stream will return None after it is terminated, and there is no other sender.
assert_eq!(stream.next().await, None);
assert!(stream.is_terminated());
pub fn into_stream(self) -> ReceiveStreamOwned<T>
Creates an asynchronous stream that owns the receiver.
This is useful when the stream needs to outlive the receiver borrow.
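The difference between a borrowing stream and an owning stream mirrors a distinction that already exists in the standard library. The sketch below uses std::sync::mpsc as a synchronous stand-in, not kanal_plus: `Receiver::iter()` borrows the receiver, much as stream() is described to do, while `into_iter()` consumes it, much as into_stream() is described to do. The function names are illustrative only.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Borrowing iteration: once the iterator is dropped, `rx` is usable again.
fn take_two_borrowed(rx: &Receiver<u32>) -> Vec<u32> {
    rx.iter().take(2).collect()
}

/// Owning iteration: the iterator carries the receiver with it, so it can
/// be returned from a function or moved elsewhere without a borrow.
fn into_owned_iter(rx: Receiver<u32>) -> impl Iterator<Item = u32> {
    rx.into_iter()
}

fn iteration_demo() -> (Vec<u32>, Vec<u32>) {
    let (tx, rx) = channel();
    for i in 0..5 {
        tx.send(i).unwrap();
    }
    drop(tx); // no senders left, so iteration ends once the queue is empty

    let first = take_two_borrowed(&rx);
    // The borrow has ended; the receiver can now be moved into an owning iterator.
    let rest: Vec<u32> = into_owned_iter(rx).collect();
    (first, rest)
}
```

The owning form is the one to reach for when the iterator (or, in the async case, the stream) has to be stored in a struct or returned from a function.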
pub fn drain_into_blocking<'a, 'b>(
    &'a self,
    vec: &'b mut Vec<T>,
) -> DrainIntoBlockingFuture<'a, 'b, T>
Returns a DrainIntoBlockingFuture to drain all available messages from the channel
into the provided vector, awaiting until at least one message is received.
This function combines the behavior of drain_into with async semantics:
- If messages are available, it drains all of them and returns immediately
- If no messages are available, it awaits (yields to the async runtime) until at least one message arrives
Note: The name “blocking” refers to the semantic behavior (waiting for data), not thread blocking. This method is fully async and will not block the thread.
Returns the number of messages received.
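The "wait for the first message, then take everything else that is ready" behavior can be sketched with the standard library. This is a synchronous analogue built on std::sync::mpsc, not the kanal_plus implementation: where the async method awaits, this sketch blocks the thread in recv(). `drain_waiting` and `drain_waiting_demo` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, RecvError};
use std::thread;

/// Waits for at least one message, then drains whatever else is queued.
/// Returns the number of messages appended to `buf`.
fn drain_waiting<T>(rx: &Receiver<T>, buf: &mut Vec<T>) -> Result<usize, RecvError> {
    // Wait here until the first message arrives (the async version would await).
    buf.push(rx.recv()?);
    let mut count = 1;
    // Take the rest without waiting.
    while let Ok(value) = rx.try_recv() {
        buf.push(value);
        count += 1;
    }
    Ok(count)
}

fn drain_waiting_demo() -> Vec<u32> {
    let (tx, rx) = channel();
    let producer = thread::spawn(move || {
        for i in 0..100 {
            tx.send(i).unwrap();
        }
    });

    let mut all = Vec::new();
    let mut buf = Vec::new();
    // The loop ends when the sender is gone and the queue is empty.
    while let Ok(count) = drain_waiting(&rx, &mut buf) {
        assert!(count > 0);
        assert_eq!(count, buf.len());
        all.extend(buf.drain(..)); // empties buf but keeps its allocation
    }
    producer.join().unwrap();
    all
}
```

Because every successful call yields at least one message, the caller never spins on an empty batch.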
§Examples
let (s, r) = kanal_plus::bounded_async(100);
spawn(async move {
    for i in 0..100 {
        s.send(i).await.unwrap();
    }
});
let mut buf = Vec::new();
loop {
    match r.drain_into_blocking(&mut buf).await {
        Ok(count) => {
            assert!(count > 0);
            // process buf...
            buf.clear();
        }
        Err(_) => break, // channel closed
    }
}
pub fn try_recv(&self) -> Result<Option<T>, ReceiveError>
Tries to receive from the channel without waiting on the waitlist.
It returns Ok(Some(T)) if a message was received, Ok(None) if no
message was available, and an error if the channel is closed.
Important note: this function is not lock-free, as it acquires a
mutex guard on the channel internals for a short time.
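The three outcomes (a value, nothing yet, closed) can be illustrated with a std-only analogue. The sketch below adapts std::sync::mpsc's try_recv to the same Result<Option<T>, _> shape; `Closed` and `try_recv_opt` are hypothetical names standing in for the crate's error type and method, and the std channel only approximates kanal's closing rules.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical error type meaning "the channel is closed".
#[derive(Debug, PartialEq)]
struct Closed;

/// Maps the std result onto the Ok(Some) / Ok(None) / Err shape.
fn try_recv_opt<T>(rx: &Receiver<T>) -> Result<Option<T>, Closed> {
    match rx.try_recv() {
        Ok(value) => Ok(Some(value)),
        Err(TryRecvError::Empty) => Ok(None),
        Err(TryRecvError::Disconnected) => Err(Closed),
    }
}

fn try_recv_demo() -> Vec<Result<Option<u32>, Closed>> {
    let (tx, rx) = channel();
    let mut seen = Vec::new();
    seen.push(try_recv_opt(&rx)); // nothing queued yet
    tx.send(5).unwrap();
    seen.push(try_recv_opt(&rx)); // one message available
    drop(tx);
    seen.push(try_recv_opt(&rx)); // sender gone and queue empty
    seen
}
```

Keeping "empty" as Ok(None) lets callers use `?` for the closed case and still treat an empty queue as an ordinary, non-error result.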
§Examples
loop {
    if let Some(name) = r.try_recv()? {
        println!("Hello {}!", name);
        break;
    }
}
pub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError>
Tries to receive from the channel without waiting on the waitlist or
for the channel's internal lock. It returns Ok(Some(T)) if a message
was received, Ok(None) if the attempt failed, and an error if the
channel is closed. Do not use this function unless you know exactly
what you are doing.
§Examples
loop {
    if let Some(name) = r.try_recv_realtime()? {
        println!("Hello {}!", name);
        break;
    }
}
pub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError>
Drains all available messages from the channel into the provided vector and returns the number of received messages.
The function is designed to be non-blocking, meaning it only processes messages that are readily available and returns immediately with whatever messages are present. It provides a count of received messages, which could be zero if no messages are available at the time of the call.
When using this function, check whether the returned count is zero to
avoid busy-waiting in a loop. If you want to wait when the count is zero,
await recv() for the next message. For efficiency, reuse the same vector
across calls to minimize memory allocations, clearing it with
vec.clear() between uses to prepare it for the next set of messages.
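A non-blocking drain of this kind can be sketched on top of the standard library. The code below is an analogue using std::sync::mpsc, not the kanal_plus implementation; `Closed`, `drain_now` and `drain_now_demo` are hypothetical names, and the choice to report closure only when nothing could be drained is an assumption of this sketch.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};

/// Hypothetical error type meaning "the channel is closed".
#[derive(Debug, PartialEq)]
struct Closed;

/// Moves every queued message into `buf` without waiting and returns
/// how many were moved (possibly zero).
fn drain_now<T>(rx: &Receiver<T>, buf: &mut Vec<T>) -> Result<usize, Closed> {
    let mut count = 0;
    loop {
        match rx.try_recv() {
            Ok(value) => {
                buf.push(value);
                count += 1;
            }
            Err(TryRecvError::Empty) => return Ok(count),
            // Assumption: closure is reported only once nothing is left to hand over.
            Err(TryRecvError::Disconnected) if count == 0 => return Err(Closed),
            Err(TryRecvError::Disconnected) => return Ok(count),
        }
    }
}

fn drain_now_demo() -> Vec<Result<usize, Closed>> {
    let (tx, rx) = channel();
    let mut buf: Vec<u32> = Vec::with_capacity(16);
    let mut results = Vec::new();

    results.push(drain_now(&rx, &mut buf)); // empty queue: zero messages, no error
    for i in 0..3 {
        tx.send(i).unwrap();
    }
    drop(tx);
    results.push(drain_now(&rx, &mut buf)); // three queued messages
    buf.clear(); // reuse the same allocation for the next batch
    results.push(drain_now(&rx, &mut buf)); // nothing left and no sender
    results
}
```

A zero count is the signal to switch to a waiting receive instead of calling the drain again in a tight loop.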
§Examples
let mut buf = Vec::with_capacity(1000);
loop {
    if let Ok(count) = r.drain_into(&mut buf) {
        if count == 0 {
            // count is 0: await recv() for the next message
            // instead of busy-waiting
            if let Ok(v) = r.recv().await {
                buf.push(v);
            } else {
                break;
            }
        }
        // use buffer
        buf.iter().for_each(|v| println!("{}", v));
    } else {
        println!("Channel closed");
        break;
    }
    buf.clear();
}
pub fn is_disconnected(&self) -> bool
Returns whether the send side of the channel is closed.
§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
drop(s); // drop sender and disconnect the send side from the channel
assert_eq!(r.is_disconnected(), true);
pub fn is_terminated(&self) -> bool
Returns whether the receive side of the channel is terminated, meaning that future recv calls will not return any result.
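The relationship between "disconnected" and "terminated" can be stated as a small predicate. This is a toy model with hypothetical names (`ToyState`, `termination_demo`), not the crate's internals; it assumes only what this page describes, that a receiver is terminated once no sender remains and the queue is empty.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified view of the channel state seen by a receiver.
struct ToyState {
    queue: VecDeque<u64>,
    senders: usize,
}

impl ToyState {
    /// The send side is gone.
    fn is_disconnected(&self) -> bool {
        self.senders == 0
    }

    /// No sender remains and nothing is left to receive.
    fn is_terminated(&self) -> bool {
        self.is_disconnected() && self.queue.is_empty()
    }
}

fn termination_demo() -> [bool; 3] {
    let mut state = ToyState { queue: VecDeque::from([1]), senders: 1 };
    let while_connected = state.is_terminated();
    state.senders = 0; // the last sender is dropped, one message still queued
    let while_draining = state.is_terminated();
    state.queue.pop_front(); // the remaining message is received
    let after_drain = state.is_terminated();
    [while_connected, while_draining, after_drain]
}
```

Only the last of the three checks is true: disconnection alone does not terminate the receiver while queued data remains.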
§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
s.send(1).unwrap();
drop(s); // drop sender and disconnect the send side from the channel
assert_eq!(r.is_disconnected(), true);
// Although the channel is closed from the send side, it is not terminated while data remains in the queue
assert_eq!(r.is_terminated(), false);
assert_eq!(r.recv().unwrap(), 1);
// Now the receive side is terminated: there is no sender and the queue is empty
assert_eq!(r.is_terminated(), true);
pub fn clone_sync(&self) -> Receiver<T>
Returns a sync clone of the receiver.
§Examples
let (s, r) = kanal_plus::unbounded_async();
s.send(1).await?;
let sync_receiver=r.clone_sync();
// For illustration only: using a sync instance in an async context is wrong
assert_eq!(sync_receiver.recv()?, 1);
pub fn to_sync(self) -> Receiver<T>
Converts AsyncReceiver to Receiver and returns it.
§Examples
let (s, r) = kanal_plus::bounded_async(0);
// move to sync environment
std::thread::spawn(move || {
    let r = r.to_sync();
    let name = r.recv()?;
    println!("Hello {}!", name);
    anyhow::Ok(())
});
s.send("World").await?;
pub fn as_sync(&self) -> &Receiver<T>
Borrows the AsyncReceiver as a Receiver and returns it.
§Examples
let (s, r) = kanal_plus::bounded_async(0);
// move to sync environment
std::thread::spawn(move || {
    let name = r.as_sync().recv()?;
    println!("Hello {}!", name);
    anyhow::Ok(())
});
s.send("World").await?;
pub fn is_bounded(&self) -> bool
Returns whether the channel is bounded or not.
§Examples
let (s, r) = kanal_plus::bounded::<u64>(0);
assert_eq!(s.is_bounded(), true);
assert_eq!(r.is_bounded(), true);

let (s, r) = kanal_plus::unbounded::<u64>();
assert_eq!(s.is_bounded(), false);
assert_eq!(r.is_bounded(), false);
pub fn len(&self) -> usize
Returns the length of the queue.
§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
assert_eq!(s.len(), 0);
assert_eq!(r.len(), 0);
s.send(10).unwrap();
assert_eq!(s.len(), 1);
assert_eq!(r.len(), 1);
pub fn is_empty(&self) -> bool
Returns whether the channel queue is empty or not.
§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
assert_eq!(s.is_empty(),true);
assert_eq!(r.is_empty(), true);
pub fn is_full(&self) -> bool
Returns whether the channel queue is full. Full channels will block on send and recv calls. It always returns true for zero-sized channels.
§Examples
let (s, r) = kanal_plus::bounded(1);
s.send("Hi!").unwrap();
assert_eq!(s.is_full(),true);
assert_eq!(r.is_full(), true);
pub fn capacity(&self) -> usize
Returns the capacity of the channel (not the queue). For unbounded channels, it returns usize::MAX.
§Examples
let (s, r) = kanal_plus::bounded::<u64>(0);
assert_eq!(s.capacity(), 0);
assert_eq!(r.capacity(), 0);

let (s, r) = kanal_plus::unbounded::<u64>();
assert_eq!(s.capacity(), usize::MAX);
assert_eq!(r.capacity(), usize::MAX);
pub fn receiver_count(&self) -> usize
Returns count of alive receiver instances of the channel.
§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
let receiver_clone=r.clone();
assert_eq!(r.receiver_count(), 2);
pub fn sender_count(&self) -> usize
Returns count of alive sender instances of the channel.
§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
let sender_clone=s.clone();
assert_eq!(r.sender_count(), 2);
pub fn close(&self) -> Result<(), CloseError>
Closes the channel completely on both sides and terminates waiting signals.
§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
// closes channel on both sides and has same effect as r.close();
s.close().unwrap();
assert_eq!(r.is_closed(),true);
assert_eq!(s.is_closed(), true);
pub fn is_closed(&self) -> bool
Returns whether the channel is closed on both the send and receive sides.
§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
// closes channel on both sides and has same effect as r.close();
s.close().unwrap();
assert_eq!(r.is_closed(), true);
assert_eq!(s.is_closed(), true);
Trait Implementations§
impl<T> Clone for AsyncReceiver<T>
Available on crate feature async only.
impl<T> Debug for AsyncReceiver<T>
Available on crate feature async only.