Skip to main content

AsyncReceiver

Struct AsyncReceiver 

Source
pub struct AsyncReceiver<T> { /* private fields */ }
Expand description

AsyncReceiver is receiving side of the channel in async mode. Receivers can be cloned and produce receivers to operate in both sync and async modes.

§Examples

let (_s, receiver) = kanal::bounded_async::<u64>(0);
let sync_receiver=receiver.clone_sync();

Implementations§

Source§

impl<T> AsyncReceiver<T>

Source

pub fn recv(&self) -> ReceiveFuture<'_, T>

Returns a ReceiveFuture to receive data from the channel asynchronously.

§Cancellation and Polling Considerations

Due to current limitations in Rust’s handling of future cancellation, if a ReceiveFuture is dropped exactly at the time when new data is written to the channel, it may result in the loss of the received value. This behavior although memory-safe stems from the fact that Rust does not provide a built-in, correct mechanism for cancelling futures.

Additionally, it is important to note that constructs such as tokio::select! are not correct to use with kanal async channels. Kanal’s design does not rely on the conventional poll mechanism to read messages. Because of its internal optimizations, the future may complete without receiving the final poll, which prevents proper handling of the message.

As a result, once the ReceiveFuture is polled for the first time (which registers the request to receive data), the programmer must commit to completing the polling process. This ensures that messages are correctly delivered and avoids potential race conditions associated with cancellation.

