Struct AsyncReceiver 

pub struct AsyncReceiver<T> { /* private fields */ }

AsyncReceiver is the receiving side of the channel in async mode. Receivers can be cloned, and a clone can operate in either sync or async mode.

§Examples

let (_s, receiver) = kanal_plus::bounded_async::<u64>(0);
let sync_receiver = receiver.clone_sync();

Implementations§

Source§

impl<T> AsyncReceiver<T>

pub fn recv(&self) -> ReceiveFuture<'_, T>

Returns a ReceiveFuture to receive data from the channel asynchronously.

§Cancellation and Polling Considerations

Due to current limitations in Rust’s handling of future cancellation, if a ReceiveFuture is dropped at exactly the moment new data is written to the channel, the received value may be lost. This behavior, although memory-safe, stems from the fact that Rust does not provide a built-in, correct mechanism for cancelling futures.

Additionally, constructs such as tokio::select! are not correct to use with kanal async channels. Kanal’s design does not rely on the conventional poll mechanism to read messages. Because of its internal optimizations, the future may complete without receiving the final poll, in which case the message is never properly handled.

As a result, once the ReceiveFuture is polled for the first time (which registers the request to receive data), the programmer must commit to completing the polling process. This ensures that messages are correctly delivered and avoids potential race conditions associated with cancellation.
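
The hazard described above can be illustrated with a toy, single-threaded model. This is only a sketch under stated assumptions, not the kanal_plus implementation: ToyChannel, register, send, and receive_or_cancel are hypothetical names, and the model assumes that a sender hands a value directly to a registered request instead of queueing it. If that request is abandoned after the hand-off, nobody is left to read the value.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Slot shared between a pending receive request and the sender that serves it.
type Slot = Rc<RefCell<Option<u32>>>;

/// Hypothetical toy channel (not kanal_plus): senders hand values
/// directly to registered requests when one is waiting.
#[derive(Default)]
struct ToyChannel {
    waiting: VecDeque<Slot>, // registered receive requests, oldest first
    queue: VecDeque<u32>,    // values buffered while nobody is waiting
}

impl ToyChannel {
    /// Models the first poll: registers a request and returns its slot.
    fn register(&mut self) -> Slot {
        let slot: Slot = Rc::new(RefCell::new(None));
        self.waiting.push_back(Rc::clone(&slot));
        slot
    }

    /// Models a send: a waiting request gets the value directly,
    /// bypassing the queue.
    fn send(&mut self, value: u32) {
        if let Some(slot) = self.waiting.pop_front() {
            *slot.borrow_mut() = Some(value);
        } else {
            self.queue.push_back(value);
        }
    }
}

/// Registers a request, lets a send happen, then either completes or
/// abandons the request. Returns (value seen by the receiver, queue length).
fn receive_or_cancel(complete: bool) -> (Option<u32>, usize) {
    let mut ch = ToyChannel::default();
    let slot = ch.register(); // first poll registers the request
    ch.send(7); // the value is written into the request's slot
    let received = if complete {
        // The final poll picks the value up.
        let value = slot.borrow_mut().take();
        value
    } else {
        // Cancellation: the slot, and the 7 inside it, are dropped.
        drop(slot);
        None
    };
    (received, ch.queue.len())
}
```

In this model, receive_or_cancel(true) yields the value, while receive_or_cancel(false) loses it even though the queue stays empty, which is why a receive that has been polled once should be driven to completion.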

§Examples
let name = r.recv().await?;
println!("Hello {}", name);

pub fn stream(&self) -> ReceiveStream<'_, T>

Creates an asynchronous stream for receiving messages from the channel. ReceiveStream borrows the AsyncReceiver; after the stream is dropped, the receiver is available and usable again.

§Examples
// import to be able to use stream.next() function
use futures::stream::StreamExt;
// import to be able to use stream.is_terminated() function
use futures::stream::FusedStream;

let (s, r) = kanal_plus::unbounded_async();
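// `co` is not defined in this snippet; it is assumed to be a helper that
// spawns the future on the async runtime
co(async move {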
    for i in 0..100 {
        s.send(i).await.unwrap();
    }
});
let mut stream = r.stream();
assert!(!stream.is_terminated());
for i in 0..100 {
    assert_eq!(stream.next().await, Some(i));
}
// Stream will return None after it is terminated, and there is no other sender.
assert_eq!(stream.next().await, None);
assert!(stream.is_terminated());

pub fn into_stream(self) -> ReceiveStreamOwned<T>

Creates an asynchronous stream that owns the receiver.

This is useful when the stream needs to outlive the receiver borrow.

pub fn drain_into_blocking<'a, 'b>( &'a self, vec: &'b mut Vec<T>, ) -> DrainIntoBlockingFuture<'a, 'b, T>

Returns a DrainIntoBlockingFuture to drain all available messages from the channel into the provided vector, awaiting until at least one message is received.

This function combines the behavior of drain_into with async semantics:

  • If messages are available, it drains all of them and returns immediately
  • If no messages are available, it awaits (yields to the async runtime) until at least one message arrives

Note: The name “blocking” refers to the semantic behavior (waiting for data), not thread blocking. This method is fully async and will not block the thread.

Returns the number of messages received.
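
The two cases above can be sketched with a blocking analog built on std::sync::mpsc. This is an illustration under stated assumptions rather than the kanal_plus API: drain_or_wait is a hypothetical helper, and unlike drain_into_blocking it blocks the calling thread instead of yielding to an async runtime.

```rust
use std::sync::mpsc::{Receiver, RecvError};

/// Hypothetical std-based analog: moves every message that is already
/// queued into `buf`; if none is queued, waits for one message first.
/// Returns how many messages were added to `buf`.
fn drain_or_wait<T>(rx: &Receiver<T>, buf: &mut Vec<T>) -> Result<usize, RecvError> {
    let before = buf.len();
    // Non-blocking phase: take whatever is ready right now.
    buf.extend(rx.try_iter());
    if buf.len() == before {
        // Nothing was ready: wait for one message. This returns an error
        // once every sender is gone and the queue is empty.
        buf.push(rx.recv()?);
        // Pick up anything else that arrived in the meantime.
        buf.extend(rx.try_iter());
    }
    Ok(buf.len() - before)
}
```

As with the method described above, a successful call always reports at least one message, and an error signals that no further messages can arrive.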

§Examples
let (s, r) = kanal_plus::bounded_async(100);
spawn(async move {
    for i in 0..100 {
        s.send(i).await.unwrap();
    }
});

let mut buf = Vec::new();
loop {
    match r.drain_into_blocking(&mut buf).await {
        Ok(count) => {
            assert!(count > 0);
            // process buf...
            buf.clear();
        }
        Err(_) => break, // channel closed
    }
}

pub fn try_recv(&self) -> Result<Option<T>, ReceiveError>

Tries to receive from the channel without waiting on the waitlist. Returns Ok(Some(T)) if a message was received, Ok(None) if the attempt failed without receiving a message, or an error if the channel is closed. Important note: this function is not lock-free, as it briefly acquires the channel's internal mutex guard.
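
The three outcomes map naturally onto a match. The sketch below is an analogy using std::sync::mpsc rather than kanal_plus itself; try_recv_opt and Closed are hypothetical names introduced only to mirror the Result<Option<T>, _> shape.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical marker error for a channel that can no longer deliver messages.
#[derive(Debug, PartialEq)]
struct Closed;

/// Adapts std's `try_recv` to a `Result<Option<T>, _>` shape
/// (an analogy, not the kanal_plus implementation).
fn try_recv_opt<T>(rx: &Receiver<T>) -> Result<Option<T>, Closed> {
    match rx.try_recv() {
        // A message was ready.
        Ok(value) => Ok(Some(value)),
        // Nothing ready right now, but senders still exist.
        Err(TryRecvError::Empty) => Ok(None),
        // Every sender is gone and the queue is empty.
        Err(TryRecvError::Disconnected) => Err(Closed),
    }
}
```

Keeping "nothing yet" (Ok(None)) separate from "never again" (Err) lets a polling loop use `?` for the terminal case and back off or yield on the empty one.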

§Examples
loop {
    if let Some(name) = r.try_recv()? {
        println!("Hello {}!", name);
        break;
    }
}

pub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError>

Tries to receive from the channel without waiting on the waitlist or for the channel's internal lock. Returns Ok(Some(T)) if a message was received, Ok(None) if the attempt failed without receiving a message, or an error if the channel is closed. Do not use this function unless you know exactly what you are doing.

§Examples
loop {
    if let Some(name) = r.try_recv_realtime()? {
        println!("Hello {}!", name);
        break;
    }
}

pub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError>

Drains all available messages from the channel into the provided vector and returns the number of received messages.

The function is designed to be non-blocking, meaning it only processes messages that are readily available and returns immediately with whatever messages are present. It provides a count of received messages, which could be zero if no messages are available at the time of the call.

When using this function, check whether the returned count is zero to avoid busy-waiting in a loop. If blocking behavior is desired in that case, use the recv() function to wait for the next message. For efficiency, reuse the same vector across calls to minimize memory allocations; between uses, clear it with vec.clear() to prepare it for the next set of messages.
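
The buffer-reuse advice relies on Vec::clear keeping the allocation. A minimal sketch, in which process_batch is a hypothetical helper and extend_from_slice merely stands in for a drain_into call filling the buffer:

```rust
/// Hypothetical helper: appends one batch to `buf`, "processes" it by
/// summing, then clears `buf` so it can be reused.
/// Returns (sum of the batch, capacity before clear, capacity after clear).
fn process_batch(buf: &mut Vec<u64>, batch: &[u64]) -> (u64, usize, usize) {
    // Stands in for a drain_into call that fills the buffer.
    buf.extend_from_slice(batch);
    let sum: u64 = buf.iter().sum();
    let before = buf.capacity();
    // `clear` removes the elements but keeps the allocated capacity.
    buf.clear();
    (sum, before, buf.capacity())
}
```

Because the capacity survives clear(), a buffer created once with Vec::with_capacity does not need to reallocate on later rounds unless a batch outgrows it.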

§Examples

let mut buf = Vec::with_capacity(1000);
loop {
    if let Ok(count) = r.drain_into(&mut buf) {
        if count == 0 {
            // Nothing was available: await the next message with recv()
            // instead of busy-waiting.
            if let Ok(v) = r.recv().await {
                buf.push(v);
            } else {
                break;
            }
        }
        // use buffer
        buf.iter().for_each(|v| println!("{}", v));
    } else {
        println!("Channel closed");
        break;
    }
    buf.clear();
}

pub fn is_disconnected(&self) -> bool

Returns whether the send side of the channel is closed.

§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
drop(s); // drop sender and disconnect the send side from the channel
assert_eq!(r.is_disconnected(),true);

pub fn is_terminated(&self) -> bool

Returns whether the receive side of the channel is terminated, meaning that future recv calls will not return any more results.

§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
s.send(1).unwrap();
drop(s); // drop sender and disconnect the send side from the channel
assert_eq!(r.is_disconnected(),true);
// Although the channel is closed from the send side, it is not terminated because there is still data in the queue
assert_eq!(r.is_terminated(),false);
assert_eq!(r.recv().unwrap(),1);
// Now channel receive side is terminated as there is no sender for channel and queue is empty
assert_eq!(r.is_terminated(),true);
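
The relationship between the two predicates, as the comments in the example describe it, can be captured in a toy model. ToyState and its fields are hypothetical and ignore an explicit close(); this is a sketch of the described behavior, not how kanal_plus stores its state.

```rust
/// Hypothetical snapshot of the two facts the predicates depend on.
struct ToyState {
    senders: usize, // live sender handles
    queued: usize,  // messages still waiting in the queue
}

impl ToyState {
    /// The send side is gone once no sender handle is left.
    fn is_disconnected(&self) -> bool {
        self.senders == 0
    }

    /// Nothing more can ever be received: no senders and nothing queued.
    fn is_terminated(&self) -> bool {
        self.is_disconnected() && self.queued == 0
    }
}
```

In this model a disconnected channel with queued messages is not yet terminated, matching the sequence of assertions in the example.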

pub fn clone_sync(&self) -> Receiver<T>

Returns a sync clone of the receiver.

§Examples
let (s, r) = kanal_plus::unbounded_async();
s.send(1).await?;
let sync_receiver=r.clone_sync();
// For illustration only: it is wrong to use a sync instance in an async context
assert_eq!(sync_receiver.recv()?,1);

pub fn to_sync(self) -> Receiver<T>

Converts AsyncReceiver to Receiver and returns it.

§Examples
let (s, r) = kanal_plus::bounded_async(0);
// move to sync environment
std::thread::spawn(move || {
    let r = r.to_sync();
    let name = r.recv()?;
    println!("Hello {}!", name);
    anyhow::Ok(())
});
s.send("World").await?;

pub fn as_sync(&self) -> &Receiver<T>

Borrows the AsyncReceiver as a Receiver and returns the reference.

§Examples
let (s, r) = kanal_plus::bounded_async(0);
// move to sync environment
std::thread::spawn(move || {
    let name = r.as_sync().recv()?;
    println!("Hello {}!", name);
    anyhow::Ok(())
});
s.send("World").await?;

pub fn is_bounded(&self) -> bool

Returns whether the channel is bounded or not.

§Examples
let (s, r) = kanal_plus::bounded::<u64>(0);
assert_eq!(s.is_bounded(),true);
assert_eq!(r.is_bounded(),true);
let (s, r) = kanal_plus::unbounded::<u64>();
assert_eq!(s.is_bounded(),false);
assert_eq!(r.is_bounded(),false);

pub fn len(&self) -> usize

Returns the length of the queue.

§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
assert_eq!(s.len(),0);
assert_eq!(r.len(),0);
s.send(10).unwrap();
assert_eq!(s.len(),1);
assert_eq!(r.len(),1);

pub fn is_empty(&self) -> bool

Returns whether the channel queue is empty or not.

§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
assert_eq!(s.is_empty(),true);
assert_eq!(r.is_empty(),true);

pub fn is_full(&self) -> bool

Returns whether the channel queue is full. Full channels will block on send and recv calls. It always returns true for zero-sized channels.

§Examples
let (s, r) = kanal_plus::bounded(1);
s.send("Hi!").unwrap();
assert_eq!(s.is_full(),true);
assert_eq!(r.is_full(),true);

pub fn capacity(&self) -> usize

Returns the capacity of the channel (not the queue). For unbounded channels, it returns usize::MAX.

§Examples
let (s, r) = kanal_plus::bounded::<u64>(0);
assert_eq!(s.capacity(),0);
assert_eq!(r.capacity(),0);
let (s, r) = kanal_plus::unbounded::<u64>();
assert_eq!(s.capacity(),usize::MAX);
assert_eq!(r.capacity(),usize::MAX);

pub fn receiver_count(&self) -> usize

Returns the number of live receiver instances of the channel.

§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
let receiver_clone=r.clone();
assert_eq!(r.receiver_count(),2);

pub fn sender_count(&self) -> usize

Returns the number of live sender instances of the channel.

§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
let sender_clone=s.clone();
assert_eq!(r.sender_count(),2);

pub fn close(&self) -> Result<(), CloseError>

Closes the channel completely on both sides and terminates waiting signals.

§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
// closes channel on both sides and has same effect as r.close();
s.close().unwrap();
assert_eq!(r.is_closed(),true);
assert_eq!(s.is_closed(),true);

pub fn is_closed(&self) -> bool

Returns whether the channel is closed on both the send and receive sides.

§Examples
let (s, r) = kanal_plus::unbounded::<u64>();
// closes channel on both sides and has same effect as r.close();
s.close().unwrap();
assert_eq!(r.is_closed(),true);
assert_eq!(s.is_closed(),true);

Trait Implementations§

Source§

impl<T> Clone for AsyncReceiver<T>

Available on crate feature async only.
Source§

fn clone(&self) -> Self

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T> Debug for AsyncReceiver<T>

Available on crate feature async only.
Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<T> Drop for AsyncReceiver<T>

Available on crate feature async only.
Source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl<T> Freeze for AsyncReceiver<T>

§

impl<T> !RefUnwindSafe for AsyncReceiver<T>

§

impl<T> Send for AsyncReceiver<T>
where T: Send,

§

impl<T> Sync for AsyncReceiver<T>

§

impl<T> Unpin for AsyncReceiver<T>

§

impl<T> !UnwindSafe for AsyncReceiver<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.