crossfire/
blocking_tx.rs

1use crate::backoff::*;
2use crate::flavor::FlavorMP;
3use crate::{shared::*, trace_log, AsyncTx, MAsyncTx, NotCloneable, SenderType};
4use std::cell::Cell;
5use std::fmt;
6use std::marker::PhantomData;
7use std::mem::MaybeUninit;
8use std::ops::Deref;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
/// A single producer (sender) that works in a blocking context.
///
/// Additional methods in [ChannelShared] can be accessed through `Deref`.
///
/// **NOTE**: `Tx` is neither `Clone` nor `Sync`.
/// If you need concurrent access, use [MTx] instead.
///
/// `Tx` has a `Send` marker, so it can be moved to another thread.
/// The following code is OK:
///
/// ``` rust
/// use crossfire::*;
/// let (tx, rx) = spsc::bounded_blocking::<usize>(100);
/// std::thread::spawn(move || {
///     let _ = tx.send(1);
/// });
/// drop(rx);
/// ```
///
/// Because `Tx` does not have a `Sync` marker, an `Arc<Tx>` is not `Send`.
///
/// For your safety, the following code **should not compile**:
///
/// ``` compile_fail
/// use crossfire::*;
/// use std::sync::Arc;
/// let (tx, rx) = spsc::bounded_blocking::<usize>(100);
/// let tx = Arc::new(tx);
/// std::thread::spawn(move || {
///     let _ = tx.send(1);
/// });
/// drop(rx);
/// ```
pub struct Tx<F: Flavor> {
    /// Reference-counted channel state; this is also what `Tx` derefs to.
    pub(crate) shared: Arc<ChannelShared<F>>,
    // `Cell<()>` is `!Sync`, so this marker removes the auto `Sync` impl and
    // prevents a `Tx` from being shared across threads through an `Arc`.
    _phan: PhantomData<Cell<()>>,
    /// Per-sender waker cache, passed to the sender registry when a blocking
    /// waker is registered or handed back (see `_send_bounded`).
    waker_cache: WakerCache<*const F::Item>,
}
52
// `Send` is asserted by hand: the `waker_cache` field is parameterized by a raw
// pointer type (`*const F::Item`), which presumably suppresses the auto impl.
// NOTE(review): soundness depends on `ChannelShared<F>` and `WakerCache` being safe
// to move to another thread — confirm against their definitions.
unsafe impl<F: Flavor> Send for Tx<F> {}
54
55impl<F: Flavor> fmt::Debug for Tx<F> {
56    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57        write!(f, "Tx{:p}", self)
58    }
59}
60
61impl<F: Flavor> fmt::Display for Tx<F> {
62    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
63        write!(f, "Tx{:p}", self)
64    }
65}
66
67impl<F: Flavor> Drop for Tx<F> {
68    #[inline(always)]
69    fn drop(&mut self) {
70        self.shared.close_tx();
71    }
72}
73
74impl<F: Flavor> From<AsyncTx<F>> for Tx<F> {
75    fn from(value: AsyncTx<F>) -> Self {
76        value.add_tx();
77        Self::new(value.shared.clone())
78    }
79}
80
81impl<F: Flavor> Tx<F> {
82    #[inline]
83    pub(crate) fn new(shared: Arc<ChannelShared<F>>) -> Self {
84        Self { shared, waker_cache: WakerCache::new(), _phan: Default::default() }
85    }
86
    /// Slow path behind [`Tx::send`] and [`Tx::send_timeout`], used after a first
    /// `try_send` has failed.
    ///
    /// It first retries `try_send` under a backoff, then registers a waker with the
    /// sender registry and parks the thread until the item is sent, the deadline
    /// passes, or the channel is reported closed.
    ///
    /// * `item` - the message, still owned by the caller's `MaybeUninit`. It is read
    ///   back with `assume_init_read` only on the error paths so it can be returned.
    /// * `deadline` - consulted through `check_timeout` before every park; `None` is
    ///   what the untimed `send` passes (presumably meaning "no time limit").
    ///
    /// Returns `Ok(())` once the message is handed to the channel,
    /// `Err(SendTimeoutError::Timeout(item))` when the wait is abandoned after the
    /// deadline, and `Err(SendTimeoutError::Disconnected(item))` when the waker state
    /// reports the channel as closed.
    #[inline(always)]
    pub(crate) fn _send_bounded(
        &self, item: &MaybeUninit<F::Item>, deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<F::Item>> {
        let shared = &self.shared;
        let large = shared.large;
        let backoff_cfg = BackoffConfig::detect().spin(2).limit(shared.backoff_limit);
        let mut backoff = Backoff::from(backoff_cfg);
        // `congest` only decides whether `sender_snooze` runs before parking (below).
        let congest = shared.sender_direct_copy();
        // Direct copy is disabled because of issue #54; the `direct_copy && large`
        // branch below is therefore never taken at the moment.
        let direct_copy = false;
        //        let direct_copy = deadline.is_none() && shared.sender_direct_copy();
        if large {
            backoff.set_step(2);
        }
        // Phase 1: retry under backoff until the send succeeds or the backoff step
        // returns true (`r`), at which point we fall through to the waker phase.
        loop {
            let r = if large { backoff.yield_now() } else { backoff.spin() };
            if direct_copy && large {
                match shared.inner.try_send_oneshot(item.as_ptr()) {
                    Some(false) => break,
                    None => {
                        if r {
                            break;
                        }
                        continue;
                    }
                    _ => {
                        shared.on_send();
                        trace_log!("tx: send");
                        std::thread::yield_now();
                        return Ok(());
                    }
                }
            } else {
                if false == shared.inner.try_send(&item) {
                    if r {
                        break;
                    }
                    continue;
                }
                shared.on_send();
                trace_log!("tx: send");
                return Ok(());
            }
        }
        // With direct copy disabled, the waker is always registered with a null item pointer.
        let direct_copy_ptr: *const F::Item = std::ptr::null();
        //            if direct_copy { item.as_ptr() } else { std::ptr::null() };

        let mut state: u8;
        let mut o_waker: Option<<F::Send as Registry>::Waker> = None;
        // Shared success exit: when the channel is full again, yield once and hand the
        // waker back to the registry's cache before returning.
        // (`self.senders` resolves through `Deref` to the shared channel state.)
        macro_rules! return_ok {
            () => {
                trace_log!("tx: send {:?}", o_waker);
                if shared.is_full() {
                    // It's for 8x1, 16x1.
                    std::thread::yield_now();
                    self.senders.cache_waker(o_waker, &self.waker_cache);
                }
                return Ok(())
            };
        }
        // Phase 2: register a waker, double-check the channel, then park until the
        // waker state reaches `Woken` or beyond.
        loop {
            self.senders.reg_waker_blocking(&mut o_waker, &self.waker_cache, direct_copy_ptr);
            // For nx1 (more likely congest), need to reset backoff
            // to allow more yield to receivers.
            // For nxn (the backoff is already complete), wait a little bit.
            state = shared.sender_double_check::<false>(&item, &mut o_waker);
            trace_log!("tx: sender_double_check {:?} state={}", o_waker, state);
            while state < WakerState::Woken as u8 {
                if congest {
                    state = shared.sender_snooze(&o_waker, &mut backoff);
                }
                if state <= WakerState::Waiting as u8 {
                    match check_timeout(deadline) {
                        // No remaining-time limit reported: park until unparked.
                        Ok(None) => {
                            std::thread::park();
                        }
                        // Park for at most the remaining time.
                        Ok(Some(dur)) => {
                            std::thread::park_timeout(dur);
                        }
                        // Deadline passed: try to withdraw the waker.
                        Err(_) => {
                            if shared.abandon_send_waker(o_waker.as_ref().unwrap()) {
                                return Err(SendTimeoutError::Timeout(unsafe {
                                    item.assume_init_read()
                                }));
                            } else {
                                // NOTE: Unlikely since we disable direct copy with deadline
                                // state is WakerState::Done
                                // NOTE(review): this reports success without sending here;
                                // confirm `abandon_send_waker` cannot fail while the state
                                // is merely `Woken`, otherwise the item would be lost.
                                return Ok(());
                            }
                        }
                    }
                    // Parking may return spuriously, so the state is re-read every time.
                    state = self.senders.get_waker_state(&o_waker, Ordering::SeqCst);
                    trace_log!("tx: after park state={}", state);
                }
            }
            if state == WakerState::Woken as u8 {
                // Woken: retry the send under a fresh backoff; if it still fails once
                // the backoff completes, go around the outer loop and register again.
                backoff.reset();
                loop {
                    if shared.inner.try_send(&item) {
                        shared.on_send();
                        return_ok!();
                    }
                    if backoff.is_completed() {
                        break;
                    }
                    backoff.snooze();
                }
            } else if state == WakerState::Done as u8 {
                return_ok!();
            } else {
                // The only remaining state is expected to be `Closed`.
                debug_assert_eq!(state, WakerState::Closed as u8);
                return Err(SendTimeoutError::Disconnected(unsafe { item.assume_init_read() }));
            }
        }
    }
203
204    /// Sends a message. This method will block until the message is sent or the channel is closed.
205    ///
206    /// Returns `Ok(())` on success.
207    ///
208    /// Returns `Err(SendError)` if the receiver has been dropped.
209    ///
210    #[inline]
211    pub fn send(&self, item: F::Item) -> Result<(), SendError<F::Item>> {
212        let shared = &self.shared;
213        if shared.is_rx_closed() {
214            return Err(SendError(item));
215        }
216        let _item = MaybeUninit::new(item);
217        if shared.inner.try_send(&_item) {
218            shared.on_send();
219            return Ok(());
220        }
221        match self._send_bounded(&_item, None) {
222            Ok(_) => return Ok(()),
223            Err(SendTimeoutError::Disconnected(e)) => Err(SendError(e)),
224            Err(SendTimeoutError::Timeout(_)) => unreachable!(),
225        }
226    }
227
228    /// Attempts to send a message without blocking.
229    ///
230    /// Returns `Ok(())` when successful.
231    ///
232    /// Returns Err([TrySendError::Full]) if the channel is full.
233    ///
234    /// Returns Err([TrySendError::Disconnected]) if the receiver has been dropped.
235    #[inline]
236    pub fn try_send(&self, item: F::Item) -> Result<(), TrySendError<F::Item>> {
237        let shared = &self.shared;
238        if shared.is_rx_closed() {
239            return Err(TrySendError::Disconnected(item));
240        }
241        let _item = MaybeUninit::new(item);
242        if shared.inner.try_send(&_item) {
243            shared.on_send();
244            return Ok(());
245        } else {
246            return Err(TrySendError::Full(unsafe { _item.assume_init_read() }));
247        }
248    }
249
250    /// Sends a message with a timeout.
251    /// Will block when channel is full.
252    ///
253    /// The behavior is atomic: the message is either sent successfully or returned on error.
254    ///
255    /// Returns `Ok(())` when successful.
256    ///
257    /// Returns Err([SendTimeoutError::Timeout]) if the operation timed out.
258    ///
259    /// Returns Err([SendTimeoutError::Disconnected]) if the receiver has been dropped.
260    #[inline]
261    pub fn send_timeout(
262        &self, item: F::Item, timeout: Duration,
263    ) -> Result<(), SendTimeoutError<F::Item>> {
264        let shared = &self.shared;
265        if shared.is_rx_closed() {
266            return Err(SendTimeoutError::Disconnected(item));
267        }
268        match Instant::now().checked_add(timeout) {
269            None => self.try_send(item).map_err(|e| match e {
270                TrySendError::Disconnected(t) => SendTimeoutError::Disconnected(t),
271                TrySendError::Full(t) => SendTimeoutError::Timeout(t),
272            }),
273            Some(deadline) => {
274                let _item = MaybeUninit::new(item);
275                if shared.inner.try_send(&_item) {
276                    shared.on_send();
277                    return Ok(());
278                }
279                match self._send_bounded(&_item, Some(deadline)) {
280                    Ok(_) => return Ok(()),
281                    Err(e) => return Err(e),
282                }
283            }
284        }
285    }
286
287    /// Return true if the other side has closed
288    #[inline(always)]
289    pub fn is_disconnected(&self) -> bool {
290        self.shared.is_rx_closed()
291    }
292
293    #[inline]
294    pub fn into_async(self) -> AsyncTx<F> {
295        self.into()
296    }
297}
298
/// A multi-producer (sender) that works in a blocking context.
///
/// Wraps a [`Tx<F>`] and additionally implements `Clone`.
/// All [`Tx<F>`] methods are reachable through `Deref<Target = Tx<F>>`, and the
/// [ChannelShared] methods through `Tx`'s own `Deref`.
///
/// You can use `into()` to convert it to `Tx<F>`.
pub struct MTx<F: Flavor>(pub(crate) Tx<F>);
306
307impl<F: Flavor> fmt::Debug for MTx<F> {
308    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
309        write!(f, "MTx{:p}", self)
310    }
311}
312
313impl<F: Flavor> fmt::Display for MTx<F> {
314    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
315        write!(f, "MTx{:p}", self)
316    }
317}
318
319impl<F: Flavor> From<MTx<F>> for Tx<F> {
320    fn from(tx: MTx<F>) -> Self {
321        tx.0
322    }
323}
324
325impl<F: Flavor> From<MAsyncTx<F>> for MTx<F> {
326    fn from(value: MAsyncTx<F>) -> Self {
327        value.add_tx();
328        Self(Tx::new(value.shared.clone()))
329    }
330}
331
// The wrapped `Tx` is deliberately `!Sync` (via its `PhantomData<Cell<()>>` marker);
// the multi-producer handle opts back in by hand.
// NOTE(review): soundness depends on every `&self` operation of `Tx` being safe to
// run from several threads at once — confirm for the channel flavors used with `MTx`.
unsafe impl<F: Flavor> Sync for MTx<F> {}
333
334impl<F: Flavor + FlavorMP> MTx<F> {
335    #[inline]
336    pub(crate) fn new(shared: Arc<ChannelShared<F>>) -> Self {
337        Self(Tx::new(shared))
338    }
339
340    #[inline]
341    pub fn into_async(self) -> MAsyncTx<F> {
342        self.into()
343    }
344}
345
346impl<F: Flavor> Clone for MTx<F> {
347    #[inline]
348    fn clone(&self) -> Self {
349        let inner = &self.0;
350        inner.shared.add_tx();
351        Self(Tx::new(inner.shared.clone()))
352    }
353}
354
355impl<F: Flavor> Deref for MTx<F> {
356    type Target = Tx<F>;
357
358    /// Inherits all the functions of [Tx].
359    #[inline(always)]
360    fn deref(&self) -> &Self::Target {
361        &self.0
362    }
363}
364
/// For writing generic code with MTx & Tx
pub trait BlockingTxTrait<T: Send + 'static>: Send + 'static + fmt::Debug + fmt::Display {
    /// Sends a message. This method will block until the message is sent or the channel is closed.
    ///
    /// Returns `Ok(())` on success.
    ///
    /// Returns Err([SendError]) if the receiver has been dropped.
    fn send(&self, _item: T) -> Result<(), SendError<T>>;

