pub struct AsyncReceiver<T> { /* private fields */ }Expand description
AsyncReceiver is receiving side of the channel in async mode.
Receivers can be cloned and produce receivers to operate in both sync and
async modes.
§Examples
let (_s, receiver) = kanal::bounded_async::<u64>(0);
let sync_receiver=receiver.clone_sync();Implementations§
Source§impl<T> AsyncReceiver<T>
impl<T> AsyncReceiver<T>
Sourcepub fn recv(&self) -> ReceiveFuture<'_, T>
pub fn recv(&self) -> ReceiveFuture<'_, T>
Returns a ReceiveFuture to receive data from the channel
asynchronously.
§Cancellation and Polling Considerations
Due to current limitations in Rust’s handling of future cancellation, if a
ReceiveFuture is dropped exactly at the time when new data is written to the
channel, it may result in the loss of the received value. This behavior although memory-safe stems from
the fact that Rust does not provide a built-in, correct mechanism for cancelling futures.
Additionally, it is important to note that constructs such as tokio::select! are not correct to use
with kanal async channels. Kanal’s design does not rely on the conventional poll mechanism to
read messages. Because of its internal optimizations, the future may complete without receiving the
final poll, which prevents proper handling of the message.
As a result, once the ReceiveFuture is polled for the first time (which registers the request to
receive data), the programmer must commit to completing the polling process. This ensures that
messages are correctly delivered and avoids potential race conditions associated with cancellation.
§Examples
let name=r.recv().await?;
println!("Hello {}",name);Examples found in repository?
88 async fn run(&mut self) -> CoreResult<()> {
89 loop {
90 tokio::select! {
91 Ok(cmd) = self.cmd_rx.recv() => {
92 let _ = self.to_ws.send(Message::text(cmd)).await;
93 self.echo.store(true, Ordering::SeqCst);
94 }
95 Ok(msg) = self.msg_rx.recv() => {
96 if let Message::Text(txt) = &*msg {
97 if self.echo.load(Ordering::SeqCst) {
98 let _ = self.cmd_tx.send(txt.to_string()).await;
99 self.echo.store(false, Ordering::SeqCst);
100 }
101 }
102 }
103 }
104 }
105 }
106
107 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
108 Box::new(move |msg: &Message| msg.is_text())
109 }
110}
111
112#[derive(Clone)]
113pub struct EchoHandle {
114 sender: AsyncSender<String>,
115 receiver: AsyncReceiver<String>,
116}
117
118impl EchoHandle {
119 pub async fn echo(&self, msg: String) -> CoreResult<String> {
120 let _ = self.sender.send(msg).await;
121 println!("In side echo handle, waiting for response...");
122 Ok(self.receiver.recv().await?)
123 }
124}
125
126// --- ApiModule 2: StreamModule ---
127pub struct StreamModule {
128 msg_rx: AsyncReceiver<Arc<Message>>,
129 cmd_rx: AsyncReceiver<bool>,
130 cmd_tx: AsyncSender<String>,
131 send: AtomicBool,
132}
133
134#[async_trait]
135impl ApiModule<()> for StreamModule {
136 type Command = bool;
137 type CommandResponse = String;
138 type Handle = StreamHandle;
139
140 fn new(
141 _state: Arc<()>,
142 cmd_rx: AsyncReceiver<Self::Command>,
143 cmd_ret_tx: AsyncSender<Self::CommandResponse>,
144 msg_rx: AsyncReceiver<Arc<Message>>,
145 _to_ws: AsyncSender<Message>,
146 _: AsyncSender<RunnerCommand>,
147 ) -> Self {
148 Self {
149 msg_rx,
150 cmd_tx: cmd_ret_tx,
151 cmd_rx,
152 send: AtomicBool::new(false),
153 }
154 }
155
156 fn create_handle(
157 sender: AsyncSender<Self::Command>,
158 receiver: AsyncReceiver<Self::CommandResponse>,
159 ) -> Self::Handle {
160 StreamHandle { sender, receiver }
161 }
162
163 async fn run(&mut self) -> CoreResult<()> {
164 loop {
165 tokio::select! {
166 Ok(cmd) = self.cmd_rx.recv() => {
167 // Update the send flag based on the received command
168 self.send.store(cmd, Ordering::SeqCst);
169 }
170 Ok(msg) = self.msg_rx.recv() => {
171 if let Message::Text(txt) = &*msg {
172 if self.send.load(Ordering::SeqCst) {
173 // Process the message if send is true
174 println!("[StreamModule] Received: {txt}");
175 let _ = self.cmd_tx.send(txt.to_string()).await;
176 }
177 }
178 }
179 else => {
180 println!("[Error] StreamModule: Channel closed");
181 },
182 }
183 }
184 }
185
186 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
187 Box::new(move |_msg: &Message| {
188 // Accept all messages
189 true
190 })
191 }
192}
193
194#[derive(Clone)]
195pub struct StreamHandle {
196 receiver: AsyncReceiver<String>,
197 sender: AsyncSender<bool>,
198}
199
200impl StreamHandle {
201 pub async fn stream(self) -> CoreResult<impl Stream<Item = CoreResult<String>>> {
202 self.sender.send(true).await?;
203 println!("StreamHandle: Waiting for messages...");
204 Ok(Box::pin(unfold(self.receiver, |state| async move {
205 let item = state.recv().await.map_err(CoreError::from);
206 Some((item, state))
207 })))
208 }
209}
210
211// --- ApiModule 3: PeriodicSenderModule ---
212pub struct PeriodicSenderModule {
213 cmd_rx: AsyncReceiver<bool>,
214 to_ws: AsyncSender<Message>,
215 running: AtomicBool,
216}
217
218#[async_trait]
219impl ApiModule<()> for PeriodicSenderModule {
220 type Command = bool; // true = start, false = stop
221 type CommandResponse = ();
222 type Handle = PeriodicSenderHandle;
223
224 fn new(
225 _state: Arc<()>,
226 cmd_rx: AsyncReceiver<Self::Command>,
227 _cmd_ret_tx: AsyncSender<Self::CommandResponse>,
228 _msg_rx: AsyncReceiver<Arc<Message>>,
229 to_ws: AsyncSender<Message>,
230 _: AsyncSender<RunnerCommand>,
231 ) -> Self {
232 Self {
233 cmd_rx,
234 to_ws,
235 running: AtomicBool::new(false),
236 }
237 }
238
239 fn create_handle(
240 sender: AsyncSender<Self::Command>,
241 _receiver: AsyncReceiver<Self::CommandResponse>,
242 ) -> Self::Handle {
243 PeriodicSenderHandle { sender }
244 }
245
246 async fn run(&mut self) -> CoreResult<()> {
247 let to_ws = self.to_ws.clone();
248 let mut interval = tokio::time::interval(Duration::from_secs(5));
249 loop {
250 tokio::select! {
251 Ok(cmd) = self.cmd_rx.recv() => {
252 self.running.store(cmd, Ordering::SeqCst);
253 }
254 _ = interval.tick() => {
255 if self.running.load(Ordering::SeqCst) {
256 let _ = to_ws.send(Message::text("Ping from periodic sender")).await;
257 }
258 }
259 }
260 }
261 }More examples
78 async fn run(&mut self) -> CoreResult<()> {
79 loop {
80 tokio::select! {
81 Ok(cmd) = self.cmd_rx.recv() => {
82 let _ = self.to_ws.send(Message::text(cmd)).await;
83 self.echo.store(true, Ordering::SeqCst);
84 }
85 Ok(msg) = self.msg_rx.recv() => {
86 if let Message::Text(txt) = &*msg {
87 if self.echo.load(Ordering::SeqCst) {
88 let _ = self.cmd_tx.send(txt.to_string()).await;
89 self.echo.store(false, Ordering::SeqCst);
90 }
91 }
92 }
93 }
94 }
95 }
96
97 fn rule(_: Arc<()>) -> Box<dyn Rule + Send + Sync> {
98 Box::new(move |msg: &Message| {
99 println!("Routing rule for EchoModule: {msg:?}");
100 msg.is_text()
101 })
102 }
103}
104
105#[derive(Clone)]
106pub struct EchoHandle {
107 sender: AsyncSender<String>,
108 receiver: AsyncReceiver<String>,
109}
110
111impl EchoHandle {
112 pub async fn echo(&self, msg: String) -> CoreResult<String> {
113 let _ = self.sender.send(msg).await;
114 println!("In side echo handle, waiting for response...");
115 Ok(self.receiver.recv().await?)
116 }Sourcepub fn stream(&self) -> ReceiveStream<'_, T>
pub fn stream(&self) -> ReceiveStream<'_, T>
Creates a asynchronous stream for the channel to receive messages,
ReceiveStream borrows the AsyncReceiver, after dropping it,
receiver will be available and usable again.
§Examples
// import to be able to use stream.next() function
use futures::stream::StreamExt;
// import to be able to use stream.is_terminated() function
use futures::stream::FusedStream;
let (s, r) = kanal::unbounded_async();
co(async move {
for i in 0..100 {
s.send(i).await.unwrap();
}
});
let mut stream = r.stream();
assert!(!stream.is_terminated());
for i in 0..100 {
assert_eq!(stream.next().await, Some(i));
}
// Stream will return None after it is terminated, and there is no other sender.
assert_eq!(stream.next().await, None);
assert!(stream.is_terminated());Sourcepub fn try_recv(&self) -> Result<Option<T>, ReceiveError>
pub fn try_recv(&self) -> Result<Option<T>, ReceiveError>
Tries receiving from the channel without waiting on the waitlist.
It returns Ok(Some(T)) in case of successful operation and
Ok(None) for a failed one, or error in case that channel is
closed. Important note: this function is not lock-free as it
acquires a mutex guard of the channel internal for a short time.
§Examples
loop {
if let Some(name)=r.try_recv()?{
println!("Hello {}!",name);
break;
}
}Sourcepub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError>
pub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError>
Tries receiving from the channel without waiting on the waitlist or
waiting for channel internal lock. It returns Ok(Some(T)) in case
of successful operation and Ok(None) for a failed one, or error in
case that channel is closed. Do not use this function unless you
know exactly what you are doing.
§Examples
loop {
if let Some(name)=r.try_recv_realtime()?{
println!("Hello {}!",name);
break;
}
}Sourcepub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError>
pub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError>
Drains all available messages from the channel into the provided vector and returns the number of received messages.
The function is designed to be non-blocking, meaning it only processes messages that are readily available and returns immediately with whatever messages are present. It provides a count of received messages, which could be zero if no messages are available at the time of the call.
When using this function, it’s a good idea to check if the returned count is zero to avoid busy-waiting in a loop.
If blocking behavior is desired when the count is zero, you can use the recv() function if count is zero. For efficiency,
reusing the same vector across multiple calls can help minimize memory allocations. Between uses, you can clear
the vector with vec.clear() to prepare it for the next set of messages.
§Examples
let mut buf = Vec::with_capacity(1000);
loop {
if let Ok(count) = r.drain_into(&mut buf) {
if count == 0 {
// count is 0, to avoid busy-wait using recv for
// the first next message
if let Ok(v) = r.recv() {
buf.push(v);
} else {
break;
}
}
// use buffer
buf.iter().for_each(|v| println!("{}",v));
}else{
println!("Channel closed");
break;
}
buf.clear();
}Sourcepub fn is_disconnected(&self) -> bool
pub fn is_disconnected(&self) -> bool
Returns, whether the send side of the channel, is closed or not.
§Examples
let (s, r) = kanal::unbounded::<u64>();
drop(s); // drop sender and disconnect the send side from the channel
assert_eq!(r.is_disconnected(),true);Sourcepub fn is_terminated(&self) -> bool
pub fn is_terminated(&self) -> bool
Returns, whether the channel receive side is terminated, and will not return any result in future recv calls.
§Examples
let (s, r) = kanal::unbounded::<u64>();
s.send(1).unwrap();
drop(s); // drop sender and disconnect the send side from the channel
assert_eq!(r.is_disconnected(),true);
// Also channel is closed from send side, it's not terminated as there is data in channel queue
assert_eq!(r.is_terminated(),false);
assert_eq!(r.recv().unwrap(),1);
// Now channel receive side is terminated as there is no sender for channel and queue is empty
assert_eq!(r.is_terminated(),true);Sourcepub fn clone_sync(&self) -> Receiver<T>
pub fn clone_sync(&self) -> Receiver<T>
Returns sync cloned version of the receiver.
§Examples
let (s, r) = kanal::unbounded_async();
s.send(1).await?;
let sync_receiver=r.clone_sync();
// JUST FOR EXAMPLE IT IS WRONG TO USE SYNC INSTANCE IN ASYNC CONTEXT
assert_eq!(sync_receiver.recv()?,1);Sourcepub fn to_sync(self) -> Receiver<T>
pub fn to_sync(self) -> Receiver<T>
Converts AsyncReceiver to Receiver and returns it.
§Examples
let (s, r) = kanal::bounded_async(0);
// move to sync environment
std::thread::spawn(move || {
let r=r.to_sync();
let name=r.recv()?;
println!("Hello {}!",name);
anyhow::Ok(())
});
s.send("World").await?;Sourcepub fn as_sync(&self) -> &Receiver<T>
pub fn as_sync(&self) -> &Receiver<T>
Borrows AsyncReceiver as Receiver and returns it
§Examples
let (s, r) = kanal::bounded_async(0);
// move to sync environment
std::thread::spawn(move || {
let name=r.as_sync().recv()?;
println!("Hello {}!",name);
anyhow::Ok(())
});
s.send("World").await?;Sourcepub fn is_bounded(&self) -> bool
pub fn is_bounded(&self) -> bool
Returns whether the channel is bounded or not.
§Examples
let (s, r) = kanal::bounded::<u64>(0);
assert_eq!(s.is_bounded(),true);
assert_eq!(r.is_bounded(),true);let (s, r) = kanal::unbounded::<u64>();
assert_eq!(s.is_bounded(),false);
assert_eq!(r.is_bounded(),false);Sourcepub fn len(&self) -> usize
pub fn len(&self) -> usize
Returns length of the queue.
§Examples
let (s, r) = kanal::unbounded::<u64>();
assert_eq!(s.len(),0);
assert_eq!(r.len(),0);
s.send(10);
assert_eq!(s.len(),1);
assert_eq!(r.len(),1);Sourcepub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
Returns whether the channel queue is empty or not.
§Examples
let (s, r) = kanal::unbounded::<u64>();
assert_eq!(s.is_empty(),true);
assert_eq!(r.is_empty(),true);Sourcepub fn is_full(&self) -> bool
pub fn is_full(&self) -> bool
Returns whether the channel queue is full or not full channels will block on send and recv calls it always returns true for zero sized channels.
§Examples
let (s, r) = kanal::bounded(1);
s.send("Hi!").unwrap();
assert_eq!(s.is_full(),true);
assert_eq!(r.is_full(),true);Sourcepub fn capacity(&self) -> usize
pub fn capacity(&self) -> usize
Returns capacity of channel (not the queue) for unbounded channels, it will return usize::MAX.
§Examples
let (s, r) = kanal::bounded::<u64>(0);
assert_eq!(s.capacity(),0);
assert_eq!(r.capacity(),0);let (s, r) = kanal::unbounded::<u64>();
assert_eq!(s.capacity(),usize::MAX);
assert_eq!(r.capacity(),usize::MAX);Sourcepub fn receiver_count(&self) -> u32
pub fn receiver_count(&self) -> u32
Returns count of alive receiver instances of the channel.
§Examples
let (s, r) = kanal::unbounded::<u64>();
let receiver_clone=r.clone();
assert_eq!(r.receiver_count(),2);Sourcepub fn sender_count(&self) -> u32
pub fn sender_count(&self) -> u32
Returns count of alive sender instances of the channel.
§Examples
let (s, r) = kanal::unbounded::<u64>();
let sender_clone=s.clone();
assert_eq!(r.sender_count(),2);Sourcepub fn close(&self) -> Result<(), CloseError>
pub fn close(&self) -> Result<(), CloseError>
Closes the channel completely on both sides and terminates waiting signals.
§Examples
let (s, r) = kanal::unbounded::<u64>();
// closes channel on both sides and has same effect as r.close();
s.close().unwrap();
assert_eq!(r.is_closed(),true);
assert_eq!(s.is_closed(),true);Sourcepub fn is_closed(&self) -> bool
pub fn is_closed(&self) -> bool
Returns whether the channel is closed on both side of send and receive or not.
§Examples
let (s, r) = kanal::unbounded::<u64>();
// closes channel on both sides and has same effect as r.close();
s.close();
assert_eq!(r.is_closed(),true);
assert_eq!(s.is_closed(),true);Trait Implementations§
Source§impl<T> Clone for AsyncReceiver<T>
Available on crate feature async only.
impl<T> Clone for AsyncReceiver<T>
async only.Source§fn clone(&self) -> AsyncReceiver<T>
fn clone(&self) -> AsyncReceiver<T>
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreSource§impl<T> Debug for AsyncReceiver<T>
Available on crate feature async only.
impl<T> Debug for AsyncReceiver<T>
async only.