Skip to main content

kanal_plus/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(missing_docs, missing_debug_implementations)]
3
4pub(crate) mod backoff;
5pub(crate) mod internal;
6#[cfg(not(feature = "std-mutex"))]
7pub(crate) mod mutex;
8pub(crate) mod pointer;
9
10mod error;
11#[cfg(feature = "async")]
12mod future;
13mod signal;
14
15pub use error::*;
16#[cfg(feature = "async")]
17pub use future::*;
18
19#[cfg(feature = "async")]
20use core::mem::transmute;
21use core::{
22    fmt,
23    mem::{size_of, MaybeUninit},
24    pin::pin,
25    time::Duration,
26};
27use std::{collections::VecDeque, time::Instant};
28
29use branches::unlikely;
30use internal::{acquire_internal, try_acquire_internal, Internal};
31use pointer::KanalPtr;
32use signal::*;
33
34/// Sending side of the channel with sync API. It's possible to convert it to
35/// async [`AsyncSender`] with `as_async`, `to_async` or `clone_async` based on
36/// software requirement.
37#[cfg_attr(
38    feature = "async",
39    doc = r##"
40# Examples
41
42```
43let (sender, _r) = kanal_plus::bounded::<u64>(0);
44let sync_sender=sender.clone_async();
45```
46"##
47)]
48#[repr(C)]
49pub struct Sender<T> {
50    internal: Internal<T>,
51}
52
/// Sending side of the channel with async API. It's possible to convert it to
/// sync [`Sender`] with `as_sync`, `to_sync` or `clone_sync` based on software
/// requirement.
///
/// # Examples
///
/// ```
/// let (sender, _r) = kanal_plus::bounded_async::<u64>(0);
/// let sync_sender=sender.clone_sync();
/// ```
// `#[repr(C)]` keeps the layout fixed: `Sender::to_async`/`Sender::as_async`
// transmute a `Sender<T>` into this type, which declares the same single
// field.
#[cfg(feature = "async")]
#[repr(C)]
pub struct AsyncSender<T> {
    // Handle to the channel state shared by every sender and receiver.
    internal: Internal<T>,
}
68
impl<T> Drop for Sender<T> {
    /// Notifies the shared channel state that one send handle is going away
    /// by calling `drop_send` on it.
    fn drop(&mut self) {
        self.internal.drop_send();
    }
}
74
#[cfg(feature = "async")]
impl<T> Drop for AsyncSender<T> {
    /// Notifies the shared channel state that one send handle is going away
    /// by calling `drop_send` on it.
    fn drop(&mut self) {
        self.internal.drop_send();
    }
}
81
82impl<T> Clone for Sender<T> {
83    fn clone(&self) -> Self {
84        Self {
85            internal: self.internal.clone_send(),
86        }
87    }
88}
89
90impl<T> fmt::Debug for Sender<T> {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        write!(f, "Sender {{ .. }}")
93    }
94}
95
#[cfg(feature = "async")]
impl<T> Clone for AsyncSender<T> {
    /// Creates another async sender for the same channel by asking the shared
    /// state for a new send handle (`clone_send`).
    fn clone(&self) -> Self {
        let internal = self.internal.clone_send();
        Self { internal }
    }
}
104
#[cfg(feature = "async")]
impl<T> fmt::Debug for AsyncSender<T> {
    /// Prints an opaque placeholder; the channel internals are deliberately
    /// not exposed through `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("AsyncSender {{ .. }}"))
    }
}
111
// Early-return guard shared by the send paths that report
// `SendTimeoutError`.
//
// `$internal` must name the acquired channel guard and `$data` the value
// being sent. When no receiver is left (`recv_count == 0`) the guard is
// released and the enclosing function returns
// `Err(SendTimeoutError::Closed($data))`, handing the value back to the
// caller.
macro_rules! check_recv_closed_timeout {
    ($internal:ident,$data:ident) => {
        if unlikely($internal.recv_count == 0) {
            // Avoid wasting lock time on dropping failed send object
            drop($internal);
            return Err(SendTimeoutError::Closed($data));
        }
    };
}
121
122macro_rules! shared_impl {
123    () => {
124        /// Returns whether the channel is bounded or not.
125        ///
126        /// # Examples
127        ///
128        /// ```
129        /// let (s, r) = kanal_plus::bounded::<u64>(0);
130        /// assert_eq!(s.is_bounded(),true);
131        /// assert_eq!(r.is_bounded(),true);
132        /// ```
133        /// ```
134        /// let (s, r) = kanal_plus::unbounded::<u64>();
135        /// assert_eq!(s.is_bounded(),false);
136        /// assert_eq!(r.is_bounded(),false);
137        /// ```
138        pub fn is_bounded(&self) -> bool {
139            self.internal.capacity() != usize::MAX
140        }
141        /// Returns length of the queue.
142        ///
143        /// # Examples
144        ///
145        /// ```
146        /// let (s, r) = kanal_plus::unbounded::<u64>();
147        /// assert_eq!(s.len(),0);
148        /// assert_eq!(r.len(),0);
149        /// s.send(10);
150        /// assert_eq!(s.len(),1);
151        /// assert_eq!(r.len(),1);
152        /// ```
153        pub fn len(&self) -> usize {
154            acquire_internal(&self.internal).queue.len()
155        }
156        /// Returns whether the channel queue is empty or not.
157        ///
158        /// # Examples
159        ///
160        /// ```
161        /// let (s, r) = kanal_plus::unbounded::<u64>();
162        /// assert_eq!(s.is_empty(),true);
163        /// assert_eq!(r.is_empty(),true);
164        /// ```
165        pub fn is_empty(&self) -> bool {
166            acquire_internal(&self.internal).queue.is_empty()
167        }
168        /// Returns whether the channel queue is full or not
169        /// full channels will block on send and recv calls
170        /// it always returns true for zero sized channels.
171        ///
172        /// # Examples
173        ///
174        /// ```
175        /// let (s, r) = kanal_plus::bounded(1);
176        /// s.send("Hi!").unwrap();
177        /// assert_eq!(s.is_full(),true);
178        /// assert_eq!(r.is_full(),true);
179        /// ```
180        pub fn is_full(&self) -> bool {
181            self.internal.capacity() == acquire_internal(&self.internal).queue.len()
182        }
        /// Returns the capacity the channel was created with (not the
        /// current allocation of its queue). For unbounded channels it
        /// returns `usize::MAX`.
        ///
        /// # Examples
        ///
        /// ```
        /// let (s, r) = kanal_plus::bounded::<u64>(0);
        /// assert_eq!(s.capacity(),0);
        /// assert_eq!(r.capacity(),0);
        /// ```
        /// ```
        /// let (s, r) = kanal_plus::unbounded::<u64>();
        /// assert_eq!(s.capacity(),usize::MAX);
        /// assert_eq!(r.capacity(),usize::MAX);
        /// ```
        pub fn capacity(&self) -> usize {
            self.internal.capacity()
        }
201        /// Returns count of alive receiver instances of the channel.
202        ///
203        /// # Examples
204        ///
205        /// ```
206        /// let (s, r) = kanal_plus::unbounded::<u64>();
207        /// let receiver_clone=r.clone();
208        /// assert_eq!(r.receiver_count(),2);
209        /// ```
210        pub fn receiver_count(&self) -> usize {
211            acquire_internal(&self.internal).recv_count as usize
212        }
213        /// Returns count of alive sender instances of the channel.
214        ///
215        /// # Examples
216        ///
217        /// ```
218        /// let (s, r) = kanal_plus::unbounded::<u64>();
219        /// let sender_clone=s.clone();
220        /// assert_eq!(r.sender_count(),2);
221        /// ```
222        pub fn sender_count(&self) -> usize {
223            acquire_internal(&self.internal).send_count as usize
224        }
225        /// Closes the channel completely on both sides and terminates waiting
226        /// signals.
227        ///
228        /// # Examples
229        ///
230        /// ```
231        /// let (s, r) = kanal_plus::unbounded::<u64>();
232        /// // closes channel on both sides and has same effect as r.close();
233        /// s.close().unwrap();
234        /// assert_eq!(r.is_closed(),true);
235        /// assert_eq!(s.is_closed(),true);
236        /// ```
237        pub fn close(&self) -> Result<(), CloseError> {
238            let mut internal = acquire_internal(&self.internal);
239            if unlikely(internal.recv_count == 0 && internal.send_count == 0) {
240                return Err(CloseError());
241            }
242            internal.recv_count = 0;
243            internal.send_count = 0;
244            internal.terminate_signals();
245            internal.queue.clear();
246            Ok(())
247        }
248        /// Returns whether the channel is closed on both side of send and
249        /// receive or not.
250        ///
251        /// # Examples
252        ///
253        /// ```
254        /// let (s, r) = kanal_plus::unbounded::<u64>();
255        /// // closes channel on both sides and has same effect as r.close();
256        /// s.close();
257        /// assert_eq!(r.is_closed(),true);
258        /// assert_eq!(s.is_closed(),true);
259        /// ```
260        pub fn is_closed(&self) -> bool {
261            let internal = acquire_internal(&self.internal);
262            internal.send_count == 0 && internal.recv_count == 0
263        }
264    };
265}
266
267macro_rules! shared_send_impl {
268    () => {
        /// Tries sending to the channel without waiting on the waitlist.
        ///
        /// Returns `Ok(())` when the value was handed to a waiting receiver
        /// or stored in the queue. Important note: this function is not
        /// lock-free as it acquires a mutex guard of the channel internal
        /// for a short time.
        ///
        /// # Errors
        ///
        /// * `SendTimeoutError::Closed(data)` when no receiver is left
        ///   (`recv_count` is zero).
        /// * `SendTimeoutError::Timeout(data)` when no receiver is waiting
        ///   and the queue has no free slot (a zero-capacity channel never
        ///   queues).
        ///
        /// In both cases the value is handed back to the caller inside the
        /// error rather than being dropped.
        ///
        /// # Examples
        ///
        /// ```
        /// # use std::thread::spawn;
        /// let (s, r) = kanal_plus::bounded(0);
        /// let t=spawn( move || {
        ///     loop{
        ///         if s.try_send(1).is_ok() {
        ///             break;
        ///         }
        ///     }
        /// });
        /// assert_eq!(r.recv()?,1);
        /// # t.join();
        /// # anyhow::Ok(())
        /// ```
        #[inline(always)]
        pub fn try_send(&self, data: T) -> Result<(), SendTimeoutError<T>> {
            let cap = self.internal.capacity();
            let mut internal = acquire_internal(&self.internal);
            // Returns `Closed(data)` early when there is no receiver left.
            check_recv_closed_timeout!(internal, data);
            if let Some(first) = internal.next_recv() {
                // Release the lock before completing the hand-off.
                drop(internal);
                // SAFETY: it's safe to send to owned signal once
                unsafe { first.send(data) }
                return Ok(());
            }
            // Only channels with a non-zero capacity buffer values.
            if cap > 0 && internal.queue.len() < cap {
                internal.queue.push_back(data);
                return Ok(());
            }
            Err(SendTimeoutError::Timeout(data))
        }
309
        /// Tries sending to the channel without waiting on the waitlist or for
        /// the internal mutex.
        ///
        /// Returns `Ok(())` when the value was handed to a waiting receiver
        /// or stored in the queue. Do not use this function unless you know
        /// exactly what you are doing.
        ///
        /// # Errors
        ///
        /// * `SendTimeoutError::Closed(data)` when no receiver is left
        ///   (`recv_count` is zero).
        /// * `SendTimeoutError::Timeout(data)` when `try_acquire_internal`
        ///   yields no guard, or when no receiver is waiting and the queue
        ///   has no free slot.
        ///
        /// In both cases the value is handed back to the caller inside the
        /// error rather than being dropped.
        ///
        /// # Examples
        ///
        /// ```
        /// # use std::thread::spawn;
        /// let (s, r) = kanal_plus::bounded(0);
        /// let t=spawn( move || {
        ///     loop{
        ///         if s.try_send_realtime(1).is_ok() {
        ///             break;
        ///         }
        ///     }
        /// });
        /// assert_eq!(r.recv()?,1);
        /// # t.join();
        /// # anyhow::Ok(())
        /// ```
        #[inline(always)]
        pub fn try_send_realtime(&self, data: T) -> Result<(), SendTimeoutError<T>> {
            let cap = self.internal.capacity();
            if let Some(mut internal) = try_acquire_internal(&self.internal) {
                // Returns `Closed(data)` early when there is no receiver left.
                check_recv_closed_timeout!(internal, data);
                if let Some(first) = internal.next_recv() {
                    // Release the lock before completing the hand-off.
                    drop(internal);
                    // SAFETY: it's safe to send to owned signal once
                    unsafe { first.send(data) }
                    return Ok(());
                }
                // Only channels with a non-zero capacity buffer values.
                if cap > 0 && internal.queue.len() < cap {
                    internal.queue.push_back(data);
                    return Ok(());
                }
            }
            // Either no guard was obtained or nothing could accept the value.
            Err(SendTimeoutError::Timeout(data))
        }
351
352        /// Returns whether the receive side of the channel is closed or not.
353        ///
354        /// # Examples
355        ///
356        /// ```
357        /// let (s, r) = kanal_plus::unbounded::<u64>();
358        /// drop(r); // drop receiver and disconnect the receive side from the channel
359        /// assert_eq!(s.is_disconnected(),true);
360        /// # anyhow::Ok(())
361        /// ```
362        pub fn is_disconnected(&self) -> bool {
363            acquire_internal(&self.internal).recv_count == 0
364        }
365    };
366}
367
368macro_rules! shared_recv_impl {
369    () => {
        /// Tries receiving from the channel without waiting on the waitlist.
        /// It returns `Ok(Some(T))` in case of successful operation and
        /// `Ok(None)` when nothing is available right now. Important note:
        /// this function is not lock-free as it acquires a mutex guard of the
        /// channel internal for a short time.
        ///
        /// # Errors
        ///
        /// Returns `ReceiveError` when `recv_count` is zero, or when nothing
        /// is available and no sender is left (`send_count` is zero).
        ///
        /// # Examples
        ///
        /// ```
        /// # use std::thread::spawn;
        /// # let (s, r) = kanal_plus::bounded(0);
        /// # let t=spawn(move || {
        /// #      s.send("Buddy")?;
        /// #      anyhow::Ok(())
        /// # });
        /// loop {
        ///     if let Some(name)=r.try_recv()?{
        ///         println!("Hello {}!",name);
        ///         break;
        ///     }
        /// }
        /// # t.join();
        /// # anyhow::Ok(())
        /// ```
        #[inline(always)]
        pub fn try_recv(&self) -> Result<Option<T>, ReceiveError> {
            let cap = self.internal.capacity();
            let mut internal = acquire_internal(&self.internal);
            if unlikely(internal.recv_count == 0) {
                return Err(ReceiveError());
            }
            // Buffered channels: serve from the queue first.
            if cap > 0 {
                if let Some(v) = internal.queue.pop_front() {
                    if let Some(p) = internal.next_send() {
                        // A sender is waiting: move its value into the slot
                        // that was just freed in the queue.
                        // SAFETY: it's safe to receive from owned signal once
                        unsafe { internal.queue.push_back(p.recv()) }
                    }
                    return Ok(Some(v));
                }
            }
            // Queue is empty (or unused): take directly from a waiting sender.
            if let Some(p) = internal.next_send() {
                // Release the lock before completing the hand-off.
                drop(internal);
                // SAFETY: it's safe to receive from owned signal once
                return unsafe { Ok(Some(p.recv())) };
            }
            if unlikely(internal.send_count == 0) {
                return Err(ReceiveError());
            }
            Ok(None)
        }
        /// Tries receiving from the channel without waiting on the waitlist or
        /// waiting for channel internal lock. It returns `Ok(Some(T))` in case
        /// of successful operation and `Ok(None)` when nothing is available
        /// right now or when `try_acquire_internal` yields no guard. Do not
        /// use this function unless you know exactly what you are doing.
        ///
        /// # Errors
        ///
        /// Returns `ReceiveError` when `recv_count` is zero, or when nothing
        /// is available and no sender is left (`send_count` is zero).
        ///
        /// # Examples
        ///
        /// ```
        /// # use std::thread::spawn;
        /// # let (s, r) = kanal_plus::bounded(0);
        /// # let t=spawn(move || {
        /// #      s.send("Buddy")?;
        /// #      anyhow::Ok(())
        /// # });
        /// loop {
        ///     if let Some(name)=r.try_recv_realtime()?{
        ///         println!("Hello {}!",name);
        ///         break;
        ///     }
        /// }
        /// # t.join();
        /// # anyhow::Ok(())
        /// ```
        #[inline(always)]
        pub fn try_recv_realtime(&self) -> Result<Option<T>, ReceiveError> {
            let cap = self.internal.capacity();
            if let Some(mut internal) = try_acquire_internal(&self.internal) {
                if unlikely(internal.recv_count == 0) {
                    return Err(ReceiveError());
                }
                // Buffered channels: serve from the queue first.
                if cap > 0 {
                    if let Some(v) = internal.queue.pop_front() {
                        if let Some(p) = internal.next_send() {
                            // A sender is waiting: move its value into the
                            // slot that was just freed in the queue.
                            // SAFETY: it's safe to receive from owned signal
                            // once
                            unsafe { internal.queue.push_back(p.recv()) }
                        }
                        return Ok(Some(v));
                    }
                }
                // Queue is empty (or unused): take directly from a waiting
                // sender.
                if let Some(p) = internal.next_send() {
                    // Release the lock before completing the hand-off.
                    drop(internal);
                    // SAFETY: it's safe to receive from owned signal once
                    return unsafe { Ok(Some(p.recv())) };
                }
                if unlikely(internal.send_count == 0) {
                    return Err(ReceiveError());
                }
            }
            Ok(None)
        }
476
477        /// Drains all available messages from the channel into the provided vector and
478        /// returns the number of received messages.
479        ///
480        /// The function is designed to be non-blocking, meaning it only processes
481        /// messages that are readily available and returns immediately with whatever
482        /// messages are present. It provides a count of received messages, which could
483        /// be zero if no messages are available at the time of the call.
484        ///
485        /// When using this function, it’s a good idea to check if the returned count is
486        /// zero to avoid busy-waiting in a loop. If blocking behavior is desired when
487        /// the count is zero, you can use the `recv()` function if count is zero. For
488        /// efficiency, reusing the same vector across multiple calls can help minimize
489        /// memory allocations. Between uses, you can clear the vector with
490        /// `vec.clear()` to prepare it for the next set of messages.
491        ///
492        /// # Examples
493        ///
494        /// ```
495        /// # use std::thread::spawn;
496        /// # let (s, r) = kanal_plus::bounded(1000);
497        /// # let t=spawn(move || {
498        /// #   for i in 0..1000 {
499        /// #     s.send(i)?;
500        /// #   }
501        /// #   anyhow::Ok(())
502        /// # });
503        ///
504        /// let mut buf = Vec::with_capacity(1000);
505        /// loop {
506        ///     if let Ok(count) = r.drain_into(&mut buf) {
507        ///         if count == 0 {
508        ///            // count is 0, to avoid busy-wait using recv for
509        ///            // the first next message
510        ///            if let Ok(v) = r.recv() {
511        ///               buf.push(v);
512        ///            } else {
513        ///              break;
514        ///            }
515        ///         }
516        ///         // use buffer
517        ///         buf.iter().for_each(|v| println!("{}",v));
518        ///     }else{
519        ///         println!("Channel closed");
520        ///         break;
521        ///     }
522        ///     buf.clear();
523        /// }
524        /// # t.join();
525        /// # anyhow::Ok(())
526        /// ```
527        pub fn drain_into(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError> {
528            let vec_initial_length = vec.len();
529            let remaining_cap = vec.capacity() - vec_initial_length;
530            let mut internal = acquire_internal(&self.internal);
531            if unlikely(internal.recv_count == 0) {
532                return Err(ReceiveError());
533            }
534            let required_cap = internal.queue.len() + {
535                if internal.recv_blocking {
536                    0
537                } else {
538                    internal.wait_list.len()
539                }
540            };
541            if required_cap > remaining_cap {
542                vec.reserve(vec_initial_length + required_cap - remaining_cap);
543            }
544            while let Some(v) = internal.queue.pop_front() {
545                vec.push(v);
546            }
547            while let Some(p) = internal.next_send() {
548                // SAFETY: it's safe to receive from owned signal once
549                unsafe { vec.push(p.recv()) }
550            }
551            Ok(required_cap)
552        }
553
554        /// Returns, whether the send side of the channel, is closed or not.
555        ///
556        /// # Examples
557        ///
558        /// ```
559        /// let (s, r) = kanal_plus::unbounded::<u64>();
560        /// drop(s); // drop sender and disconnect the send side from the channel
561        /// assert_eq!(r.is_disconnected(),true);
562        /// ```
563        pub fn is_disconnected(&self) -> bool {
564            acquire_internal(&self.internal).send_count == 0
565        }
566
567        /// Returns, whether the channel receive side is terminated, and will
568        /// not return any result in future recv calls.
569        ///
570        /// # Examples
571        ///
572        /// ```
573        /// let (s, r) = kanal_plus::unbounded::<u64>();
574        /// s.send(1).unwrap();
575        /// drop(s); // drop sender and disconnect the send side from the channel
576        /// assert_eq!(r.is_disconnected(),true);
577        /// // Also channel is closed from send side, it's not terminated as there is data in channel queue
578        /// assert_eq!(r.is_terminated(),false);
579        /// assert_eq!(r.recv().unwrap(),1);
580        /// // Now channel receive side is terminated as there is no sender for channel and queue is empty
581        /// assert_eq!(r.is_terminated(),true);
582        /// ```
583        pub fn is_terminated(&self) -> bool {
584            let internal = acquire_internal(&self.internal);
585            internal.send_count == 0 && internal.queue.len() == 0
586        }
587    };
588}
589
590impl<T> Sender<T> {
    /// Sends data to the channel, blocking until the value has been handed
    /// over or the channel is closed.
    ///
    /// The value is delivered to a waiting receiver if there is one,
    /// otherwise stored in the queue when the channel has free capacity,
    /// otherwise the call registers a signal on the wait list and blocks
    /// until that signal completes.
    ///
    /// # Errors
    ///
    /// Returns `SendError` carrying the unsent value when no receiver is
    /// left (`recv_count` is zero) or when the blocking wait fails.
    ///
    /// # Examples
    ///
    /// ```
    /// # use std::thread::spawn;
    /// # let (s, r) = kanal_plus::bounded(0);
    /// # spawn(move || {
    ///  s.send("Hello").unwrap();
    /// #      anyhow::Ok(())
    /// # });
    /// # let name=r.recv()?;
    /// # println!("Hello {}!",name);
    /// # anyhow::Ok(())
    /// ```
    #[inline(always)]
    pub fn send(&self, data: T) -> Result<(), SendError<T>> {
        let cap = self.internal.capacity();
        let mut internal = acquire_internal(&self.internal);
        if unlikely(internal.recv_count == 0) {
            // Release the lock before building the error.
            drop(internal);
            return Err(SendError(data));
        }
        if let Some(first) = internal.next_recv() {
            // Release the lock before completing the hand-off.
            drop(internal);
            // SAFETY: it's safe to send to owned signal once
            unsafe { first.send(data) }
            return Ok(());
        }
        if cap > 0 && internal.queue.len() < cap {
            // Free slot in the queue: buffer the value and return.
            internal.queue.push_back(data);
            return Ok(());
        }
        // `MaybeUninit` prevents an automatic drop of the value here: the
        // signal below is given a pointer to it.
        let mut data = MaybeUninit::new(data);
        // send directly to the waitlist
        let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
        internal.push_signal(sig.dynamic_ptr());
        drop(internal);
        if unlikely(!sig.wait()) {
            // SAFETY: data failed to move, so it is still initialized and is
            // returned to the caller
            return Err(SendError(unsafe { data.assume_init() }));
        }
        Ok(())
    }
640
    /// Sends multiple elements from a `VecDeque` into the channel.
    ///
    /// This method behaves similarly to repeatedly calling `send` for each
    /// element, but handles several elements per lock acquisition. It blocks
    /// until every element has been sent or the channel is closed.
    ///
    /// * Elements are always taken from the front of the deque (FIFO order).
    /// * While receivers are waiting, each one is handed the next element.
    /// * When the channel has a non-zero capacity, further elements are moved
    ///   into the internal queue until it is full.
    /// * If elements remain after that, the next one is offered through a
    ///   signal on the wait list; the mutex guard is released and the call
    ///   blocks until that signal completes, then the whole procedure
    ///   repeats.
    ///
    /// # Errors
    ///
    /// * If the channel is closed (no receivers), the element at the front of
    ///   the deque is removed and returned inside `SendError`; the remaining
    ///   elements stay in `elements`.
    /// * If the blocking wait fails, the element that was being sent is
    ///   returned inside `SendError`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::collections::VecDeque;
    /// // Create a bounded channel with capacity 3
    /// let (s, r) = kanal_plus::bounded::<i32>(3);
    ///
    /// // Move the sender into a new thread that will push every item of
    /// // the buffer into the channel.
    /// let handle = std::thread::spawn(move || {
    ///     // Prepare a deque with several values
    ///     let mut buf = VecDeque::from(vec![1, 2, 3, 4, 5]);
    ///     // `send_many` consumes items from the front of the deque.
    ///     // It returns `Ok(())` when all items have been sent
    ///     // or `Err` if the channel is closed. Here we unwrap the result
    ///     // because the channel stays alive for the whole test.
    ///     s.send_many(&mut buf).unwrap();
    ///
    ///     // Return the (now empty) buffer so the main thread can
    ///     // inspect it.
    ///     buf
    /// });
    ///
    /// // In the current thread we receive the three items that fit into the
    /// // channel's capacity.
    /// assert_eq!(r.recv().unwrap(), 1);
    /// assert_eq!(r.recv().unwrap(), 2);
    /// assert_eq!(r.recv().unwrap(), 3);
    ///
    /// std::thread::sleep(std::time::Duration::from_millis(100));
    ///
    /// // Sender now written two more items into the channel queue and exited.
    /// let remaining = handle.join().expect("sender thread panicked");
    /// assert!(remaining.is_empty());
    ///
    /// assert_eq!(r.len(), 2);
    /// assert_eq!(r.recv().unwrap(), 4);
    /// assert_eq!(r.recv().unwrap(), 5);
    /// ```
    ///
    /// The function returns `Ok(())` when all elements have been successfully
    /// transferred, or `Err(SendError<T>)` containing the first element that
    /// could not be sent (typically because the receiver side has been
    /// closed).
    pub fn send_many(&self, elements: &mut VecDeque<T>) -> Result<(), SendError<T>> {
        if unlikely(elements.is_empty()) {
            return Ok(());
        }
        let cap = self.internal.capacity();
        // Invariant: `elements` is non-empty at the top of every iteration,
        // which is what makes the `unwrap()` calls below safe.
        loop {
            let mut internal = acquire_internal(&self.internal);
            if unlikely(internal.recv_count == 0) {
                drop(internal);
                return Err(SendError(elements.pop_front().unwrap()));
            }
            // Serve waiting receivers first, one element each.
            while let Some(first) = internal.next_recv() {
                // SAFETY: it's safe to send to owned signal once
                unsafe {
                    first.send(elements.pop_front().unwrap());
                }
                if unlikely(elements.is_empty()) {
                    return Ok(());
                }
            }
            // Then fill the free slots of the queue (buffered channels only).
            if cap > 0 {
                while internal.queue.len() < cap {
                    if let Some(v) = elements.pop_front() {
                        internal.queue.push_back(v);
                    } else {
                        return Ok(());
                    }
                }
                if unlikely(elements.is_empty()) {
                    return Ok(());
                }
            }
            // Nothing can accept more right now: offer the next element
            // through a wait-list signal and block until it completes.
            // `MaybeUninit` prevents an automatic drop of the value here: the
            // signal is given a pointer to it.
            let mut data = MaybeUninit::new(elements.pop_front().unwrap());
            // send directly to the waitlist
            let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
            internal.recv_blocking = false;
            internal.push_signal(sig.dynamic_ptr());
            drop(internal);
            if unlikely(!sig.wait()) {
                // SAFETY: data failed to move, so it is still initialized and
                // is returned to the caller
                return Err(SendError(unsafe { data.assume_init() }));
            }
            if unlikely(elements.is_empty()) {
                return Ok(());
            }
        }
    }
749
750    /// Sends data to the channel with a deadline, if send fails then the object
751    /// will be dropped. you can use send_option_timeout if you like to keep
752    /// the object in case of timeout.
753    ///
754    /// # Examples
755    ///
756    /// ```
757    /// # use std::thread::spawn;
758    /// # use std::time::Duration;
759    /// # let (s, r) = kanal_plus::bounded(0);
760    /// # spawn(move || {
761    ///  s.send_timeout("Hello",Duration::from_millis(500)).unwrap();
762    /// #      anyhow::Ok(())
763    /// # });
764    /// # let name=r.recv()?;
765    /// # println!("Hello {}!",name);
766    /// # anyhow::Ok(())
767    /// ```
768    #[inline(always)]
769    pub fn send_timeout(&self, data: T, duration: Duration) -> Result<(), SendTimeoutError<T>> {
770        let cap = self.internal.capacity();
771        let deadline = Instant::now().checked_add(duration).unwrap();
772        let mut internal = acquire_internal(&self.internal);
773        if unlikely(internal.recv_count == 0) {
774            // Avoid wasting lock time on dropping failed send object
775            drop(internal);
776            return Err(SendTimeoutError::Closed(data));
777        }
778        if let Some(first) = internal.next_recv() {
779            drop(internal);
780            // SAFETY: it's safe to send to owned signal once
781            unsafe { first.send(data) }
782            return Ok(());
783        }
784        if cap > 0 && internal.queue.len() < cap {
785            // SAFETY: MaybeUninit is used as a ManuallyDrop, and data in it is
786            // valid.
787            internal.queue.push_back(data);
788            return Ok(());
789        }
790        let mut data = MaybeUninit::new(data);
791        // send directly to the waitlist
792        let sig = pin!(SyncSignal::new(KanalPtr::new_from(data.as_mut_ptr())));
793        internal.push_signal(sig.dynamic_ptr());
794        drop(internal);
795        if unlikely(!sig.wait_timeout(deadline)) {
796            if sig.is_terminated() {
797                // SAFETY: data failed to move, sender should drop it if it
798                // needs to
799                return Err(SendTimeoutError::Closed(unsafe { data.assume_init() }));
800            }
801            {
802                let mut internal = acquire_internal(&self.internal);
803                if internal.cancel_send_signal(sig.as_tagged_ptr()) {
804                    // SAFETY: data failed to move, we return it to the user
805                    return Err(SendTimeoutError::Timeout(unsafe { data.assume_init() }));
806                }
807            }
808            // removing receive failed to wait for the signal response
809            if unlikely(!sig.wait()) {
810                // SAFETY: data failed to move, we return it to the user
811
812                return Err(SendTimeoutError::Closed(unsafe { data.assume_init() }));
813            }
814        }
815        Ok(())
816
817        // if the queue is not empty send the data
818    }
819
820    shared_send_impl!();
821    /// Clones [`Sender`] as the async version of it and returns it
822    #[cfg(feature = "async")]
823    pub fn clone_async(&self) -> AsyncSender<T> {
824        AsyncSender::<T> {
825            internal: self.internal.clone_send(),
826        }
827    }
828
829    /// Converts [`Sender`] to [`AsyncSender`] and returns it
830    /// # Examples
831    ///
832    /// ```
833    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
834    /// # use tokio::{spawn as co};
835    /// # use std::time::Duration;
836    ///   let (s, r) = kanal_plus::bounded(0);
837    ///   co(async move {
838    ///     let s=s.to_async();
839    ///     s.send("World").await;
840    ///   });
841    ///   let name=r.recv()?;
842    ///   println!("Hello {}!",name);
843    /// # anyhow::Ok(())
844    /// # });
845    /// ```
846    #[cfg(feature = "async")]
847    pub fn to_async(self) -> AsyncSender<T> {
848        // SAFETY: structure of Sender<T> and AsyncSender<T> is same
849        unsafe { transmute(self) }
850    }
851
852    /// Borrows [`Sender`] as [`AsyncSender`] and returns it
853    /// # Examples
854    ///
855    /// ```
856    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
857    /// # use tokio::{spawn as co};
858    /// # use std::time::Duration;
859    ///   let (s, r) = kanal_plus::bounded(0);
860    ///   co(async move {
861    ///     s.as_async().send("World").await;
862    ///   });
863    ///   let name=r.recv()?;
864    ///   println!("Hello {}!",name);
865    /// # anyhow::Ok(())
866    /// # });
867    /// ```
868    #[cfg(feature = "async")]
869    pub fn as_async(&self) -> &AsyncSender<T> {
870        // SAFETY: structure of Sender<T> and AsyncSender<T> is same
871        unsafe { transmute(self) }
872    }
873    shared_impl!();
874}
875
876#[cfg(feature = "async")]
877impl<T> AsyncSender<T> {
878    /// Sends data asynchronously to the channel.
879    ///
880    /// # Examples
881    ///
882    /// ```
883    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
884    /// # let (s, r) = kanal_plus::unbounded_async();
885    /// s.send(1).await?;
886    /// assert_eq!(r.recv().await?,1);
887    /// # anyhow::Ok(())
888    /// # });
889    /// ```
890    #[inline(always)]
891    pub fn send(&'_ self, data: T) -> SendFuture<'_, T> {
892        SendFuture::new(&self.internal, data)
893    }
894
895    /// Sends multiple elements from a `VecDeque` into the channel
896    /// asynchronously.
897    ///
898    /// This method consumes the provided `VecDeque` by repeatedly popping
899    /// elements from its front and sending each one over the channel. The
900    /// operation completes when the deque is empty or when the channel is
901    /// closed.
902    ///
903    /// # Examples
904    ///
905    /// ```
906    /// # use tokio::{spawn as co};
907    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
908    /// # use std::collections::VecDeque;
909    /// let (s, r) = kanal_plus::bounded_async(3);
910    /// let handle = co(async move {
911    ///     let mut elems = VecDeque::from(vec![10, 20, 30, 40, 50]);
912    ///     // Send all elements in the deque
913    ///     s.send_many(&mut elems).await.unwrap();
914    /// });
915    ///
916    /// // Receive the values in the same order they were sent
917    /// assert_eq!(r.recv().await?, 10);
918    /// assert_eq!(r.recv().await?, 20);
919    /// assert_eq!(r.recv().await?, 30);
920    ///
921    /// //panic!("here");
922    ///
923    /// tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
924    /// // Now the sender has sent the remaining elements
925    /// //handle.await.unwrap();
926    ///
927    /// assert_eq!(r.recv().await?, 40);
928    /// assert_eq!(r.recv().await?, 50);
929    ///
930    /// # anyhow::Ok(())
931    /// # });
932    /// ```
933    #[inline(always)]
934    pub fn send_many<'a, 'b>(&'a self, elements: &'b mut VecDeque<T>) -> SendManyFuture<'a, 'b, T> {
935        SendManyFuture::new(&self.internal, elements)
936    }
937
938    shared_send_impl!();
939
940    /// Clones [`AsyncSender`] as [`Sender`] with sync api of it.
941    ///
942    /// # Examples
943    ///
944    /// ```
945    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
946    /// let (s, r) = kanal_plus::unbounded_async();
947    /// let sync_sender=s.clone_sync();
948    /// // JUST FOR EXAMPLE IT IS WRONG TO USE SYNC INSTANCE IN ASYNC CONTEXT
949    /// sync_sender.send(1)?;
950    /// assert_eq!(r.recv().await?,1);
951    /// # anyhow::Ok(())
952    /// # });
953    /// ```
954    pub fn clone_sync(&self) -> Sender<T> {
955        Sender::<T> {
956            internal: self.internal.clone_send(),
957        }
958    }
959
960    /// Converts [`AsyncSender`] to [`Sender`] and returns it.
961    ///
962    /// # Examples
963    ///
964    /// ```
965    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
966    /// # use std::time::Duration;
967    ///   let (s, r) = kanal_plus::bounded_async(0);
968    ///   // move to sync environment
969    ///   std::thread::spawn(move || {
970    ///     let s=s.to_sync();
971    ///     s.send("World")?;
972    ///     anyhow::Ok(())
973    ///   });
974    ///   let name=r.recv().await?;
975    ///   println!("Hello {}!",name);
976    /// # anyhow::Ok(())
977    /// # });
978    /// ```
979    pub fn to_sync(self) -> Sender<T> {
980        // SAFETY: structure of Sender<T> and AsyncSender<T> is same
981        unsafe { transmute(self) }
982    }
983
984    /// Borrows [`AsyncSender`] as [`Sender`] and returns it.
985    ///
986    /// # Examples
987    ///
988    /// ```
989    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
990    /// # use std::time::Duration;
991    ///   let (s, r) = kanal_plus::bounded_async(0);
992    ///   // move to sync environment
993    ///   std::thread::spawn(move || {
994    ///     s.as_sync().send("World")?;
995    ///     anyhow::Ok(())
996    ///   });
997    ///   let name=r.recv().await?;
998    ///   println!("Hello {}!",name);
999    /// # anyhow::Ok(())
1000    /// # });
1001    /// ```
1002    pub fn as_sync(&self) -> &Sender<T> {
1003        // SAFETY: structure of Sender<T> and AsyncSender<T> is same
1004        unsafe { transmute(self) }
1005    }
1006
1007    shared_impl!();
1008}
1009
/// Receiving side of the channel in sync mode.
///
/// Receivers can be cloned, and a receiver can produce handles that operate
/// in either sync or async mode.
#[cfg_attr(
    feature = "async",
    doc = r##"
# Examples

```
let (_s, receiver) = kanal_plus::bounded::<u64>(0);
let async_receiver=receiver.clone_async();
```
"##
)]
// `repr(C)` with a single `Internal<T>` field: the sync/async conversion
// methods transmute between `Receiver<T>` and `AsyncReceiver<T>` and rely on
// both structs having the same structure.
#[repr(C)]
pub struct Receiver<T> {
    // Handle to the channel state; locked through `acquire_internal`.
    internal: Internal<T>,
}
1028
1029impl<T> fmt::Debug for Receiver<T> {
1030    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1031        write!(f, "Receiver {{ .. }}")
1032    }
1033}
1034
/// Receiving side of the channel in async mode.
///
/// Receivers can be cloned, and a receiver can produce handles that operate
/// in either sync or async mode.
///
/// # Examples
///
/// ```
/// let (_s, receiver) = kanal_plus::bounded_async::<u64>(0);
/// let sync_receiver=receiver.clone_sync();
/// ```
#[cfg(feature = "async")]
// `repr(C)` with a single `Internal<T>` field: the sync/async conversion
// methods transmute between `AsyncReceiver<T>` and `Receiver<T>` and rely on
// both structs having the same structure.
#[repr(C)]
pub struct AsyncReceiver<T> {
    // Handle to the channel state.
    internal: Internal<T>,
}
1050
#[cfg(feature = "async")]
impl<T> fmt::Debug for AsyncReceiver<T> {
    /// Prints an opaque placeholder; the internal channel state is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("AsyncReceiver { .. }")
    }
}
1057
1058impl<T> Receiver<T> {
1059    /// Receives data from the channel
1060    #[inline(always)]
1061    pub fn recv(&self) -> Result<T, ReceiveError> {
1062        let cap = self.internal.capacity();
1063        let mut internal = acquire_internal(&self.internal);
1064        if unlikely(internal.recv_count == 0) {
1065            return Err(ReceiveError());
1066        }
1067        if cap > 0 {
1068            if let Some(v) = internal.queue.pop_front() {
1069                if let Some(p) = internal.next_send() {
1070                    // if there is a sender take its data and push it into the queue
1071                    // SAFETY: it's safe to receive from owned signal once
1072                    unsafe { internal.queue.push_back(p.recv()) }
1073                }
1074                return Ok(v);
1075            }
1076        }
1077        if let Some(p) = internal.next_send() {
1078            drop(internal);
1079            // SAFETY: it's safe to receive from owned signal once
1080            return unsafe { Ok(p.recv()) };
1081        }
1082        if unlikely(internal.send_count == 0) {
1083            return Err(ReceiveError());
1084        }
1085        // no active waiter so push to the queue
1086        let mut ret = MaybeUninit::<T>::uninit();
1087        let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
1088            ret.as_mut_ptr()
1089        )));
1090        internal.push_signal(sig.dynamic_ptr());
1091        drop(internal);
1092
1093        if unlikely(!sig.wait()) {
1094            return Err(ReceiveError());
1095        }
1096
1097        // SAFETY: it's safe to assume init as data is forgotten on another
1098        // side
1099        if size_of::<T>() > size_of::<*mut T>() {
1100            Ok(unsafe { ret.assume_init() })
1101        } else {
1102            Ok(unsafe { sig.assume_init() })
1103        }
1104
1105        // if the queue is not empty send the data
1106    }
1107    /// Tries receiving from the channel within a duration
1108    #[inline(always)]
1109    pub fn recv_timeout(&self, duration: Duration) -> Result<T, ReceiveErrorTimeout> {
1110        let cap = self.internal.capacity();
1111        let deadline = Instant::now().checked_add(duration).unwrap();
1112        let mut internal = acquire_internal(&self.internal);
1113        if unlikely(internal.recv_count == 0) {
1114            return Err(ReceiveErrorTimeout::Closed);
1115        }
1116        if cap > 0 {
1117            if let Some(v) = internal.queue.pop_front() {
1118                if let Some(p) = internal.next_send() {
1119                    // if there is a sender take its data and push it into the queue
1120                    // SAFETY: it's safe to receive from owned signal once
1121                    unsafe { internal.queue.push_back(p.recv()) }
1122                }
1123                return Ok(v);
1124            }
1125        }
1126        if let Some(p) = internal.next_send() {
1127            drop(internal);
1128            // SAFETY: it's safe to receive from owned signal once
1129            return unsafe { Ok(p.recv()) };
1130        }
1131        if unlikely(Instant::now() > deadline) {
1132            return Err(ReceiveErrorTimeout::Timeout);
1133        }
1134        if unlikely(internal.send_count == 0) {
1135            return Err(ReceiveErrorTimeout::Closed);
1136        }
1137        // no active waiter so push to the queue
1138        let mut ret = MaybeUninit::<T>::uninit();
1139        let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
1140            ret.as_mut_ptr()
1141        )));
1142        internal.push_signal(sig.dynamic_ptr());
1143        drop(internal);
1144        if unlikely(!sig.wait_timeout(deadline)) {
1145            if sig.is_terminated() {
1146                return Err(ReceiveErrorTimeout::Closed);
1147            }
1148            {
1149                let mut internal = acquire_internal(&self.internal);
1150                if internal.cancel_recv_signal(sig.as_tagged_ptr()) {
1151                    return Err(ReceiveErrorTimeout::Timeout);
1152                }
1153            }
1154            // removing receive failed to wait for the signal response
1155            if unlikely(!sig.wait()) {
1156                return Err(ReceiveErrorTimeout::Closed);
1157            }
1158        }
1159        // SAFETY: it's safe to assume init as data is forgotten on another
1160        // side
1161        if size_of::<T>() > size_of::<*mut T>() {
1162            Ok(unsafe { ret.assume_init() })
1163        } else {
1164            Ok(unsafe { sig.assume_init() })
1165        }
1166
1167        // if the queue is not empty send the data
1168    }
1169
1170    /// Drains all available messages from the channel into the provided vector,
1171    /// blocking until at least one message is received.
1172    ///
1173    /// This function combines the behavior of `drain_into` with blocking semantics:
1174    /// - If messages are available, it drains all of them and returns immediately
1175    /// - If no messages are available, it blocks the current thread until at least one message arrives
1176    ///
1177    /// Returns the number of messages received.
1178    ///
1179    /// # Examples
1180    ///
1181    /// ```
1182    /// # use std::thread::spawn;
1183    /// # let (s, r) = kanal_plus::bounded(100);
1184    /// # let t = spawn(move || {
1185    /// #   for i in 0..100 {
1186    /// #     s.send(i)?;
1187    /// #   }
1188    /// #   anyhow::Ok(())
1189    /// # });
1190    ///
1191    /// let mut buf = Vec::new();
1192    /// loop {
1193    ///     match r.drain_into_blocking(&mut buf) {
1194    ///         Ok(count) => {
1195    ///             assert!(count > 0);
1196    ///             // process buf...
1197    ///             buf.clear();
1198    ///         }
1199    ///         Err(_) => break, // channel closed
1200    ///     }
1201    /// }
1202    /// # t.join().unwrap()?;
1203    /// # anyhow::Ok(())
1204    /// ```
1205    pub fn drain_into_blocking(&self, vec: &mut Vec<T>) -> Result<usize, ReceiveError> {
1206        let vec_initial_length = vec.len();
1207        let mut internal = acquire_internal(&self.internal);
1208
1209        // Check if channel is closed
1210        if unlikely(internal.recv_count == 0) {
1211            return Err(ReceiveError());
1212        }
1213
1214        // Calculate required capacity and reserve
1215        let required_cap = internal.queue.len() + {
1216            if internal.recv_blocking {
1217                0
1218            } else {
1219                internal.wait_list.len()
1220            }
1221        };
1222        let remaining_cap = vec.capacity() - vec_initial_length;
1223        if required_cap > remaining_cap {
1224            vec.reserve(vec_initial_length + required_cap - remaining_cap);
1225        }
1226
1227        // Drain queue
1228        vec.extend(internal.queue.drain(..));
1229
1230        // Drain wait_list send signals
1231        while let Some(p) = internal.next_send() {
1232            // SAFETY: it's safe to receive from owned signal once
1233            unsafe { vec.push(p.recv()) }
1234        }
1235
1236        // If got data, return immediately
1237        let count = vec.len() - vec_initial_length;
1238        if count > 0 {
1239            return Ok(count);
1240        }
1241
1242        // No data, check if there are still senders
1243        if unlikely(internal.send_count == 0) {
1244            return Err(ReceiveError());
1245        }
1246
1247        // Register signal and wait
1248        let mut ret = MaybeUninit::<T>::uninit();
1249        let sig = pin!(SyncSignal::new(KanalPtr::new_write_address_ptr(
1250            ret.as_mut_ptr()
1251        )));
1252        internal.push_signal(sig.dynamic_ptr());
1253        drop(internal);
1254
1255        if unlikely(!sig.wait()) {
1256            return Err(ReceiveError());
1257        }
1258
1259        // Read data and return
1260        if size_of::<T>() > size_of::<*mut T>() {
1261            vec.push(unsafe { ret.assume_init() });
1262        } else {
1263            vec.push(unsafe { sig.assume_init() });
1264        }
1265        Ok(1)
1266    }
1267
1268    shared_recv_impl!();
1269    #[cfg(feature = "async")]
1270    /// Clones receiver as the async version of it
1271    pub fn clone_async(&self) -> AsyncReceiver<T> {
1272        AsyncReceiver::<T> {
1273            internal: self.internal.clone_recv(),
1274        }
1275    }
1276
1277    /// Converts [`Receiver`] to [`AsyncReceiver`] and returns it.
1278    ///
1279    /// # Examples
1280    ///
1281    /// ```
1282    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1283    /// # use tokio::{spawn as co};
1284    /// # use std::time::Duration;
1285    ///   let (s, r) = kanal_plus::bounded(0);
1286    ///   co(async move {
1287    ///     let r=r.to_async();
1288    ///     let name=r.recv().await?;
1289    ///     println!("Hello {}!",name);
1290    ///     anyhow::Ok(())
1291    ///   });
1292    ///   s.send("World")?;
1293    /// # anyhow::Ok(())
1294    /// # });
1295    /// ```
1296    #[cfg(feature = "async")]
1297    pub fn to_async(self) -> AsyncReceiver<T> {
1298        // SAFETY: structure of Receiver<T> and AsyncReceiver<T> is same
1299        unsafe { transmute(self) }
1300    }
1301
1302    /// Borrows [`Receiver`] as [`AsyncReceiver`] and returns it.
1303    ///
1304    /// # Examples
1305    ///
1306    /// ```
1307    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1308    /// # use tokio::{spawn as co};
1309    /// # use std::time::Duration;
1310    ///   let (s, r) = kanal_plus::bounded(0);
1311    ///   co(async move {
1312    ///     let name=r.as_async().recv().await?;
1313    ///     println!("Hello {}!",name);
1314    ///     anyhow::Ok(())
1315    ///   });
1316    ///   s.send("World")?;
1317    /// # anyhow::Ok(())
1318    /// # });
1319    /// ```
1320    #[cfg(feature = "async")]
1321    pub fn as_async(&self) -> &AsyncReceiver<T> {
1322        // SAFETY: structure of Receiver<T> and AsyncReceiver<T> is same
1323        unsafe { transmute(self) }
1324    }
1325
1326    shared_impl!();
1327}
1328
1329impl<T> Iterator for Receiver<T> {
1330    type Item = T;
1331
1332    fn next(&mut self) -> Option<Self::Item> {
1333        self.recv().ok()
1334    }
1335}
1336
1337#[cfg(feature = "async")]
1338impl<T> AsyncReceiver<T> {
1339    /// Returns a [`ReceiveFuture`] to receive data from the channel
1340    /// asynchronously.
1341    ///
1342    /// # Cancellation and Polling Considerations
1343    ///
1344    /// Due to current limitations in Rust's handling of future cancellation, if
1345    /// a `ReceiveFuture` is dropped exactly at the time when new data is
1346    /// written to the channel, it may result in the loss of the received
1347    /// value. This behavior although memory-safe stems from the fact that
1348    /// Rust does not provide a built-in, correct mechanism for cancelling
1349    /// futures.
1350    ///
1351    /// Additionally, it is important to note that constructs such as
1352    /// `tokio::select!` are not correct to use with kanal async channels.
1353    /// Kanal's design does not rely on the conventional `poll` mechanism to
1354    /// read messages. Because of its internal optimizations, the future may
1355    /// complete without receiving the final poll, which prevents proper
1356    /// handling of the message.
1357    ///
1358    /// As a result, once the `ReceiveFuture` is polled for the first time
1359    /// (which registers the request to receive data), the programmer must
1360    /// commit to completing the polling process. This ensures that messages
1361    /// are correctly delivered and avoids potential race conditions associated
1362    /// with cancellation.
1363    ///
1364    /// # Examples
1365    ///
1366    /// ```
1367    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1368    /// # use tokio::{spawn as co};
1369    /// # let (s, r) = kanal_plus::bounded_async(0);
1370    /// # co(async move {
1371    /// #      s.send("Buddy").await?;
1372    /// #      anyhow::Ok(())
1373    /// # });
1374    /// let name=r.recv().await?;
1375    /// println!("Hello {}",name);
1376    /// # anyhow::Ok(())
1377    /// # });
1378    /// ```
1379    #[inline(always)]
1380    pub fn recv(&'_ self) -> ReceiveFuture<'_, T> {
1381        ReceiveFuture::new_ref(&self.internal)
1382    }
1383    /// Creates a asynchronous stream for the channel to receive messages,
1384    /// [`ReceiveStream`] borrows the [`AsyncReceiver`], after dropping it,
1385    /// receiver will be available and usable again.
1386    ///
1387    /// # Examples
1388    ///
1389    /// ```
1390    /// # use tokio::{spawn as co};
1391    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1392    /// // import to be able to use stream.next() function
1393    /// use futures::stream::StreamExt;
1394    /// // import to be able to use stream.is_terminated() function
1395    /// use futures::stream::FusedStream;
1396    ///
1397    /// let (s, r) = kanal_plus::unbounded_async();
1398    /// co(async move {
1399    ///     for i in 0..100 {
1400    ///         s.send(i).await.unwrap();
1401    ///     }
1402    /// });
1403    /// let mut stream = r.stream();
1404    /// assert!(!stream.is_terminated());
1405    /// for i in 0..100 {
1406    ///     assert_eq!(stream.next().await, Some(i));
1407    /// }
1408    /// // Stream will return None after it is terminated, and there is no other sender.
1409    /// assert_eq!(stream.next().await, None);
1410    /// assert!(stream.is_terminated());
1411    /// # });
1412    /// ```
1413    #[inline(always)]
1414    pub fn stream(&'_ self) -> ReceiveStream<'_, T> {
1415        ReceiveStream::new_borrowed(self)
1416    }
1417
    /// Consumes the receiver and wraps it in a [`ReceiveStreamOwned`], an
    /// asynchronous stream that owns the receiver.
    ///
    /// This is useful when the stream needs to outlive a borrow of the
    /// receiver; use `stream` when a borrowing stream is sufficient.
    #[inline(always)]
    pub fn into_stream(self) -> ReceiveStreamOwned<T> {
        ReceiveStreamOwned::new(self)
    }
1425
1426    /// Returns a [`DrainIntoBlockingFuture`] to drain all available messages from the channel
1427    /// into the provided vector, awaiting until at least one message is received.
1428    ///
1429    /// This function combines the behavior of `drain_into` with async semantics:
1430    /// - If messages are available, it drains all of them and returns immediately
1431    /// - If no messages are available, it awaits (yields to the async runtime) until at least one message arrives
1432    ///
1433    /// Note: The name "blocking" refers to the semantic behavior (waiting for data), not thread blocking.
1434    /// This method is fully async and will not block the thread.
1435    ///
1436    /// Returns the number of messages received.
1437    ///
1438    /// # Examples
1439    ///
1440    /// ```
1441    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1442    /// # use tokio::spawn;
1443    /// let (s, r) = kanal_plus::bounded_async(100);
1444    /// spawn(async move {
1445    ///     for i in 0..100 {
1446    ///         s.send(i).await.unwrap();
1447    ///     }
1448    /// });
1449    ///
1450    /// let mut buf = Vec::new();
1451    /// loop {
1452    ///     match r.drain_into_blocking(&mut buf).await {
1453    ///         Ok(count) => {
1454    ///             assert!(count > 0);
1455    ///             // process buf...
1456    ///             buf.clear();
1457    ///         }
1458    ///         Err(_) => break, // channel closed
1459    ///     }
1460    /// }
1461    /// # anyhow::Ok(())
1462    /// # });
1463    /// ```
1464    #[inline(always)]
1465    pub fn drain_into_blocking<'a, 'b>(
1466        &'a self,
1467        vec: &'b mut Vec<T>,
1468    ) -> DrainIntoBlockingFuture<'a, 'b, T> {
1469        DrainIntoBlockingFuture::new(&self.internal, vec)
1470    }
1471
1472    shared_recv_impl!();
1473    /// Returns sync cloned version of the receiver.
1474    ///
1475    /// # Examples
1476    ///
1477    /// ```
1478    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1479    /// # use tokio::{spawn as co};
1480    /// let (s, r) = kanal_plus::unbounded_async();
1481    /// s.send(1).await?;
1482    /// let sync_receiver=r.clone_sync();
1483    /// // JUST FOR EXAMPLE IT IS WRONG TO USE SYNC INSTANCE IN ASYNC CONTEXT
1484    /// assert_eq!(sync_receiver.recv()?,1);
1485    /// # anyhow::Ok(())
1486    /// # });
1487    /// ```
1488    pub fn clone_sync(&self) -> Receiver<T> {
1489        Receiver::<T> {
1490            internal: self.internal.clone_recv(),
1491        }
1492    }
1493
1494    /// Converts [`AsyncReceiver`] to [`Receiver`] and returns it.
1495    ///
1496    /// # Examples
1497    ///
1498    /// ```
1499    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1500    /// # use std::time::Duration;
1501    ///   let (s, r) = kanal_plus::bounded_async(0);
1502    ///   // move to sync environment
1503    ///   std::thread::spawn(move || {
1504    ///     let r=r.to_sync();
1505    ///     let name=r.recv()?;
1506    ///     println!("Hello {}!",name);
1507    ///     anyhow::Ok(())
1508    ///   });
1509    ///   s.send("World").await?;
1510    /// # anyhow::Ok(())
1511    /// # });
1512    /// ```
1513    pub fn to_sync(self) -> Receiver<T> {
1514        // SAFETY: structure of Receiver<T> and AsyncReceiver<T> is same
1515        unsafe { transmute(self) }
1516    }
1517
1518    /// Borrows [`AsyncReceiver`] as [`Receiver`] and returns it
1519    /// # Examples
1520    ///
1521    /// ```
1522    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
1523    /// # use std::time::Duration;
1524    ///   let (s, r) = kanal_plus::bounded_async(0);
1525    ///   // move to sync environment
1526    ///   std::thread::spawn(move || {
1527    ///     let name=r.as_sync().recv()?;
1528    ///     println!("Hello {}!",name);
1529    ///     anyhow::Ok(())
1530    ///   });
1531    ///   s.send("World").await?;
1532    /// # anyhow::Ok(())
1533    /// # });
1534    /// ```
1535    pub fn as_sync(&self) -> &Receiver<T> {
1536        // SAFETY: structure of Receiver<T> and AsyncReceiver<T> is same
1537        unsafe { transmute(self) }
1538    }
1539
1540    shared_impl!();
1541}
1542
// Dropping a sync receiver handle reports the drop to the channel state
// through `drop_recv`.
impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // NOTE(review): presumably decrements the channel's receiver count
        // and closes the channel when it reaches zero - confirm in
        // `Internal::drop_recv`.
        self.internal.drop_recv();
    }
}
1548
// Dropping an async receiver handle reports the drop to the channel state
// through `drop_recv`, exactly like the sync `Receiver`.
#[cfg(feature = "async")]
impl<T> Drop for AsyncReceiver<T> {
    fn drop(&mut self) {
        // NOTE(review): presumably decrements the channel's receiver count
        // and closes the channel when it reaches zero - confirm in
        // `Internal::drop_recv`.
        self.internal.drop_recv();
    }
}
1555
1556impl<T> Clone for Receiver<T> {
1557    fn clone(&self) -> Self {
1558        Self {
1559            internal: self.internal.clone_recv(),
1560        }
1561    }
1562}
1563
#[cfg(feature = "async")]
impl<T> Clone for AsyncReceiver<T> {
    /// Returns another async receiver built from a `clone_recv` copy of the
    /// internal handle.
    fn clone(&self) -> Self {
        let internal = self.internal.clone_recv();
        Self { internal }
    }
}
1572
1573/// Creates a new sync bounded channel with the requested buffer size, and
1574/// returns [`Sender`] and [`Receiver`] of the channel for type T, you can get
1575/// access to async API of [`AsyncSender`] and [`AsyncReceiver`] with `to_sync`,
1576/// `as_async` or `clone_sync` based on your requirements, by calling them on
1577/// sender or receiver.
1578///
1579/// # Examples
1580///
1581/// ```
1582/// use std::thread::spawn;
1583///
1584/// let (s, r) = kanal_plus::bounded(0); // for channel with zero size queue, this channel always block until successful send/recv
1585///
1586/// // spawn 8 threads, that will send 100 numbers to channel reader
1587/// for i in 0..8{
1588///     let s = s.clone();
1589///     spawn(move || {
1590///         for i in 1..100{
1591///             s.send(i);
1592///         }
1593///     });
1594/// }
1595/// // drop local sender so the channel send side gets closed when all of the senders finished their jobs
1596/// drop(s);
1597///
1598/// let first = r.recv().unwrap(); // receive first msg
1599/// let total: u32 = first+r.sum::<u32>(); // the receiver implements iterator so you can call sum to receive sum of rest of messages
1600/// assert_eq!(total, 39600);
1601/// ```
1602pub fn bounded<T>(size: usize) -> (Sender<T>, Receiver<T>) {
1603    let internal = Internal::new(true, size);
1604    (
1605        Sender {
1606            internal: internal.clone_unchecked(),
1607        },
1608        Receiver { internal },
1609    )
1610}
1611
/// Creates a new async bounded channel with the requested buffer size and
/// returns its [`AsyncSender`] and [`AsyncReceiver`] for type `T`.
///
/// The sync API ([`Sender`] and [`Receiver`]) is reachable by calling
/// `to_sync`, `as_sync` or `clone_sync` on the async sender or receiver,
/// depending on your requirements.
///
/// # Examples
///
/// ```
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// use tokio::{spawn as co};
///
/// let (s, r) = kanal_plus::bounded_async(0);
///
/// co(async move {
///       s.send("hello!").await?;
///       anyhow::Ok(())
/// });
///
/// assert_eq!(r.recv().await?, "hello!");
/// anyhow::Ok(())
/// # });
/// ```
#[cfg(feature = "async")]
pub fn bounded_async<T>(size: usize) -> (AsyncSender<T>, AsyncReceiver<T>) {
    let shared = Internal::new(true, size);
    // The sender gets a copy of the handle; the receiver takes the original.
    let sender = AsyncSender {
        internal: shared.clone_unchecked(),
    };
    let receiver = AsyncReceiver { internal: shared };
    (sender, receiver)
}
1645
// Size argument handed to `Internal::new` by `unbounded` and
// `unbounded_async`.
// NOTE(review): presumably the initial queue capacity rather than a limit -
// confirm in `Internal::new`.
const UNBOUNDED_STARTING_SIZE: usize = 32;
1647
1648/// Creates a new sync unbounded channel, and returns [`Sender`] and
1649/// [`Receiver`] of the channel for type T, you can get access to async API
1650/// of [`AsyncSender`] and [`AsyncReceiver`] with `to_sync`, `as_async` or
1651/// `clone_sync` based on your requirements, by calling them on sender or
1652/// receiver.
1653///
1654/// # Warning
1655/// This unbounded channel does not shrink its queue. As a result, if the
1656/// receive side is exhausted or delayed, the internal queue may grow
1657/// substantially. This behavior is intentional and considered as a warmup
1658/// phase. If such growth is undesirable, consider using a bounded channel with
1659/// an appropriate queue size.
1660///
1661/// # Examples
1662///
1663/// ```
1664/// use std::thread::spawn;
1665///
1666/// let (s, r) = kanal_plus::unbounded(); // for channel with unbounded size queue, this channel never blocks on send
1667///
1668/// // spawn 8 threads, that will send 100 numbers to the channel reader
1669/// for i in 0..8{
1670///     let s = s.clone();
1671///     spawn(move || {
1672///         for i in 1..100{
1673///             s.send(i);
1674///         }
1675///     });
1676/// }
1677/// // drop local sender so the channel send side gets closed when all of the senders finished their jobs
1678/// drop(s);
1679///
1680/// let first = r.recv().unwrap(); // receive first msg
1681/// let total: u32 = first+r.sum::<u32>(); // the receiver implements iterator so you can call sum to receive sum of rest of messages
1682/// assert_eq!(total, 39600);
1683/// ```
1684pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
1685    let internal = Internal::new(false, UNBOUNDED_STARTING_SIZE);
1686    (
1687        Sender {
1688            internal: internal.clone_unchecked(),
1689        },
1690        Receiver { internal },
1691    )
1692}
1693
/// Creates a new async unbounded channel and returns its [`AsyncSender`] and
/// [`AsyncReceiver`] for type `T`.
///
/// The sync API ([`Sender`] and [`Receiver`]) is reachable by calling
/// `to_sync`, `as_sync` or `clone_sync` on the async sender or receiver,
/// depending on your requirements.
///
/// # Warning
/// This unbounded channel does not shrink its queue. As a result, if the
/// receive side is exhausted or delayed, the internal queue may grow
/// substantially. This behavior is intentional and considered as a warmup
/// phase. If such growth is undesirable, consider using a bounded channel with
/// an appropriate queue size.
///
/// # Examples
///
/// ```
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// use tokio::{spawn as co};
///
/// let (s, r) = kanal_plus::unbounded_async();
///
/// co(async move {
///       s.send("hello!").await?;
///       anyhow::Ok(())
/// });
///
/// assert_eq!(r.recv().await?, "hello!");
/// anyhow::Ok(())
/// # });
/// ```
#[cfg(feature = "async")]
pub fn unbounded_async<T>() -> (AsyncSender<T>, AsyncReceiver<T>) {
    let shared = Internal::new(false, UNBOUNDED_STARTING_SIZE);
    // The sender gets a copy of the handle; the receiver takes the original.
    let sender = AsyncSender {
        internal: shared.clone_unchecked(),
    };
    let receiver = AsyncReceiver { internal: shared };
    (sender, receiver)
}