    /// Attempts to send a message without blocking.
    ///
    /// Returns `Ok(())` when successful.
    ///
    /// Returns Err([TrySendError::Full]) if the channel is full.
    ///
    /// Returns Err([TrySendError::Disconnected]) if the receiver has been dropped.
    fn try_send(&self, _item: T) -> Result<(), TrySendError<T>>;

    /// Sends a message with a timeout.
    /// Will block when channel is full.
    ///
    /// Returns `Ok(())` when successful.
    ///
    /// Returns Err([SendTimeoutError::Timeout]) if the message could not be sent because the channel is full and the operation timed out.
    ///
    /// Returns Err([SendTimeoutError::Disconnected]) if the receiver has been dropped.
    fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>;

    /// The number of messages in the channel at the moment
    fn len(&self) -> usize;

    /// The capacity of the channel, return None for unbounded channel.
    fn capacity(&self) -> Option<usize>;

    /// Whether channel is empty at the moment
    fn is_empty(&self) -> bool;

    /// Whether the channel is full at the moment
    fn is_full(&self) -> bool;

    /// Return true if the other side has closed
    fn is_disconnected(&self) -> bool;

    /// Return the number of senders
    fn get_tx_count(&self) -> usize;

    /// Return the number of receivers
    fn get_rx_count(&self) -> usize;

