kanal_plus/lib.rs
1#![doc = include_str!("../README.md")]
2#![warn(missing_docs, missing_debug_implementations)]
3
4pub(crate) mod backoff;
5pub(crate) mod internal;
6#[cfg(not(feature = "std-mutex"))]
7pub(crate) mod mutex;
8pub(crate) mod pointer;
9
10mod error;
11#[cfg(feature = "async")]
12mod future;
13mod signal;
14
15pub use error::*;
16#[cfg(feature = "async")]
17pub use future::*;
18
19#[cfg(feature = "async")]
20use core::mem::transmute;
21use core::{
22 fmt,
23 mem::{size_of, MaybeUninit},
24 pin::pin,
25 time::Duration,
26};
27use std::{collections::VecDeque, time::Instant};
28
29use branches::unlikely;
30use internal::{acquire_internal, try_acquire_internal, Internal};
31use pointer::KanalPtr;
32use signal::*;
33
34/// Sending side of the channel with sync API. It's possible to convert it to
35/// async [`AsyncSender`] with `as_async`, `to_async` or `clone_async` based on
36/// software requirement.
37#[cfg_attr(
38 feature = "async",
39 doc = r##"
40# Examples
41
42```
43let (sender, _r) = kanal_plus::bounded::<u64>(0);
44let sync_sender=sender.clone_async();
45```
46"##
47)]
48#[repr(C)]
49pub struct Sender<T> {
50 internal: Internal<T>,
51}
52
53/// Sending side of the channel with async API. It's possible to convert it to
54/// sync [`Sender`] with `as_sync`, `to_sync` or `clone_sync` based on software
55/// requirement.
56///
57/// # Examples
58///
59/// ```
60/// let (sender, _r) = kanal_plus::bounded_async::<u64>(0);
61/// let sync_sender=sender.clone_sync();
62/// ```
63#[cfg(feature = "async")]
64#[repr(C)]
65pub struct AsyncSender<T> {
66 internal: Internal<T>,
67}
68
69impl<T> Drop for Sender<T> {
70 fn drop(&mut self) {
71 self.internal.drop_send();
72 }
73}
74
75#[cfg(feature = "async")]
76impl<T> Drop for AsyncSender<T> {
77 fn drop(&mut self) {
78 self.internal.drop_send();
79 }
80}
81
82impl<T> Clone for Sender<T> {
83 fn clone(&self) -> Self {
84 Self {
85 internal: self.internal.clone_send(),
86 }
87 }
88}
89
90impl<T> fmt::Debug for Sender<T> {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 write!(f, "Sender {{ .. }}")
93 }
94}
95
96#[cfg(feature = "async")]
97impl<T> Clone for AsyncSender<T> {
98 fn clone(&self) -> Self {
99 Self {
100 internal: self.internal.clone_send(),
101 }
102 }
103}
104
105#[cfg(feature = "async")]
106impl<T> fmt::Debug for AsyncSender<T> {
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108 write!(f, "AsyncSender {{ .. }}")
109 }
110}
111
112macro_rules! check_recv_closed_timeout {
113 ($internal:ident,$data:ident) => {
114 if unlikely($internal.recv_count == 0) {
115 // Avoid wasting lock time on dropping failed send object
116 drop($internal);
117 return Err(SendTimeoutError::Closed($data));
118 }
119 };
120}
121
122macro_rules! shared_impl {
123 () => {
124 /// Returns whether the channel is bounded or not.
125 ///
126 /// # Examples
127 ///
128 /// ```
129 /// let (s, r) = kanal_plus::bounded::<u64>(0);
130 /// assert_eq!(s.is_bounded(),true);
131 /// assert_eq!(r.is_bounded(),true);
132 /// ```
133 /// ```
134 /// let (s, r) = kanal_plus::unbounded::<u64>();
135 /// assert_eq!(s.is_bounded(),false);
136 /// assert_eq!(r.is_bounded(),false);
137 /// ```
138 pub fn is_bounded(&self) -> bool {
139 self.internal.capacity() != usize::MAX
140 }
141 /// Returns length of the queue.
142 ///
143 /// # Examples
144 ///
145 /// ```
146 /// let (s, r) = kanal_plus::unbounded::<u64>();
147 /// assert_eq!(s.len(),0);
148 /// assert_eq!(r.len(),0);
149 /// s.send(10);
150 /// assert_eq!(s.len(),1);
151 /// assert_eq!(r.len(),1);
152 /// ```
153 pub fn len(&self) -> usize {
154 acquire_internal(&self.internal).queue.len()
155 }
156 /// Returns whether the channel queue is empty or not.
157 ///
158 /// # Examples
159 ///
160 /// ```
161 /// let (s, r) = kanal_plus::unbounded::<u64>();
162 /// assert_eq!(s.is_empty(),true);
163 /// assert_eq!(r.is_empty(),true);
164 /// ```
165 pub fn is_empty(&self) -> bool {
166 acquire_internal(&self.internal).queue.is_empty()
167 }
168 /// Returns whether the channel queue is full or not
169 /// full channels will block on send and recv calls
170 /// it always returns true for zero sized channels.
171 ///
172 /// # Examples
173 ///
174 /// ```
175 /// let (s, r) = kanal_plus::bounded(1);
176 /// s.send("Hi!").unwrap();
177 /// assert_eq!(s.is_full(),true);
178 /// assert_eq!(r.is_full(),true);
179 /// ```
180 pub fn is_full(&self) -> bool {
181 self.internal.capacity() == acquire_internal(&self.internal).queue.len()
182 }
183 /// Returns capacity of channel (not the queue)
184 /// for unbounded channels, it will return usize::MAX.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// let (s, r) = kanal_plus::bounded::<u64>(0);
190 /// assert_eq!(s.capacity(),0);
191 /// assert_eq!(r.capacity(),0);
192 /// ```
193 /// ```
194 /// let (s, r) = kanal_plus::unbounded::<u64>();
195 /// assert_eq!(s.capacity(),usize::MAX);
196 /// assert_eq!(r.capacity(),usize::MAX);
197 /// ```
198 pub fn capacity(&self) -> usize {
199 self.internal.capacity()
200 }
201 /// Returns count of alive receiver instances of the channel.
202 ///
203 /// # Examples
204 ///
205 /// ```
206 /// let (s, r) = kanal_plus::unbounded::<u64>();
207 /// let receiver_clone=r.clone();
208 /// assert_eq!(r.receiver_count(),2);
209 /// ```
210 pub fn receiver_count(&self) -> usize {
211 acquire_internal(&self.internal).recv_count as usize
212 }
213 /// Returns count of alive sender instances of the channel.
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// let (s, r) = kanal_plus::unbounded::<u64>();
219 /// let sender_clone=s.clone();
220 /// assert_eq!(r.sender_count(),2);
221 /// ```
222 pub fn sender_count(&self) -> usize {
223 acquire_internal(&self.internal).send_count as usize
224 }
225 /// Closes the channel completely on both sides and terminates waiting
226 /// signals.
227 ///
228 /// # Examples
229 ///
230 /// ```
231 /// let (s, r) = kanal_plus::unbounded::<u64>();
232 /// // closes channel on both sides and has same effect as r.close();
233 /// s.close().unwrap();
234 /// assert_eq!(r.is_closed(),true);
235 /// assert_eq!(s.is_closed(),true);
236 /// ```
237 pub fn close(&self) -> Result<(), CloseError> {
238 let mut internal = acquire_internal(&self.internal);
239 if unlikely(internal.recv_count == 0 && internal.send_count == 0) {
240 return Err(CloseError());
241 }
242 internal.recv_count = 0;
243 internal.send_count = 0;
244 internal.terminate_signals();
245 internal.queue.clear();
246 Ok(())
247 }
248 /// Returns whether the channel is closed on both side of send and
249 /// receive or not.
250 ///
251 /// # Examples
252 ///
253 /// ```
254 /// let (s, r) = kanal_plus::unbounded::<u64>();
255 /// // closes channel on both sides and has same effect as r.close();
256 /// s.close();
257 /// assert_eq!(r.is_closed(),true);
258 /// assert_eq!(s.is_closed(),true);
259 /// ```
260 pub fn is_closed(&self) -> bool {
261 let internal = acquire_internal(&self.internal);
262 internal.send_count == 0 && internal.recv_count == 0
263 }
264 };
265}
266
267macro_rules! shared_send_impl {
268 () => {
269 /// Tries sending to the channel without waiting on the waitlist, if
270 /// send fails then the object will be dropped. It returns `Ok(true)` in
271 /// case of a successful operation and `Ok(false)` for a failed one, or
272 /// error in case that channel is closed. Important note: this function
273 /// is not lock-free as it acquires a mutex guard of the channel
274 /// internal for a short time.
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// # use std::thread::spawn;
280 /// let (s, r) = kanal_plus::bounded(0);
281 /// let t=spawn( move || {
282 /// loop{
283 /// if s.try_send(1).is_ok() {
284 /// break;
285 /// }
286 /// }
287 /// });
288 /// assert_eq!(r.recv()?,1);
289 /// # t.join();
290 /// # anyhow::Ok(())
291 /// ```
292 #[inline(always)]
293 pub fn try_send(&self, data: T) -> Result<(), SendTimeoutError<T>> {
294 let cap = self.internal.capacity();
295 let mut internal = acquire_internal(&self.internal);
296 check_recv_closed_timeout!(internal, data);
297 if let Some(first) = internal.next_recv() {
298 drop(internal);
299 // SAFETY: it's safe to send to owned signal once
300 unsafe { first.send(data) }
301 return Ok(());
302 }
303 if cap > 0 && internal.queue.len() < cap {
304 internal.queue.push_back(data);
305 return Ok(());
306 }
307 Err(SendTimeoutError::Timeout(data))
308 }
309
310 /// Tries sending to the channel without waiting on the waitlist or for
311 /// the internal mutex, if send fails then the object will be dropped.
312 /// It returns `Ok(true)` in case of a successful operation and
313 /// `Ok(false)` for a failed one, or error in case that channel is
314 /// closed. Do not use this function unless you know exactly what you
315 /// are doing.
316 ///
317 /// # Examples
318 ///
319 /// ```
320 /// # use std::thread::spawn;
321 /// let (s, r) = kanal_plus::bounded(0);
322 /// let t=spawn( move || {
323 /// loop{
324 /// if s.try_send_realtime(1).is_ok() {
325 /// break;
326 /// }
327 /// }
328 /// });
329 /// assert_eq!(r.recv()?,1);
330 /// # t.join();
331 /// # anyhow::Ok(())
332 /// ```
333 #[inline(always)]
334 pub fn try_send_realtime(&self, data: T) -> Result<(), SendTimeoutError<T>> {
335 let cap = self.internal.capacity();
336 if let Some(mut internal) = try_acquire_internal(&self.internal) {
337 check_recv_closed_timeout!(internal, data);
338 if let Some(first) = internal.next_recv() {
339 drop(internal);
340 // SAFETY: it's safe to send to owned signal once
341 unsafe { first.send(data) }
342 return Ok(());
343 }
344 if cap > 0 && internal.queue.len() < cap {
345 internal.queue.push_back(data);
346 return Ok(());
347 }
348 }
349 Err(SendTimeoutError::Timeout(data))
350 }
351
352 /// Returns whether the receive side of the channel is closed or not.
353 ///
354 /// # Examples
355 ///
356 /// ```
357 /// let (s, r) = kanal_plus::unbounded::<u64>();
358 /// drop(r); // drop receiver and disconnect the receive side from the channel
359 /// assert_eq!(s.is_disconnected(),true);
360 /// # anyhow::Ok(())
361 /// ```
362 pub fn is_disconnected(&self) -> bool {
363 acquire_internal(&self.internal).recv_count == 0
364 }
365 };
366}
367
368macro_rules! shared_recv_impl {
369 () => {
370 /// Tries receiving from the channel without waiting on the waitlist.
371 /// It returns `Ok(Some(T))` in case of successful operation and
372 /// `Ok(None)` for a failed one, or error in case that channel is
373 /// closed. Important note: this function is not lock-free as it
374 /// acquires a mutex guard of the channel internal for a short time.
375 ///
376 /// # Examples
377 ///
378 /// ```
379 /// # use std::thread::spawn;
380 /// # let (s, r) = kanal_plus::bounded(0);
381 /// # let t=spawn(move || {
382 /// # s.send("Buddy")?;
383 /// # anyhow::Ok(())
384 /// # });
385 /// loop {
386 /// if let Some(name)=r.try_recv()?{
387 /// println!("Hello {}!",name);
388 /// break;
389 /// }
390 /// }
391 /// # t.join();
392 /// # anyhow::Ok(())
393 /// ```
394 #[inline(always)]
395 pub fn try_recv(&self) -> Result<Option<T>, ReceiveError> {
396 let cap = self.internal.capacity();
397 let mut internal = acquire_internal(&self.internal);
398 if unlikely(internal.recv_count == 0) {
399 return Err(ReceiveError());
400 }
401 if cap > 0 {
402 if let Some(v) = internal.queue.pop_front() {
403 if let Some(p) = internal.next_send() {
404 // if there is a sender take its data and push it into the
405 // queue Safety: it's safe to receive from owned
406 // signal once
407 unsafe { internal.queue.push_back(p.recv()) }
408 }
409 return Ok(Some(v));
410 }
411 }
412 if let Some(p) = internal.next_send() {
413 // SAFETY: it's safe to receive from owned signal once
414 drop(internal);
415 return unsafe { Ok(Some(p.recv())) };
416 }
417 if unlikely(internal.send_count == 0) {
418 return Err(ReceiveError());
419 }
420 Ok(None)
421 // if the queue is not empty send the data
422 }
423 /// Tries receiving from the channel without waiting on the waitlist or
424 /// waiting for channel internal lock. It returns `Ok(Some(T))` in case
425 /// of successful operation and `Ok(None)` for a failed one, or error in
426 /// case that channel is closed. Do not use this function unless you
427 /// know exactly what you are doing.
428 ///
429 /// # Examples
430 ///
431 /// ```
432 /// # use std::thread::spawn;
433 /// # let (s, r) = kanal_plus::bounded(0);
434 /// # let t=spawn(move || {
435 /// # s.send("Buddy")?;
436 /// # anyhow::Ok(())
437 /// # });
438 /// loop {
439 /// if let Some(name)=r.try_recv_realtime()?{
440 /// println!("Hello {}!",name);
441 /// break;
442 /// }
443 /// }
444 /// # t.join();
445 /// # anyhow::Ok(())
446 /// ```
447 #[inline(always)]
448 pub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError> {
449 let cap = self.internal.capacity();
450 if let Some(mut internal) = try_acquire_internal(&self.internal) {
451 if unlikely(internal.recv_count == 0) {
452 return Err(ReceiveError());
453 }
454 if cap > 0 {
455 if let Some(v) = internal.queue.pop_front() {
456 if let Some(p) = internal.next_send() {
457 // if there is a sender take its data and push it into
458 // the queue Safety: it's safe to
459 // receive from owned signal once
460 unsafe { internal.queue.push_back(p.recv()) }
461 }
462 return Ok(Some(v));
463 }
464 }
465 if let Some(p) = internal.next_send() {
466 // SAFETY: it's safe to receive from owned signal once
467 drop(internal);
468 return unsafe { Ok(Some(p.recv())) };
469 }
470 if unlikely(internal.send_count == 0) {
471 return Err(ReceiveError());
472 }
473 }
474 Ok(None)
475 }
476
477 /// Drains all available messages from the channel into the provided vector and
478 /// returns the number of received messages.
479 ///
480 /// The function is designed to be non-blocking, meaning it only processes
481 /// messages that are readily available and returns immediately with whatever
482 /// messages are present. It provides a count of received messages, which could
483 /// be zero if no messages are available at the time of the call.
484 ///
485 /// When using this function, it’s a good idea to check if the returned count is
486 /// zero to avoid busy-waiting in a loop. If blocking behavior is desired when
487 /// the count is zero, you can use the `recv()` function if count is zero. For
488 /// efficiency, reusing the same vector across multiple calls can help minimize
489 /// memory allocations. Between uses, you can clear the vector with
490 /// `vec.clear()` to prepare it for the next set of messages.
491 ///
492 /// # Examples
493 ///
494 /// ```
495 /// # use std::thread::spawn;
496 /// # let (s, r) = kanal_plus::bounded(1000);
497 /// # let t=spawn(move || {
498 /// # for i in 0..1000 {
499 /// # s.send(i)?;
500 /// # }
501 /// # anyhow::Ok(())
502 /// # });
503 ///
504 /// let mut buf = Vec::with_capacity(1000);
505 /// loop {
506 /// if let Ok(count) = r.drain_into(&mut buf) {
507 /// if count == 0 {
508 /// // count is 0, to avoid busy-wait using recv for
509 /// // the first next message
510 /// if let Ok(v) = r.recv() {
511 /// buf.push(v);
512 /// } else {
513 /// break;
514 /// }
515 /// }
516 /// // use buffer
517 /// buf.iter().for_each(|v| println!("{}",v));
518 /// }else{
519 /// println!("Channel closed");
520 /// break;
521 /// }
522 /// buf.clear();
523 /// }
524 /// # t.join();
525 /// # anyhow::Ok(())
526 /// ```
527 pub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError> {
528 let vec_initial_length = vec.len();
529 let remaining_cap = vec.capacity() - vec_initial_length;
530 let mut internal = acquire_internal(&self.internal);
531 if unlikely(internal.recv_count == 0) {
532 return Err(ReceiveError());
533 }
534 let required_cap = internal.queue.len() + {
535 if internal.recv_blocking {
536 0
537 } else {
538 internal.wait_list.len()
539 }
540 };
541 if required_cap > remaining_cap {
542 vec.reserve(vec_initial_length + required_cap - remaining_cap);
543 }
544 while let Some(v) = internal.queue.pop_front() {
545 vec.push(v);
546 }
547 while let Some(p) = internal.next_send() {
548 // SAFETY: it's safe to receive from owned signal once
549 unsafe { vec.push(p.recv()) }
550 }
551 Ok(required_cap)
552 }
553
554 /// Returns, whether the send side of the channel, is closed or not.
555 ///
556 /// # Examples
557 ///
558 /// ```
559 /// let (s, r) = kanal_plus::unbounded::<u64>();
560 /// drop(s); // drop sender and disconnect the send side from the channel
561 /// assert_eq!(r.is_disconnected(),true);
562 /// ```
563 pub fn is_disconnected(&self) -> bool {
564 acquire_internal(&self.internal).send_count == 0
565 }
566
567 /// Returns, whether the channel receive side is terminated, and will
568 /// not return any result in future recv calls.
569 ///
570 /// # Examples
571 ///
572 /// ```
573 /// let (s, r) = kanal_plus::unbounded::<u64>();
574 /// s.send(1).unwrap();
575 /// drop(s); // drop sender and disconnect the send side from the channel
576 /// assert_eq!(r.is_disconnected(),true);
577 /// // Also channel is closed from send side, it's not terminated as there is data in channel queue
578 /// assert_eq!(r.is_terminated(),false);
579 /// assert_eq!(r.recv().unwrap(),1);
580 /// // Now channel receive side is terminated as there is no sender for channel and queue is empty
581 /// assert_eq!(r.is_terminated(),true);
582 /// ```
583 pub fn is_terminated(&self) -> bool {
584 let internal = acquire_internal(&self.internal);
585 internal.send_count == 0 && internal.queue.len() == 0
586 }
587 };
588}
589
590impl<T> Sender<T> {
591 /// Sends data to the channel.
592 ///
593 /// # Examples
594 ///
595 /// ```
596 /// # use std::thread::spawn;
597 /// # let (s, r) = kanal_plus::bounded(0);
598 /// # spawn(move || {
599 /// s.send("Hello").unwrap();
600 /// # anyhow::Ok(())
601 /// # });
602 /// # let name=r.recv()?;
603 /// # println!("Hello {}!",name);
604 /// # anyhow::Ok(())
605 /// ```
606 #[inline(always)]
607 pub fn send(&self, data: T) -> Result<(), SendError<T>> {
608 let cap = self.internal.capacity();
609 let mut internal = acquire_internal(&self.internal);
610 if unlikely(internal.recv_count == 0) {
611 drop(internal);
612 return Err(SendError(data));
613 }
614 if let Some(first) = internal.next_recv() {
615 drop(internal);
616 // SAFETY: it's safe to send to owned signal once
617 unsafe { first.send(data) }
618 return Ok(());
619 }
620 if cap > 0 && internal.queue.len() < cap {
621 // SAFETY: MaybeUninit is acting like a ManuallyDrop
622 internal.queue.push_back(data);
623 return Ok(());
624 }
625 let mut data = MaybeUninit::new(data);
626 // send directly to the waitlist
627 let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
628 internal.push_signal(sig.dynamic_ptr());
629 drop(internal);
630 if unlikely(!sig.wait()) {
631 // SAFETY: data failed to move, sender should drop it if it
632 // needs to
633
634 return Err(SendError(unsafe { data.assume_init() }));
635 }
636 Ok(())
637
638 // if the queue is not empty send the data
639 }
640
641 /// Sends multiple elements from a `VecDeque` into the channel.
642 ///
643 /// This method attempts to push as many items from `elements` as possible,
644 /// respecting the channel’s capacity and the current state of the receiver
645 /// side. It behaves similarly to repeatedly calling `send` for each
646 /// element, but is more efficient because it holds the internal lock
647 /// only while it can make progress.
648 ///
649 /// * If the channel is closed (no receivers), the first element that cannot
650 /// be sent is returned inside `SendError`.
651 /// * If the channel’s queue becomes full, mutex guard will be released and
652 /// remaining elements stay in the supplied `VecDeque` to be send in a
653 /// signal.
654 /// * Elements are taken from the front of the deque (FIFO order). When the
655 /// internal queue has spare capacity, elements are moved from the back of
656 /// the deque into the internal queue to fill it as quickly as possible.
657 ///
658 /// # Examples
659 ///
660 /// ```rust
661 /// use std::collections::VecDeque;
662 /// // Create a bounded channel with capacity 3
663 /// let (s, r) = kanal_plus::bounded::<i32>(3);
664 ///
665 /// // Move the sender and the buffer into a new thread that will
666 /// // push as many items as the channel can accept.
667 /// let handle = std::thread::spawn(move || {
668 /// /// // Prepare a deque with several values
669 /// let mut buf = VecDeque::from(vec![1, 2, 3, 4, 5]);
670 /// // `send_many` consumes items from the front of the deque.
671 /// // It returns `Ok(())` when all possible items have been sent
672 /// // or `Err` if the channel is closed. Here we unwrap the result
673 /// // because the channel stays alive for the whole test.
674 /// s.send_many(&mut buf).unwrap();
675 ///
676 /// // Return the (now‑partially‑filled) buffer so the main thread can
677 /// // inspect the remaining elements.
678 /// buf
679 /// });
680 ///
681 /// // In the current thread we receive the three items that fit into the
682 /// // channel's capacity.
683 /// assert_eq!(r.recv().unwrap(), 1);
684 /// assert_eq!(r.recv().unwrap(), 2);
685 /// assert_eq!(r.recv().unwrap(), 3);
686 ///
687 /// std::thread::sleep(std::time::Duration::from_millis(100));
688 ///
689 /// // Sender now written two more items into the channel queue and exited.
690 /// let remaining = handle.join().expect("sender thread panicked");
691 ///
692 /// assert_eq!(r.len(), 2);
693 /// assert_eq!(r.recv().unwrap(), 4);
694 /// assert_eq!(r.recv().unwrap(), 5);
695 /// ```
696 ///
697 /// The function returns `Ok(())` when all elements have been successfully
698 /// transferred, or `Err(SendError<T>)` containing the first element that
699 /// could not be sent (typically because the receiver side has been
700 /// closed).
701 pub fn send_many(&self, elements: &mut VecDeque<T>) -> Result<(), SendError<T>> {
702 if unlikely(elements.is_empty()) {
703 return Ok(());
704 }
705 let cap = self.internal.capacity();
706 loop {
707 let mut internal = acquire_internal(&self.internal);
708 if unlikely(internal.recv_count == 0) {
709 drop(internal);
710 return Err(SendError(elements.pop_front().unwrap()));
711 }
712 while let Some(first) = internal.next_recv() {
713 // SAFETY: it's safe to send to owned signal once
714 unsafe {
715 first.send(elements.pop_front().unwrap());
716 }
717 if unlikely(elements.is_empty()) {
718 return Ok(());
719 }
720 }
721 if cap > 0 {
722 while internal.queue.len() < cap {
723 if let Some(v) = elements.pop_front() {
724 internal.queue.push_back(v);
725 } else {
726 return Ok(());
727 }
728 }
729 if unlikely(elements.is_empty()) {
730 return Ok(());
731 }
732 }
733 let mut data = MaybeUninit::new(elements.pop_front().unwrap());
734 // send directly to the waitlist
735 let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
736 internal.recv_blocking = false;
737 internal.push_signal(sig.dynamic_ptr());
738 drop(internal);
739 if unlikely(!sig.wait()) {
740 // SAFETY: data failed to move, sender should drop it if it
741 // needs to
742 return Err(SendError(unsafe { data.assume_init() }));
743 }
744 if unlikely(elements.is_empty()) {
745 return Ok(());
746 }
747 }
748 }
749
750 /// Sends data to the channel with a deadline, if send fails then the object
751 /// will be dropped. you can use send_option_timeout if you like to keep
752 /// the object in case of timeout.
753 ///
754 /// # Examples
755 ///
756 /// ```
757 /// # use std::thread::spawn;
758 /// # use std::time::Duration;
759 /// # let (s, r) = kanal_plus::bounded(0);
760 /// # spawn(move || {
761 /// s.send_timeout("Hello",Duration::from_millis(500)).unwrap();
762 /// # anyhow::Ok(())
763 /// # });
764 /// # let name=r.recv()?;
765 /// # println!("Hello {}!",name);
766 /// # anyhow::Ok(())
767 /// ```
768 #[inline(always)]
769 pub fn send_timeout(&self, data: T, duration: Duration) -> Result<(), SendTimeoutError<T>> {
770 let cap = self.internal.capacity();
771 let deadline = Instant::now().checked_add(duration).unwrap();
772 let mut internal = acquire_internal(&self.internal);
773 if unlikely(internal.recv_count == 0) {
774 // Avoid wasting lock time on dropping failed send object
775 drop(internal);
776 return Err(SendTimeoutError::Closed(data));
777 }
778 if let Some(first) = internal.next_recv() {
779 drop(internal);
780 // SAFETY: it's safe to send to owned signal once
781 unsafe { first.send(data) }
782 return Ok(());
783 }
784 if cap > 0 && internal.queue.len() < cap {
785 // SAFETY: MaybeUninit is used as a ManuallyDrop, and data in it is
786 // valid.
787 internal.queue.push_back(data);
788 return Ok(());
789 }
790 let mut data = MaybeUninit::new(data);
791 // send directly to the waitlist
792 let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
793 internal.push_signal(sig.dynamic_ptr());
794 drop(internal);
795 if unlikely(!sig.wait_timeout(deadline)) {
796 if sig.is_terminated() {
797 // SAFETY: data failed to move, sender should drop it if it
798 // needs to
799 return Err(SendTimeoutError::Closed(unsafe { data.assume_init() }));
800 }
801 {
802 let mut internal = acquire_internal(&self.internal);
803 if internal.cancel_send_signal(sig.as_tagged_ptr()) {
804 // SAFETY: data failed to move, we return it to the user
805 return Err(SendTimeoutError::Timeout(unsafe { data.assume_init() }));
806 }
807 }
808 // removing receive failed to wait for the signal response
809 if unlikely(!sig.wait()) {
810 // SAFETY: data failed to move, we return it to the user
811
812 return Err(SendTimeoutError::Closed(unsafe { data.assume_init() }));
813 }
814 }
815 Ok(())
816
817 // if the queue is not empty send the data
818 }
819
820 shared_send_impl!();
821 /// Clones [`Sender`] as the async version of it and returns it
822 #[cfg(feature = "async")]
823 pub fn clone_async(&self) -> AsyncSender<T> {
824 AsyncSender::<T> {
825 internal: self.internal.clone_send(),
826 }
827 }
828
829 /// Converts [`Sender`] to [`AsyncSender`] and returns it
830 /// # Examples
831 ///
832 /// ```
833 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
834 /// # use tokio::{spawn as co};
835 /// # use std::time::Duration;
836 /// let (s, r) = kanal_plus::bounded(0);
837 /// co(async move {
838 /// let s=s.to_async();
839 /// s.send("World").await;
840 /// });
841 /// let name=r.recv()?;
842 /// println!("Hello {}!",name);
843 /// # anyhow::Ok(())
844 /// # });
845 /// ```
846 #[cfg(feature = "async")]
847 pub fn to_async(self) -> AsyncSender<T> {
848 // SAFETY: structure of Sender<T> and AsyncSender<T> is same
849 unsafe { transmute(self) }
850 }
851
852 /// Borrows [`Sender`] as [`AsyncSender`] and returns it
853 /// # Examples
854 ///
855 /// ```
856 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
857 /// # use tokio::{spawn as co};
858 /// # use std::time::Duration;
859 /// let (s, r) = kanal_plus::bounded(0);
860 /// co(async move {
861 /// s.as_async().send("World").await;
862 /// });
863 /// let name=r.recv()?;
864 /// println!("Hello {}!",name);
865 /// # anyhow::Ok(())
866 /// # });
867 /// ```
868 #[cfg(feature = "async")]
869 pub fn as_async(&self) -> &AsyncSender<T> {
870 // SAFETY: structure of Sender<T> and AsyncSender<T> is same
871 unsafe { transmute(self) }
872 }
873 shared_impl!();
874}
875
876#[cfg(feature = "async")]
877impl<T> AsyncSender<T> {
878 /// Sends data asynchronously to the channel.
879 ///
880 /// # Examples
881 ///
882 /// ```
883 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
884 /// # let (s, r) = kanal_plus::unbounded_async();
885 /// s.send(1).await?;
886 /// assert_eq!(r.recv().await?,1);
887 /// # anyhow::Ok(())
888 /// # });
889 /// ```
890 #[inline(always)]
891 pub fn send(&'_ self, data: T) -> SendFuture<'_, T> {
892 SendFuture::new(&self.internal, data)
893 }
894
895 /// Sends multiple elements from a `VecDeque` into the channel
896 /// asynchronously.
897 ///
898 /// This method consumes the provided `VecDeque` by repeatedly popping
899 /// elements from its front and sending each one over the channel. The
900 /// operation completes when the deque is empty or when the channel is
901 /// closed.
902 ///
903 /// # Examples
904 ///
905 /// ```
906 /// # use tokio::{spawn as co};
907 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
908 /// # use std::collections::VecDeque;
909 /// let (s, r) = kanal_plus::bounded_async(3);
910 /// let handle = co(async move {
911 /// let mut elems = VecDeque::from(vec![10, 20, 30, 40, 50]);
912 /// // Send all elements in the deque
913 /// s.send_many(&mut elems).await.unwrap();
914 /// });
915 ///
916 /// // Receive the values in the same order they were sent
917 /// assert_eq!(r.recv().await?, 10);
918 /// assert_eq!(r.recv().await?, 20);
919 /// assert_eq!(r.recv().await?, 30);
920 ///
921 /// //panic!("here");
922 ///
923 /// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
924 /// // Now the sender has sent the remaining elements
925 /// //handle.await.unwrap();
926 ///
927 /// assert_eq!(r.recv().await?, 40);
928 /// assert_eq!(r.recv().await?, 50);
929 ///
930 /// # anyhow::Ok(())
931 /// # });
932 /// ```
933 #[inline(always)]
934 pub fn send_many<'a, 'b>(&'a self, elements: &'b mut VecDeque<T>) -> SendManyFuture<'a, 'b, T> {
935 SendManyFuture::new(&self.internal, elements)
936 }
937
938 shared_send_impl!();
939
940 /// Clones [`AsyncSender`] as [`Sender`] with sync api of it.
941 ///
942 /// # Examples
943 ///
944 /// ```
945 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
946 /// let (s, r) = kanal_plus::unbounded_async();
947 /// let sync_sender=s.clone_sync();
948 /// // JUST FOR EXAMPLE IT IS WRONG TO USE SYNC INSTANCE IN ASYNC CONTEXT
949 /// sync_sender.send(1)?;
950 /// assert_eq!(r.recv().await?,1);
951 /// # anyhow::Ok(())
952 /// # });
953 /// ```
954 pub fn clone_sync(&self) -> Sender<T> {
955 Sender::<T> {
956 internal: self.internal.clone_send(),
957 }
958 }
959
960 /// Converts [`AsyncSender`] to [`Sender`] and returns it.
961 ///
962 /// # Examples
963 ///
964 /// ```
965 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
966 /// # use std::time::Duration;
967 /// let (s, r) = kanal_plus::bounded_async(0);
968 /// // move to sync environment
969 /// std::thread::spawn(move || {
970 /// let s=s.to_sync();
971 /// s.send("World")?;
972 /// anyhow::Ok(())
973 /// });
974 /// let name=r.recv().await?;
975 /// println!("Hello {}!",name);
976 /// # anyhow::Ok(())
977 /// # });
978 /// ```
979 pub fn to_sync(self) -> Sender<T> {
980 // SAFETY: structure of Sender<T> and AsyncSender<T> is same
981 unsafe { transmute(self) }
982 }
983
984 /// Borrows [`AsyncSender`] as [`Sender`] and returns it.
985 ///
986 /// # Examples
987 ///
988 /// ```
989 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
990 /// # use std::time::Duration;
991 /// let (s, r) = kanal_plus::bounded_async(0);
992 /// // move to sync environment
993 /// std::thread::spawn(move || {
994 /// s.as_sync().send("World")?;
995 /// anyhow::Ok(())
996 /// });
997 /// let name=r.recv().await?;
998 /// println!("Hello {}!",name);
999 /// # anyhow::Ok(())
1000 /// # });
1001 /// ```
1002 pub fn as_sync(&self) -> &Sender<T> {
1003 // SAFETY: structure of Sender<T> and AsyncSender<T> is same
1004 unsafe { transmute(self) }
1005 }
1006
1007 shared_impl!();
1008}
1009
1010/// Receiving side of the channel in sync mode.
1011/// Receivers can be cloned and produce receivers to operate in both sync and
1012/// async modes.
1013#[cfg_attr(
1014 feature = "async",
1015 doc = r##"
1016# Examples
1017
1018```
1019let (_s, receiver) = kanal_plus::bounded::<u64>(0);
1020let async_receiver=receiver.clone_async();
1021```
1022"##
1023)]
1024#[repr(C)]
1025pub struct Receiver<T> {
1026 internal: Internal<T>,
1027}
1028
1029impl<T> fmt::Debug for Receiver<T> {
1030 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1031 write!(f, "Receiver {{ .. }}")
1032 }
1033}
1034
1035/// [`AsyncReceiver`] is receiving side of the channel in async mode.
1036/// Receivers can be cloned and produce receivers to operate in both sync and
1037/// async modes.
1038///
1039/// # Examples
1040///
1041/// ```
1042/// let (_s, receiver) = kanal_plus::bounded_async::<u64>(0);
1043/// let sync_receiver=receiver.clone_sync();
1044/// ```
1045#[cfg(feature = "async")]
1046#[repr(C)]
1047pub struct AsyncReceiver<T> {
1048 internal: Internal<T>,
1049}
1050
1051#[cfg(feature = "async")]
1052impl<T> fmt::Debug for AsyncReceiver<T> {
1053 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1054 write!(f, "AsyncReceiver {{ .. }}")
1055 }
1056}
1057
1058impl<T> Receiver<T> {
1059 /// Receives data from the channel
1060 #[inline(always)]
1061 pub fn recv(&self) -> Result<T, ReceiveError> {
1062 let cap = self.internal.capacity();
1063 let mut internal = acquire_internal(&self.internal);
1064 if unlikely(internal.recv_count == 0) {
1065 return Err(ReceiveError());
1066 }
1067 if cap > 0 {
1068 if let Some(v) = internal.queue.pop_front() {
1069 if let Some(p) = internal.next_send() {
1070 // if there is a sender take its data and push it into the queue
1071 // SAFETY: it's safe to receive from owned signal once
1072 unsafe { internal.queue.push_back(p.recv()) }
1073 }
1074 return Ok(v);
1075 }
1076 }
1077 if let Some(p) = internal.next_send() {
1078 drop(internal);
1079 // SAFETY: it's safe to receive from owned signal once
1080 return unsafe { Ok(p.recv()) };
1081 }
1082 if unlikely(internal.send_count == 0) {
1083 return Err(ReceiveError());
1084 }
1085 // no active waiter so push to the queue
1086 let mut ret = MaybeUninit::<T>::uninit();
1087 let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
1088 ret.as_mut_ptr()
1089 )));
1090 internal.push_signal(sig.dynamic_ptr());
1091 drop(internal);
1092
1093 if unlikely(!sig.wait()) {
1094 return Err(ReceiveError());
1095 }
1096
1097 // SAFETY: it's safe to assume init as data is forgotten on another
1098 // side
1099 if size_of::<T>() > size_of::<*mut T>() {
1100 Ok(unsafe { ret.assume_init() })
1101 } else {
1102 Ok(unsafe { sig.assume_init() })
1103 }
1104
1105 // if the queue is not empty send the data
1106 }
1107 /// Tries receiving from the channel within a duration
1108 #[inline(always)]
1109 pub fn recv_timeout(&self, duration: Duration) -> Result<T, ReceiveErrorTimeout> {
1110 let cap = self.internal.capacity();
1111 let deadline = Instant::now().checked_add(duration).unwrap();
1112 let mut internal = acquire_internal(&self.internal);
1113 if unlikely(internal.recv_count == 0) {
1114 return Err(ReceiveErrorTimeout::Closed);
1115 }
1116 if cap > 0 {
1117 if let Some(v) = internal.queue.pop_front() {
1118 if let Some(p) = internal.next_send() {
1119 // if there is a sender take its data and push it into the queue
1120 // SAFETY: it's safe to receive from owned signal once
1121 unsafe { internal.queue.push_back(p.recv()) }
1122 }
1123 return Ok(v);
1124 }
1125 }
1126 if let Some(p) = internal.next_send() {
1127 drop(internal);
1128 // SAFETY: it's safe to receive from owned signal once
1129 return unsafe { Ok(p.recv()) };
1130 }
1131 if unlikely(Instant::now() > deadline) {
1132 return Err(ReceiveErrorTimeout::Timeout);
1133 }
1134 if unlikely(internal.send_count == 0) {
1135 return Err(ReceiveErrorTimeout::Closed);
1136 }
1137 // no active waiter so push to the queue
1138 let mut ret = MaybeUninit::<T>::uninit();
1139 let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
1140 ret.as_mut_ptr()
1141 )));
1142 internal.push_signal(sig.dynamic_ptr());
1143 drop(internal);
1144 if unlikely(!sig.wait_timeout(deadline)) {
1145 if sig.is_terminated() {
1146 return Err(ReceiveErrorTimeout::Closed);
1147 }
1148 {
1149 let mut internal = acquire_internal(&self.internal);
1150 if internal.cancel_recv_signal(sig.as_tagged_ptr()) {
1151 return Err(ReceiveErrorTimeout::Timeout);
1152 }
1153 }
1154 // removing receive failed to wait for the signal response
1155 if unlikely(!sig.wait()) {
1156 return Err(ReceiveErrorTimeout::Closed);
1157 }
1158 }
1159 // SAFETY: it's safe to assume init as data is forgotten on another
1160 // side
1161 if size_of::<T>() > size_of::<*mut T>() {
1162 Ok(unsafe { ret.assume_init() })
1163 } else {
1164 Ok(unsafe { sig.assume_init() })
1165 }
1166
1167 // if the queue is not empty send the data
1168 }
1169
1170 /// Drains all available messages from the channel into the provided vector,
1171 /// blocking until at least one message is received.
1172 ///
1173 /// This function combines the behavior of `drain_into` with blocking semantics:
1174 /// - If messages are available, it drains all of them and returns immediately
1175 /// - If no messages are available, it blocks the current thread until at least one message arrives
1176 ///
1177 /// Returns the number of messages received.
1178 ///
1179 /// # Examples
1180 ///
1181 /// ```
1182 /// # use std::thread::spawn;
1183 /// # let (s, r) = kanal_plus::bounded(100);
1184 /// # let t = spawn(move || {
1185 /// # for i in 0..100 {
1186 /// # s.send(i)?;
1187 /// # }
1188 /// # anyhow::Ok(())
1189 /// # });
1190 ///
1191 /// let mut buf = Vec::new();
1192 /// loop {
1193 /// match r.drain_into_blocking(&mut buf) {
1194 /// Ok(count) => {
1195 /// assert!(count > 0);
1196 /// // process buf...
1197 /// buf.clear();
1198 /// }
1199 /// Err(_) => break, // channel closed
1200 /// }
1201 /// }
1202 /// # t.join().unwrap()?;
1203 /// # anyhow::Ok(())
1204 /// ```
1205 pub fn drain_into_blocking(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError> {
1206 let vec_initial_length = vec.len();
1207 let mut internal = acquire_internal(&self.internal);
1208
1209 // Check if channel is closed
1210 if unlikely(internal.recv_count == 0) {
1211 return Err(ReceiveError());
1212 }
1213
1214 // Calculate required capacity and reserve
1215 let required_cap = internal.queue.len() + {
1216 if internal.recv_blocking {
1217 0
1218 } else {
1219 internal.wait_list.len()
1220 }
1221 };
1222 let remaining_cap = vec.capacity() - vec_initial_length;
1223 if required_cap > remaining_cap {
1224 vec.reserve(vec_initial_length + required_cap - remaining_cap);
1225 }
1226
1227 // Drain queue
1228 vec.extend(internal.queue.drain(..));
1229
1230 // Drain wait_list send signals
1231 while let Some(p) = internal.next_send() {
1232 // SAFETY: it's safe to receive from owned signal once
1233 unsafe { vec.push(p.recv()) }
1234 }
1235
1236 // If got data, return immediately
1237 let count = vec.len() - vec_initial_length;
1238 if count > 0 {
1239 return Ok(count);
1240 }
1241
1242 // No data, check if there are still senders
1243 if unlikely(internal.send_count == 0) {
1244 return Err(ReceiveError());
1245 }
1246
1247 // Register signal and wait
1248 let mut ret = MaybeUninit::<T>::uninit();
1249 let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
1250 ret.as_mut_ptr()
1251 )));
1252 internal.push_signal(sig.dynamic_ptr());
1253 drop(internal);
1254
1255 if unlikely(!sig.wait()) {
1256 return Err(ReceiveError());
1257 }
1258
1259 // Read data and return
1260 if size_of::<T>() > size_of::<*mut T>() {
1261 vec.push(unsafe { ret.assume_init() });
1262 } else {
1263 vec.push(unsafe { sig.assume_init() });
1264 }
1265 Ok(1)
1266 }
1267
1268 shared_recv_impl!();
1269 #[cfg(feature = "async")]
1270 /// Clones receiver as the async version of it
1271 pub fn clone_async(&self) -> AsyncReceiver<T> {
1272 AsyncReceiver::<T> {
1273 internal: self.internal.clone_recv(),
1274 }
1275 }
1276
1277 /// Converts [`Receiver`] to [`AsyncReceiver`] and returns it.
1278 ///
1279 /// # Examples
1280 ///
1281 /// ```
1282 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1283 /// # use tokio::{spawn as co};
1284 /// # use std::time::Duration;
1285 /// let (s, r) = kanal_plus::bounded(0);
1286 /// co(async move {
1287 /// let r=r.to_async();
1288 /// let name=r.recv().await?;
1289 /// println!("Hello {}!",name);
1290 /// anyhow::Ok(())
1291 /// });
1292 /// s.send("World")?;
1293 /// # anyhow::Ok(())
1294 /// # });
1295 /// ```
1296 #[cfg(feature = "async")]
1297 pub fn to_async(self) -> AsyncReceiver<T> {
1298 // SAFETY: structure of Receiver<T> and AsyncReceiver<T> is same
1299 unsafe { transmute(self) }
1300 }
1301
1302 /// Borrows [`Receiver`] as [`AsyncReceiver`] and returns it.
1303 ///
1304 /// # Examples
1305 ///
1306 /// ```
1307 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1308 /// # use tokio::{spawn as co};
1309 /// # use std::time::Duration;
1310 /// let (s, r) = kanal_plus::bounded(0);
1311 /// co(async move {
1312 /// let name=r.as_async().recv().await?;
1313 /// println!("Hello {}!",name);
1314 /// anyhow::Ok(())
1315 /// });
1316 /// s.send("World")?;
1317 /// # anyhow::Ok(())
1318 /// # });
1319 /// ```
1320 #[cfg(feature = "async")]
1321 pub fn as_async(&self) -> &AsyncReceiver<T> {
1322 // SAFETY: structure of Receiver<T> and AsyncReceiver<T> is same
1323 unsafe { transmute(self) }
1324 }
1325
1326 shared_impl!();
1327}
1328
1329impl<T> Iterator for Receiver<T> {
1330 type Item = T;
1331
1332 fn next(&mut self) -> Option<Self::Item> {
1333 self.recv().ok()
1334 }
1335}
1336
1337#[cfg(feature = "async")]
1338impl<T> AsyncReceiver<T> {
1339 /// Returns a [`ReceiveFuture`] to receive data from the channel
1340 /// asynchronously.
1341 ///
1342 /// # Cancellation and Polling Considerations
1343 ///
1344 /// Due to current limitations in Rust's handling of future cancellation, if
1345 /// a `ReceiveFuture` is dropped exactly at the time when new data is
1346 /// written to the channel, it may result in the loss of the received
1347 /// value. This behavior although memory-safe stems from the fact that
1348 /// Rust does not provide a built-in, correct mechanism for cancelling
1349 /// futures.
1350 ///
1351 /// Additionally, it is important to note that constructs such as
1352 /// `tokio::select!` are not correct to use with kanal async channels.
1353 /// Kanal's design does not rely on the conventional `poll` mechanism to
1354 /// read messages. Because of its internal optimizations, the future may
1355 /// complete without receiving the final poll, which prevents proper
1356 /// handling of the message.
1357 ///
1358 /// As a result, once the `ReceiveFuture` is polled for the first time
1359 /// (which registers the request to receive data), the programmer must
1360 /// commit to completing the polling process. This ensures that messages
1361 /// are correctly delivered and avoids potential race conditions associated
1362 /// with cancellation.
1363 ///
1364 /// # Examples
1365 ///
1366 /// ```
1367 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1368 /// # use tokio::{spawn as co};
1369 /// # let (s, r) = kanal_plus::bounded_async(0);
1370 /// # co(async move {
1371 /// # s.send("Buddy").await?;
1372 /// # anyhow::Ok(())
1373 /// # });
1374 /// let name=r.recv().await?;
1375 /// println!("Hello {}",name);
1376 /// # anyhow::Ok(())
1377 /// # });
1378 /// ```
1379 #[inline(always)]
1380 pub fn recv(&'_ self) -> ReceiveFuture<'_, T> {
1381 ReceiveFuture::new_ref(&self.internal)
1382 }
1383 /// Creates a asynchronous stream for the channel to receive messages,
1384 /// [`ReceiveStream`] borrows the [`AsyncReceiver`], after dropping it,
1385 /// receiver will be available and usable again.
1386 ///
1387 /// # Examples
1388 ///
1389 /// ```
1390 /// # use tokio::{spawn as co};
1391 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1392 /// // import to be able to use stream.next() function
1393 /// use futures::stream::StreamExt;
1394 /// // import to be able to use stream.is_terminated() function
1395 /// use futures::stream::FusedStream;
1396 ///
1397 /// let (s, r) = kanal_plus::unbounded_async();
1398 /// co(async move {
1399 /// for i in 0..100 {
1400 /// s.send(i).await.unwrap();
1401 /// }
1402 /// });
1403 /// let mut stream = r.stream();
1404 /// assert!(!stream.is_terminated());
1405 /// for i in 0..100 {
1406 /// assert_eq!(stream.next().await, Some(i));
1407 /// }
1408 /// // Stream will return None after it is terminated, and there is no other sender.
1409 /// assert_eq!(stream.next().await, None);
1410 /// assert!(stream.is_terminated());
1411 /// # });
1412 /// ```
1413 #[inline(always)]
1414 pub fn stream(&'_ self) -> ReceiveStream<'_, T> {
1415 ReceiveStream::new_borrowed(self)
1416 }
1417
1418 /// Creates an asynchronous stream that owns the receiver.
1419 ///
1420 /// This is useful when the stream needs to outlive the receiver borrow.
1421 #[inline(always)]
1422 pub fn into_stream(self) -> ReceiveStreamOwned<T> {
1423 ReceiveStreamOwned::new(self)
1424 }
1425
1426 /// Returns a [`DrainIntoBlockingFuture`] to drain all available messages from the channel
1427 /// into the provided vector, awaiting until at least one message is received.
1428 ///
1429 /// This function combines the behavior of `drain_into` with async semantics:
1430 /// - If messages are available, it drains all of them and returns immediately
1431 /// - If no messages are available, it awaits (yields to the async runtime) until at least one message arrives
1432 ///
1433 /// Note: The name "blocking" refers to the semantic behavior (waiting for data), not thread blocking.
1434 /// This method is fully async and will not block the thread.
1435 ///
1436 /// Returns the number of messages received.
1437 ///
1438 /// # Examples
1439 ///
1440 /// ```
1441 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1442 /// # use tokio::spawn;
1443 /// let (s, r) = kanal_plus::bounded_async(100);
1444 /// spawn(async move {
1445 /// for i in 0..100 {
1446 /// s.send(i).await.unwrap();
1447 /// }
1448 /// });
1449 ///
1450 /// let mut buf = Vec::new();
1451 /// loop {
1452 /// match r.drain_into_blocking(&mut buf).await {
1453 /// Ok(count) => {
1454 /// assert!(count > 0);
1455 /// // process buf...
1456 /// buf.clear();
1457 /// }
1458 /// Err(_) => break, // channel closed
1459 /// }
1460 /// }
1461 /// # anyhow::Ok(())
1462 /// # });
1463 /// ```
1464 #[inline(always)]
1465 pub fn drain_into_blocking<'a, 'b>(
1466 &'a self,
1467 vec: &'b mut Vec<T>,
1468 ) -> DrainIntoBlockingFuture<'a, 'b, T> {
1469 DrainIntoBlockingFuture::new(&self.internal, vec)
1470 }
1471
1472 shared_recv_impl!();
1473 /// Returns sync cloned version of the receiver.
1474 ///
1475 /// # Examples
1476 ///
1477 /// ```
1478 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1479 /// # use tokio::{spawn as co};
1480 /// let (s, r) = kanal_plus::unbounded_async();
1481 /// s.send(1).await?;
1482 /// let sync_receiver=r.clone_sync();
1483 /// // JUST FOR EXAMPLE IT IS WRONG TO USE SYNC INSTANCE IN ASYNC CONTEXT
1484 /// assert_eq!(sync_receiver.recv()?,1);
1485 /// # anyhow::Ok(())
1486 /// # });
1487 /// ```
1488 pub fn clone_sync(&self) -> Receiver<T> {
1489 Receiver::<T> {
1490 internal: self.internal.clone_recv(),
1491 }
1492 }
1493
1494 /// Converts [`AsyncReceiver`] to [`Receiver`] and returns it.
1495 ///
1496 /// # Examples
1497 ///
1498 /// ```
1499 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1500 /// # use std::time::Duration;
1501 /// let (s, r) = kanal_plus::bounded_async(0);
1502 /// // move to sync environment
1503 /// std::thread::spawn(move || {
1504 /// let r=r.to_sync();
1505 /// let name=r.recv()?;
1506 /// println!("Hello {}!",name);
1507 /// anyhow::Ok(())
1508 /// });
1509 /// s.send("World").await?;
1510 /// # anyhow::Ok(())
1511 /// # });
1512 /// ```
1513 pub fn to_sync(self) -> Receiver<T> {
1514 // SAFETY: structure of Receiver<T> and AsyncReceiver<T> is same
1515 unsafe { transmute(self) }
1516 }
1517
1518 /// Borrows [`AsyncReceiver`] as [`Receiver`] and returns it
1519 /// # Examples
1520 ///
1521 /// ```
1522 /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1523 /// # use std::time::Duration;
1524 /// let (s, r) = kanal_plus::bounded_async(0);
1525 /// // move to sync environment
1526 /// std::thread::spawn(move || {
1527 /// let name=r.as_sync().recv()?;
1528 /// println!("Hello {}!",name);
1529 /// anyhow::Ok(())
1530 /// });
1531 /// s.send("World").await?;
1532 /// # anyhow::Ok(())
1533 /// # });
1534 /// ```
1535 pub fn as_sync(&self) -> &Receiver<T> {
1536 // SAFETY: structure of Receiver<T> and AsyncReceiver<T> is same
1537 unsafe { transmute(self) }
1538 }
1539
1540 shared_impl!();
1541}
1542
1543impl<T> Drop for Receiver<T> {
1544 fn drop(&mut self) {
1545 self.internal.drop_recv();
1546 }
1547}
1548
1549#[cfg(feature = "async")]
1550impl<T> Drop for AsyncReceiver<T> {
1551 fn drop(&mut self) {
1552 self.internal.drop_recv();
1553 }
1554}
1555
1556impl<T> Clone for Receiver<T> {
1557 fn clone(&self) -> Self {
1558 Self {
1559 internal: self.internal.clone_recv(),
1560 }
1561 }
1562}
1563
1564#[cfg(feature = "async")]
1565impl<T> Clone for AsyncReceiver<T> {
1566 fn clone(&self) -> Self {
1567 Self {
1568 internal: self.internal.clone_recv(),
1569 }
1570 }
1571}
1572
1573/// Creates a new sync bounded channel with the requested buffer size, and
1574/// returns [`Sender`] and [`Receiver`] of the channel for type T, you can get
1575/// access to async API of [`AsyncSender`] and [`AsyncReceiver`] with `to_sync`,
1576/// `as_async` or `clone_sync` based on your requirements, by calling them on
1577/// sender or receiver.
1578///
1579/// # Examples
1580///
1581/// ```
1582/// use std::thread::spawn;
1583///
1584/// let (s, r) = kanal_plus::bounded(0); // for channel with zero size queue, this channel always block until successful send/recv
1585///
1586/// // spawn 8 threads, that will send 100 numbers to channel reader
1587/// for i in 0..8{
1588/// let s = s.clone();
1589/// spawn(move || {
1590/// for i in 1..100{
1591/// s.send(i);
1592/// }
1593/// });
1594/// }
1595/// // drop local sender so the channel send side gets closed when all of the senders finished their jobs
1596/// drop(s);
1597///
1598/// let first = r.recv().unwrap(); // receive first msg
1599/// let total: u32 = first+r.sum::<u32>(); // the receiver implements iterator so you can call sum to receive sum of rest of messages
1600/// assert_eq!(total, 39600);
1601/// ```
1602pub fn bounded<T>(size: usize) -> (Sender<T>, Receiver<T>) {
1603 let internal = Internal::new(true, size);
1604 (
1605 Sender {
1606 internal: internal.clone_unchecked(),
1607 },
1608 Receiver { internal },
1609 )
1610}
1611
1612/// Creates a new async bounded channel with the requested buffer size, and
1613/// returns [`AsyncSender`] and [`AsyncReceiver`] of the channel for type T, you
1614/// can get access to sync API of [`Sender`] and [`Receiver`] with `to_sync`,
1615/// `as_async` or `clone_sync` based on your requirements, by calling them on
1616/// async sender or receiver.
1617///
1618/// # Examples
1619///
1620/// ```
1621/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1622/// use tokio::{spawn as co};
1623///
1624/// let (s, r) = kanal_plus::bounded_async(0);
1625///
1626/// co(async move {
1627/// s.send("hello!").await?;
1628/// anyhow::Ok(())
1629/// });
1630///
1631/// assert_eq!(r.recv().await?, "hello!");
1632/// anyhow::Ok(())
1633/// # });
1634/// ```
1635#[cfg(feature = "async")]
1636pub fn bounded_async<T>(size: usize) -> (AsyncSender<T>, AsyncReceiver<T>) {
1637 let internal = Internal::new(true, size);
1638 (
1639 AsyncSender {
1640 internal: internal.clone_unchecked(),
1641 },
1642 AsyncReceiver { internal },
1643 )
1644}
1645
1646const UNBOUNDED_STARTING_SIZE: usize = 32;
1647
1648/// Creates a new sync unbounded channel, and returns [`Sender`] and
1649/// [`Receiver`] of the channel for type T, you can get access to async API
1650/// of [`AsyncSender`] and [`AsyncReceiver`] with `to_sync`, `as_async` or
1651/// `clone_sync` based on your requirements, by calling them on sender or
1652/// receiver.
1653///
1654/// # Warning
1655/// This unbounded channel does not shrink its queue. As a result, if the
1656/// receive side is exhausted or delayed, the internal queue may grow
1657/// substantially. This behavior is intentional and considered as a warmup
1658/// phase. If such growth is undesirable, consider using a bounded channel with
1659/// an appropriate queue size.
1660///
1661/// # Examples
1662///
1663/// ```
1664/// use std::thread::spawn;
1665///
1666/// let (s, r) = kanal_plus::unbounded(); // for channel with unbounded size queue, this channel never blocks on send
1667///
1668/// // spawn 8 threads, that will send 100 numbers to the channel reader
1669/// for i in 0..8{
1670/// let s = s.clone();
1671/// spawn(move || {
1672/// for i in 1..100{
1673/// s.send(i);
1674/// }
1675/// });
1676/// }
1677/// // drop local sender so the channel send side gets closed when all of the senders finished their jobs
1678/// drop(s);
1679///
1680/// let first = r.recv().unwrap(); // receive first msg
1681/// let total: u32 = first+r.sum::<u32>(); // the receiver implements iterator so you can call sum to receive sum of rest of messages
1682/// assert_eq!(total, 39600);
1683/// ```
1684pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1685 let internal = Internal::new(false, UNBOUNDED_STARTING_SIZE);
1686 (
1687 Sender {
1688 internal: internal.clone_unchecked(),
1689 },
1690 Receiver { internal },
1691 )
1692}
1693
1694/// Creates a new async unbounded channel, and returns [`AsyncSender`] and
1695/// [`AsyncReceiver`] of the channel for type T, you can get access to sync API
1696/// of [`Sender`] and [`Receiver`] with `to_sync`, `as_async` or `clone_sync`
1697/// based on your requirements, by calling them on async sender or receiver.
1698///
1699/// # Warning
1700/// This unbounded channel does not shrink its queue. As a result, if the
1701/// receive side is exhausted or delayed, the internal queue may grow
1702/// substantially. This behavior is intentional and considered as a warmup
1703/// phase. If such growth is undesirable, consider using a bounded channel with
1704/// an appropriate queue size.
1705///
1706/// # Examples
1707///
1708/// ```
1709/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1710/// use tokio::{spawn as co};
1711///
1712/// let (s, r) = kanal_plus::unbounded_async();
1713///
1714/// co(async move {
1715/// s.send("hello!").await?;
1716/// anyhow::Ok(())
1717/// });
1718///
1719/// assert_eq!(r.recv().await?, "hello!");
1720/// anyhow::Ok(())
1721/// # });
1722/// ```
1723#[cfg(feature = "async")]
1724pub fn unbounded_async<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
1725 let internal = Internal::new(false, UNBOUNDED_STARTING_SIZE);
1726 (
1727 AsyncSender {
1728 internal: internal.clone_unchecked(),
1729 },
1730 AsyncReceiver { internal },
1731 )
1732}