§Examples
let name=r.recv().await?;
println!("Hello {}",name);
Examples found in repository?
examples/echo_client.rs (line 91)
88    async fn run(&mut self) -> CoreResult<()> {
89        loop {
90            tokio::select! {
91                Ok(cmd) = self.cmd_rx.recv() => {
92                    let _ = self.to_ws.send(Message::text(cmd)).await;
93                    self.echo.store(true, Ordering::SeqCst);
94                }
95                Ok(msg) = self.msg_rx.recv() => {
96                    if let Message::Text(txt) = &*msg {
97                        if self.echo.load(Ordering::SeqCst) {
98                            let _ = self.cmd_tx.send(txt.to_string()).await;
99                            self.echo.store(false, Ordering::SeqCst);
100                        }
101                    }
102                }
103            }
104        }
105    }
106
107    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
108        Box::new(move |msg: &Message| msg.is_text())
109    }
110}
111
112#[derive(Clone)]
113pub struct EchoHandle {
114    sender: AsyncSender<String>,
115    receiver: AsyncReceiver<String>,
116}
117
118impl EchoHandle {
119    pub async fn echo(&self, msg: String) -> CoreResult<String> {
120        let _ = self.sender.send(msg).await;
121        println!("In side echo handle, waiting for response...");
122        Ok(self.receiver.recv().await?)
123    }
124}
125
126// --- ApiModule 2: StreamModule ---
127pub struct StreamModule {
128    msg_rx: AsyncReceiver<Arc<Message>>,
129    cmd_rx: AsyncReceiver<bool>,
130    cmd_tx: AsyncSender<String>,
131    send: AtomicBool,
132}
133
134#[async_trait]
135impl ApiModule<()> for StreamModule {
136    type Command = bool;
137    type CommandResponse = String;
138    type Handle = StreamHandle;
139
140    fn new(
141        _state: Arc<()>,
142        cmd_rx: AsyncReceiver<Self::Command>,
143        cmd_ret_tx: AsyncSender<Self::CommandResponse>,
144        msg_rx: AsyncReceiver<Arc<Message>>,
145        _to_ws: AsyncSender<Message>,
146        _: AsyncSender<RunnerCommand>,
147    ) -> Self {
148        Self {
149            msg_rx,
150            cmd_tx: cmd_ret_tx,
151            cmd_rx,
152            send: AtomicBool::new(false),
153        }
154    }
155
156    fn create_handle(
157        sender: AsyncSender<Self::Command>,
158        receiver: AsyncReceiver<Self::CommandResponse>,
159    ) -> Self::Handle {
160        StreamHandle { sender, receiver }
161    }
162
163    async fn run(&mut self) -> CoreResult<()> {
164        loop {
165            tokio::select! {
166                Ok(cmd) = self.cmd_rx.recv() => {
167                    // Update the send flag based on the received command
168                    self.send.store(cmd, Ordering::SeqCst);
169                }
170                Ok(msg) = self.msg_rx.recv() => {
171                    if let Message::Text(txt) = &*msg {
172                        if self.send.load(Ordering::SeqCst) {
173                            // Process the message if send is true
174                            println!("[StreamModule] Received: {txt}");
175                            let _ = self.cmd_tx.send(txt.to_string()).await;
176                        }
177                    }
178                }
179                else => {
180                    println!("[Error] StreamModule: Channel closed");
181                },
182            }
183        }
184    }
185
186    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
187        Box::new(move |_msg: &Message| {
188            // Accept all messages
189            true
190        })
191    }
192}
193
194#[derive(Clone)]
195pub struct StreamHandle {
196    receiver: AsyncReceiver<String>,
197    sender: AsyncSender<bool>,
198}
199
200impl StreamHandle {
201    pub async fn stream(self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
202        self.sender.send(true).await?;
203        println!("StreamHandle: Waiting for messages...");
204        Ok(Box::pin(unfold(self.receiver, |state| async move {
205            let item = state.recv().await.map_err(CoreError::from);
206            Some((item, state))
207        })))
208    }
209}
210
211// --- ApiModule 3: PeriodicSenderModule ---
212pub struct PeriodicSenderModule {
213    cmd_rx: AsyncReceiver<bool>,
214    to_ws: AsyncSender<Message>,
215    running: AtomicBool,
216}
217
218#[async_trait]
219impl ApiModule<()> for PeriodicSenderModule {
220    type Command = bool; // true = start, false = stop
221    type CommandResponse = ();
222    type Handle = PeriodicSenderHandle;
223
224    fn new(
225        _state: Arc<()>,
226        cmd_rx: AsyncReceiver<Self::Command>,
227        _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
228        _msg_rx: AsyncReceiver<Arc<Message>>,
229        to_ws: AsyncSender<Message>,
230        _: AsyncSender<RunnerCommand>,
231    ) -> Self {
232        Self {
233            cmd_rx,
234            to_ws,
235            running: AtomicBool::new(false),
236        }
237    }
238
239    fn create_handle(
240        sender: AsyncSender<Self::Command>,
241        _receiver: AsyncReceiver<Self::CommandResponse>,
242    ) -> Self::Handle {
243        PeriodicSenderHandle { sender }
244    }
245
246    async fn run(&mut self) -> CoreResult<()> {
247        let to_ws = self.to_ws.clone();
248        let mut interval = tokio::time::interval(Duration::from_secs(5));
249        loop {
250            tokio::select! {
251                Ok(cmd) = self.cmd_rx.recv() => {
252                    self.running.store(cmd, Ordering::SeqCst);
253                }
254                _ = interval.tick() => {
255                    if self.running.load(Ordering::SeqCst) {
256                        let _ = to_ws.send(Message::text("Ping from periodic sender")).await;
257                    }
258                }
259            }
260        }
261    }
More examples
Hide additional examples
examples/testing_echo_client.rs (line 81)
78    async fn run(&mut self) -> CoreResult<()> {
79        loop {
80            tokio::select! {
81                Ok(cmd) = self.cmd_rx.recv() => {
82                    let _ = self.to_ws.send(Message::text(cmd)).await;
83                    self.echo.store(true, Ordering::SeqCst);
84                }
85                Ok(msg) = self.msg_rx.recv() => {
86                    if let Message::Text(txt) = &*msg {
87                        if self.echo.load(Ordering::SeqCst) {
88                            let _ = self.cmd_tx.send(txt.to_string()).await;
89                            self.echo.store(false, Ordering::SeqCst);
90                        }
91                    }
92                }
93            }
94        }
95    }
96
97    fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
98        Box::new(move |msg: &Message| {
99            println!("Routing rule for EchoModule: {msg:?}");
100            msg.is_text()
101        })
102    }
103}
104
105#[derive(Clone)]
106pub struct EchoHandle {
107    sender: AsyncSender<String>,
108    receiver: AsyncReceiver<String>,
109}
110
111impl EchoHandle {
112    pub async fn echo(&self, msg: String) -> CoreResult<String> {
113        let _ = self.sender.send(msg).await;
114        println!("In side echo handle, waiting for response...");
115        Ok(self.receiver.recv().await?)
116    }
Source

pub fn stream(&self) -> ReceiveStream<'_, T>

Creates a asynchronous stream for the channel to receive messages, ReceiveStream borrows the AsyncReceiver, after dropping it, receiver will be available and usable again.

§Examples
// import to be able to use stream.next() function
use futures::stream::StreamExt;
// import to be able to use stream.is_terminated() function
use futures::stream::FusedStream;

let (s, r) = kanal::unbounded_async();
co(async move {
    for i in 0..100 {
        s.send(i).await.unwrap();
    }
});
let mut stream = r.stream();
assert!(!stream.is_terminated());
for i in 0..100 {
    assert_eq!(stream.next().await, Some(i));
}
// Stream will return None after it is terminated, and there is no other sender.
assert_eq!(stream.next().await, None);
assert!(stream.is_terminated());
Source

pub fn try_recv(&self) -> Result<Option<T>, ReceiveError>

Tries receiving from the channel without waiting on the waitlist. It returns Ok(Some(T)) in case of successful operation and Ok(None) for a failed one, or error in case that channel is closed. Important note: this function is not lock-free as it acquires a mutex guard of the channel internal for a short time.

§Examples
loop {
    if let Some(name)=r.try_recv()?{
        println!("Hello {}!",name);
        break;
    }
}
Source

pub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError>

Tries receiving from the channel without waiting on the waitlist or waiting for channel internal lock. It returns Ok(Some(T)) in case of successful operation and Ok(None) for a failed one, or error in case that channel is closed. Do not use this function unless you know exactly what you are doing.

§Examples
loop {
    if let Some(name)=r.try_recv_realtime()?{
        println!("Hello {}!",name);
        break;
    }
}
Source

pub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError>

Drains all available messages from the channel into the provided vector and returns the number of received messages.

The function is designed to be non-blocking, meaning it only processes messages that are readily available and returns immediately with whatever messages are present. It provides a count of received messages, which could be zero if no messages are available at the time of the call.

When using this function, it’s a good idea to check if the returned count is zero to avoid busy-waiting in a loop. If blocking behavior is desired when the count is zero, you can use the recv() function if count is zero. For efficiency, reusing the same vector across multiple calls can help minimize memory allocations. Between uses, you can clear the vector with vec.clear() to prepare it for the next set of messages.

§Examples

let mut buf = Vec::with_capacity(1000);
loop {
    if let Ok(count) = r.drain_into(&mut buf) {
        if count == 0 {
           // count is 0, to avoid busy-wait using recv for
           // the first next message
           if let Ok(v) = r.recv() {
              buf.push(v);
           } else {
             break;
           }
        }
        // use buffer
        buf.iter().for_each(|v| println!("{}",v));
    }else{
        println!("Channel closed");
        break;
    }
    buf.clear();
}
Source

pub fn is_disconnected(&self) -> bool

Returns, whether the send side of the channel, is closed or not.

§Examples
let (s, r) = kanal::unbounded::<u64>();
drop(s); // drop sender and disconnect the send side from the channel
assert_eq!(r.is_disconnected(),true);
Source

pub fn is_terminated(&self) -> bool

Returns, whether the channel receive side is terminated, and will not return any result in future recv calls.

§Examples
let (s, r) = kanal::unbounded::<u64>();
s.send(1).unwrap();
drop(s); // drop sender and disconnect the send side from the channel
assert_eq!(r.is_disconnected(),true);
// Also channel is closed from send side, it's not terminated as there is data in channel queue
assert_eq!(r.is_terminated(),false);
assert_eq!(r.recv().unwrap(),1);
// Now channel receive side is terminated as there is no sender for channel and queue is empty
assert_eq!(r.is_terminated(),true);
Source

pub fn clone_sync(&self) -> Receiver<T>

Returns sync cloned version of the receiver.

§Examples
let (s, r) = kanal::unbounded_async();
s.send(1).await?;
let sync_receiver=r.clone_sync();
// JUST FOR EXAMPLE IT IS WRONG TO USE SYNC INSTANCE IN ASYNC CONTEXT
assert_eq!(sync_receiver.recv()?,1);
Source

pub fn to_sync(self) -> Receiver<T>

Converts AsyncReceiver to Receiver and returns it.

§Examples
  let (s, r) = kanal::bounded_async(0);
  // move to sync environment
  std::thread::spawn(move || {
    let r=r.to_sync();
    let name=r.recv()?;
    println!("Hello {}!",name);
    anyhow::Ok(())
  });
  s.send("World").await?;
Source

pub fn as_sync(&self) -> &Receiver<T>

Borrows AsyncReceiver as Receiver and returns it

§Examples
  let (s, r) = kanal::bounded_async(0);
  // move to sync environment
  std::thread::spawn(move || {
    let name=r.as_sync().recv()?;
    println!("Hello {}!",name);
    anyhow::Ok(())
  });
  s.send("World").await?;
Source

pub fn is_bounded(&self) -> bool

Returns whether the channel is bounded or not.

§Examples
let (s, r) = kanal::bounded::<u64>(0);
assert_eq!(s.is_bounded(),true);
assert_eq!(r.is_bounded(),true);
let (s, r) = kanal::unbounded::<u64>();
assert_eq!(s.is_bounded(),false);
assert_eq!(r.is_bounded(),false);
Source

pub fn len(&self) -> usize

Returns length of the queue.

§Examples
let (s, r) = kanal::unbounded::<u64>();
assert_eq!(s.len(),0);
assert_eq!(r.len(),0);
s.send(10);
assert_eq!(s.len(),1);
assert_eq!(r.len(),1);
Source

pub fn is_empty(&self) -> bool

Returns whether the channel queue is empty or not.

§Examples
let (s, r) = kanal::unbounded::<u64>();
assert_eq!(s.is_empty(),true);
assert_eq!(r.is_empty(),true);
Source

pub fn is_full(&self) -> bool

Returns whether the channel queue is full or not full channels will block on send and recv calls it always returns true for zero sized channels.

§Examples
let (s, r) = kanal::bounded(1);
s.send("Hi!").unwrap();
assert_eq!(s.is_full(),true);
assert_eq!(r.is_full(),true);
Source

pub fn capacity(&self) -> usize

Returns capacity of channel (not the queue) for unbounded channels, it will return usize::MAX.

§Examples
let (s, r) = kanal::bounded::<u64>(0);
assert_eq!(s.capacity(),0);
assert_eq!(r.capacity(),0);
let (s, r) = kanal::unbounded::<u64>();
assert_eq!(s.capacity(),usize::MAX);
assert_eq!(r.capacity(),usize::MAX);
Source

pub fn receiver_count(&self) -> u32

Returns count of alive receiver instances of the channel.

§Examples
let (s, r) = kanal::unbounded::<u64>();
let receiver_clone=r.clone();
assert_eq!(r.receiver_count(),2);
Source

pub fn sender_count(&self) -> u32

Returns count of alive sender instances of the channel.

§Examples
let (s, r) = kanal::unbounded::<u64>();
let sender_clone=s.clone();
assert_eq!(r.sender_count(),2);
Source

pub fn close(&self) -> Result<(), CloseError>

Closes the channel completely on both sides and terminates waiting signals.

§Examples
let (s, r) = kanal::unbounded::<u64>();
// closes channel on both sides and has same effect as r.close();
s.close().unwrap();
assert_eq!(r.is_closed(),true);
assert_eq!(s.is_closed(),true);
Source

pub fn is_closed(&self) -> bool

Returns whether the channel is closed on both side of send and receive or not.

§Examples
let (s, r) = kanal::unbounded::<u64>();
// closes channel on both sides and has same effect as r.close();
s.close();
assert_eq!(r.is_closed(),true);
assert_eq!(s.is_closed(),true);

Trait Implementations§

Source§

impl<T> Clone for AsyncReceiver<T>

Available on crate feature async only.
Source§

fn clone(&self) -> AsyncReceiver<T>

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<T> Debug for AsyncReceiver<T>

Available on crate feature async only.
Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
Source§

impl<T> Drop for AsyncReceiver<T>

Available on crate feature async only.
Source§

fn drop(&mut self)

Executes the destructor for this type. Read more

Auto Trait Implementations§

§

impl<T> Freeze for AsyncReceiver<T>

§

impl<T> !RefUnwindSafe for AsyncReceiver<T>

§

impl<T> Send for AsyncReceiver<T>
where T: Send,

§

impl<T> Sync for AsyncReceiver<T>
where T: Send,

§

impl<T> Unpin for AsyncReceiver<T>

§

impl<T> UnsafeUnpin for AsyncReceiver<T>

§

impl<T> !UnwindSafe for AsyncReceiver<T>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more