    /// Consumes the sender and returns a `Vec` holding `count` senders for the same channel.
    ///
    /// [Tx] cannot be cloned and only accepts `count == 1`; [MTx] clones itself
    /// `count - 1` times and puts the original last. `count` is expected to be at least 1.
    fn clone_to_vec(self, count: usize) -> Vec<Self>
    where
        Self: Sized;

    /// Return the pair of waker counts reported by the shared channel state.
    ///
    /// NOTE(review): presumably `(sender wakers, receiver wakers)` — confirm against
    /// `ChannelShared::get_wakers_count`.
    fn get_wakers_count(&self) -> (usize, usize);
}
420
421impl<F: Flavor> BlockingTxTrait<F::Item> for Tx<F> {
422    #[inline(always)]
423    fn clone_to_vec(self, _count: usize) -> Vec<Self> {
424        assert_eq!(_count, 1);
425        vec![self]
426    }
427
428    #[inline(always)]
429    fn send(&self, item: F::Item) -> Result<(), SendError<F::Item>> {
430        Tx::send(self, item)
431    }
432
433    #[inline(always)]
434    fn try_send(&self, item: F::Item) -> Result<(), TrySendError<F::Item>> {
435        Tx::try_send(self, item)
436    }
437
438    #[inline(always)]
439    fn send_timeout(
440        &self, item: F::Item, timeout: Duration,
441    ) -> Result<(), SendTimeoutError<F::Item>> {
442        Tx::send_timeout(&self, item, timeout)
443    }
444
445    /// The number of messages in the channel at the moment
446    #[inline(always)]
447    fn len(&self) -> usize {
448        self.as_ref().len()
449    }
450
451    /// The capacity of the channel, return None for unbounded channel.
452    #[inline(always)]
453    fn capacity(&self) -> Option<usize> {
454        self.as_ref().capacity()
455    }
456
457    /// Whether channel is empty at the moment
458    #[inline(always)]
459    fn is_empty(&self) -> bool {
460        self.as_ref().is_empty()
461    }
462
463    /// Whether the channel is full at the moment
464    #[inline(always)]
465    fn is_full(&self) -> bool {
466        self.as_ref().is_full()
467    }
468
469    /// Return true if the other side has closed
470    #[inline(always)]
471    fn is_disconnected(&self) -> bool {
472        self.as_ref().get_rx_count() == 0
473    }
474
475    #[inline(always)]
476    fn get_tx_count(&self) -> usize {
477        self.as_ref().get_tx_count()
478    }
479
480    #[inline(always)]
481    fn get_rx_count(&self) -> usize {
482        self.as_ref().get_rx_count()
483    }
484
485    fn get_wakers_count(&self) -> (usize, usize) {
486        self.as_ref().get_wakers_count()
487    }
488}
489
490impl<F: Flavor + FlavorMP> BlockingTxTrait<F::Item> for MTx<F> {
491    #[inline(always)]
492    fn clone_to_vec(self, count: usize) -> Vec<Self> {
493        let mut v = Vec::with_capacity(count);
494        for _ in 0..count - 1 {
495            v.push(self.clone());
496        }
497        v.push(self);
498        v
499    }
500
501    #[inline(always)]
502    fn send(&self, item: F::Item) -> Result<(), SendError<F::Item>> {
503        self.0.send(item)
504    }
505
506    #[inline(always)]
507    fn try_send(&self, item: F::Item) -> Result<(), TrySendError<F::Item>> {
508        self.0.try_send(item)
509    }
510
511    #[inline(always)]
512    fn send_timeout(
513        &self, item: F::Item, timeout: Duration,
514    ) -> Result<(), SendTimeoutError<F::Item>> {
515        self.0.send_timeout(item, timeout)
516    }
517
518    /// The number of messages in the channel at the moment
519    #[inline(always)]
520    fn len(&self) -> usize {
521        self.as_ref().len()
522    }
523
524    /// The capacity of the channel, return None for unbounded channel.
525    #[inline(always)]
526    fn capacity(&self) -> Option<usize> {
527        self.as_ref().capacity()
528    }
529
530    /// Whether channel is empty at the moment
531    #[inline(always)]
532    fn is_empty(&self) -> bool {
533        self.as_ref().is_empty()
534    }
535
536    /// Whether the channel is full at the moment
537    #[inline(always)]
538    fn is_full(&self) -> bool {
539        self.as_ref().is_full()
540    }
541
542    /// Return true if the other side has closed
543    #[inline(always)]
544    fn is_disconnected(&self) -> bool {
545        self.as_ref().get_rx_count() == 0
546    }
547
548    #[inline(always)]
549    fn get_tx_count(&self) -> usize {
550        self.as_ref().get_tx_count()
551    }
552
553    #[inline(always)]
554    fn get_rx_count(&self) -> usize {
555        self.as_ref().get_rx_count()
556    }
557
558    fn get_wakers_count(&self) -> (usize, usize) {
559        self.as_ref().get_wakers_count()
560    }
561}
562
563impl<F: Flavor> Deref for Tx<F> {
564    type Target = ChannelShared<F>;
565    #[inline(always)]
566    fn deref(&self) -> &ChannelShared<F> {
567        &self.shared
568    }
569}
570
571impl<F: Flavor> AsRef<ChannelShared<F>> for Tx<F> {
572    #[inline(always)]
573    fn as_ref(&self) -> &ChannelShared<F> {
574        &self.shared
575    }
576}
577
578impl<F: Flavor> AsRef<ChannelShared<F>> for MTx<F> {
579    #[inline(always)]
580    fn as_ref(&self) -> &ChannelShared<F> {
581        &self.0.shared
582    }
583}
584
585impl<T: Send + Unpin + 'static, F: Flavor<Item = T>> SenderType for Tx<F> {
586    type Flavor = F;
587    #[inline(always)]
588    fn new(shared: Arc<ChannelShared<F>>) -> Self {
589        Self::new(shared)
590    }
591}
592
// Marker impl: tags the single-producer sender with the crate's `NotCloneable`
// trait (`Tx` does not implement `Clone`; `MTx` is the cloneable variant).
impl<F: Flavor> NotCloneable for Tx<F> {}
594
595impl<T: Send + Unpin + 'static, F: Flavor<Item = T> + FlavorMP> SenderType for MTx<F> {
596    type Flavor = F;
597    #[inline(always)]
598    fn new(shared: Arc<ChannelShared<F>>) -> Self {
599        MTx::new(shared)
600    }
601}