crossfire/select/
multiplex.rs

1use crate::backoff::*;
2use crate::flavor::{Flavor, FlavorBounded, FlavorImpl, FlavorNew, FlavorWrap};
3use crate::shared::{check_timeout, ChannelShared};
4use crate::waker::WakerState;
5use crate::waker_registry::{RegistrySend, SelectWaker, SelectWakerWrapper};
6use crate::BlockingRxTrait;
7use crate::SenderType;
8use crate::{RecvError, RecvTimeoutError, TryRecvError};
9use std::cell::Cell;
10use std::fmt;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::thread;
14use std::time::{Duration, Instant};
15
16pub const DEFAULT_WEIGHT: u32 = 128;
17
18/// Type alias for multiplexed channel flavor
19pub type Mux<F> = FlavorWrap<F, <F as Flavor>::Send, SelectWakerWrapper>;
20
21/// A multiplexer that owns multi channel receivers of the same Flavor type.
22///
23/// Unlike select, it focus on round-robin mode, allow to specified weight on each channel.
24/// It maintains a count of message received for each channel.
25/// That means if the last message recv on the `idx` channel, it will keep trying the same channel
26/// until the number equals to weight has been received. If the channel is empty, it will try the
27/// next one without touching the count. This strategy improves the hit rate of cpu cache and ensures no starvation.
28///
29/// NOTE: The default weight is 128. (When the weight of all channel set to 1, the performance is
30/// the worst because of cpu cache thrashing)
31///
32/// ## Capability and limitation:
33/// - New channel may be added on the fly
34/// - This abstraction is only designed for stable channels for most efficient select.
35/// - If channel close by sender, the receiver will be automatically close inside the Multiplex,
36/// user will not be notify until all its channels closed.
37/// - Due to it binds on Flavor interface, it cannot be use between different type.
38/// If you want to multiplex between list and array, can use the
39/// [CompatFlavor](crate::compat::CompatFlavor)
40/// - **NOTE** : It has internal mutability because it need to impl [BlockingRxTrait](crate::BlockingRxTrait),
41/// the adding channel process remains `&mut self`. Because `Multiplex` is a single consumer just
42/// like [Rx](crate::Rx), it does not have `Sync`. If you can guarantee no concurrent access you
43/// can manutally add the `Sync` back in parent struct.
44///
45///
46/// # Examples
47///
48/// Basic usage with multiple senders:
49///
50/// ```
51/// use crossfire::{mpsc::Array, MTx, select::{Multiplex, Mux}};
52/// use std::thread;
53///
54/// // Create a multiplexer with Array flavor
55/// let mut mp = Multiplex::<Array<i32>>::new();
56///
57/// // Create multiple senders through the multiplexer
58/// let tx1: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
59/// let tx2: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
60///
61/// // Send values from different threads
62/// let h1 = thread::spawn(move || {
63///     tx1.send(1).unwrap();
64/// });
65/// let h2 = thread::spawn(move || {
66///     tx2.send(2).unwrap();
67/// });
68///
69/// // Receive values through the multiplexer (order may vary)
70/// let val1 = mp.recv().unwrap();
71/// let val2 = mp.recv().unwrap();
72///
73/// h1.join().unwrap();
74/// h2.join().unwrap();
75/// ```
76pub struct Multiplex<F: Flavor> {
77    waker: Arc<SelectWaker>,
78    handlers: Vec<MultiplexHandle<F>>,
79    last_idx: Cell<usize>,
80    count: Cell<u32>,
81}
82
83unsafe impl<F: Flavor> Send for Multiplex<F> {}
84
85struct MultiplexHandle<F: Flavor> {
86    shared: Arc<ChannelShared<Mux<F>>>,
87    weight: u32,
88}
89
90impl<F: Flavor> Multiplex<F> {
91    /// Initialize Select with fair, round-robin strategy
92    pub fn new() -> Self {
93        Self {
94            waker: Arc::new(SelectWaker::new()),
95            handlers: Vec::with_capacity(10),
96            count: Cell::new(0),
97            last_idx: Cell::new(0),
98        }
99    }
100
101    #[inline]
102    fn _add_item(&mut self, flavor: F, weight: u32) -> Arc<ChannelShared<Mux<F>>> {
103        self.waker.add_opened();
104        let recvs = self.waker.clone().to_wrapper(self.handlers.len());
105        let shared = ChannelShared::new(Mux::<F>::from_inner(flavor), F::Send::new(), recvs);
106        self.handlers.push(MultiplexHandle { shared: shared.clone(), weight: weight - 1 });
107        self.last_idx.set(self.handlers.len() - 1);
108        shared
109    }
110
111    /// Add a new channels with a new() method to multiplex, return its sender.
112    ///
113    /// # Type Parameters
114    ///
115    /// * `S` - The sender type that implements SenderType with the appropriate Flavor,
116    /// may be async or blocking sender, MP or SP that match the `Flavor` type.
117    ///
118    /// # Note
119    ///
120    /// This method is only available for flavors that implement `FlavorNew` trait,
121    /// such as `List` / `One` flavor. For flavors like Array that don't implement `FlavorNew`,
122    /// use `bounded_tx` instead.
123    ///
124    /// # Example
125    ///
126    /// with mpsc::List (which sender type is [MTx](crate::MTx) and allow to clone)
127    ///
128    /// ```
129    /// use crossfire::{mpsc::List, MTx, select::{Multiplex, Mux}};
130    /// use tokio;
131    ///
132    /// let mut mp = Multiplex::<List<i32>>::new();
133    /// let tx1: MTx<Mux<List<i32>>> = mp.new_tx();
134    /// let tx2: MTx<Mux<List<i32>>> = mp.new_tx();
135    /// tx1.send(42).expect("send");
136    /// tx2.send(42).expect("send");
137    /// let value = mp.recv().unwrap();
138    /// assert_eq!(value, 42);
139    /// let value = mp.recv().unwrap();
140    /// assert_eq!(value, 42);
141    /// ```
142    ///
143    /// with spsc::One (which sender type is [Tx](crate::Tx) and not cloneable)
144    /// ```
145    /// use crossfire::{spsc::One, Tx, select::{Multiplex, Mux}};
146    /// use tokio;
147    ///
148    /// let mut mp = Multiplex::<One<i32>>::new();
149    /// // Creates an size-1 channel
150    /// let tx1: Tx<Mux<One<i32>>> = mp.new_tx();
151    /// // Creates another size-1 channel
152    /// let tx2: Tx<Mux<One<i32>>> = mp.new_tx();
153    /// std::thread::spawn(move ||{
154    ///     tx2.send(42).expect("send");
155    /// });
156    /// let value = mp.recv().unwrap();
157    /// assert_eq!(value, 42);
158    /// ```
159    pub fn new_tx<S>(&mut self) -> S
160    where
161        F: FlavorNew,
162        S: SenderType<Flavor = Mux<F>>,
163    {
164        let shared = self._add_item(F::new(), DEFAULT_WEIGHT);
165        return S::new(shared);
166    }
167
168    /// Add a channel of flavor (impl FlavorNew), with custom weight instead of default
169    /// (the default weight is 128)
170    pub fn new_tx_with_weight<S>(&mut self, weight: u32) -> S
171    where
172        F: FlavorNew,
173        S: SenderType<Flavor = Mux<F>>,
174    {
175        let shared = self._add_item(F::new(), weight);
176        return S::new(shared);
177    }
178
179    /// Creates a new bounded sender for the multiplexer
180    ///
181    /// # Arguments
182    ///
183    /// * `size` - The maximum capacity of the channel
184    ///
185    /// # Type Parameters
186    ///
187    /// * `S` - The sender type that implements SenderType with the appropriate Flavor
188    ///
189    /// # Example
190    ///
191    /// ```
192    /// use crossfire::{mpsc::Array, *, select::{Multiplex, Mux}};
193    ///
194    /// let mut mp = Multiplex::<Array<i32>>::new();
195    /// // Creates a bounded channel with capacity 10
196    /// let tx1: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
197    /// // Creates another bounded channel with capacity 20
198    /// let tx2: MTx<Mux<Array<i32>>> = mp.bounded_tx(20);
199    /// tx1.send(42).expect("send");
200    /// std::thread::spawn(move || {
201    ///     tx2.send(42).expect("send");
202    /// });
203    /// let value = mp.recv().unwrap();
204    /// assert_eq!(value, 42);
205    /// let value = mp.recv().unwrap();
206    /// assert_eq!(value, 42);
207    /// ```
208    pub fn bounded_tx<S>(&mut self, size: usize) -> S
209    where
210        F: FlavorBounded,
211        S: SenderType<Flavor = Mux<F>>,
212    {
213        let shared = self._add_item(F::new_with_bound(size), DEFAULT_WEIGHT);
214        return S::new(shared);
215    }
216
217    /// Add a bounded channel to the multiplex, with custom weight (the default is 128)
218    pub fn bounded_tx_with_weight<S>(&mut self, size: usize, weight: u32) -> S
219    where
220        F: FlavorBounded,
221        S: SenderType<Flavor = Mux<F>>,
222    {
223        let shared = self._add_item(F::new_with_bound(size), weight);
224        return S::new(shared);
225    }
226
227    /// Attempts to receive a message from any of the multiplexed channels without blocking.
228    ///
229    /// Returns `Ok(item)` if a message is available on any of the channels.
230    /// Returns `Err(TryRecvError::Empty)` if no messages are available.
231    /// Returns `Err(TryRecvError::Disconnected)` if all senders have been dropped.
232    ///
233    /// # Example
234    ///
235    /// ```
236    /// use crossfire::{mpsc::Array, select::{Multiplex, Mux}, MTx, TryRecvError};
237    ///
238    /// let mut mp = Multiplex::<Array<i32>>::new();
239    /// let tx1: MTx<Mux<_>> = mp.bounded_tx(10);
240    /// let _tx2: MTx<Mux<_>> = mp.bounded_tx(10);
241    /// // No message available yet
242    /// assert_eq!(mp.try_recv(), Err(TryRecvError::Empty));
243    /// tx1.send(42).unwrap();
244    /// // Now a message is available
245    /// assert_eq!(mp.try_recv(), Ok(42));
246    /// ```
247    #[inline]
248    pub fn try_recv(&self) -> Result<F::Item, TryRecvError> {
249        if let Ok(item) = self._try_select_cached::<true>() {
250            return Ok(item);
251        }
252        if self.waker.get_opened_count() == 0 {
253            return Err(TryRecvError::Disconnected);
254        }
255        Err(TryRecvError::Empty)
256    }
257
258    /// Receives a message from any of the multiplexed channels, blocking if necessary.
259    ///
260    /// This method will block the current thread until a message is available on any of the channels,
261    /// or until all senders are dropped.
262    #[inline]
263    pub fn recv(&self) -> Result<F::Item, RecvError> {
264        match self._recv_blocking(None) {
265            Ok(item) => Ok(item),
266            Err(_) => Err(RecvError),
267        }
268    }
269
270    /// Receives a message from any of the multiplexed channels with a timeout.
271    /// Will block when channel is empty.
272    ///
273    /// The behavior is atomic: the message is either received successfully or the operation is canceled due to a timeout.
274    ///
275    /// Returns `Ok(T)` when successful.
276    ///
277    /// Returns Err([RecvTimeoutError::Timeout]) when a message could not be received because the channel is empty and the operation timed out.
278    ///
279    /// Returns Err([RecvTimeoutError::Disconnected]) if the sender has been dropped and the channel is empty.
280    #[inline]
281    pub fn recv_timeout(&self, timeout: Duration) -> Result<F::Item, RecvTimeoutError> {
282        match Instant::now().checked_add(timeout) {
283            Some(deadline) => match self._recv_blocking(Some(deadline)) {
284                Ok(item) => Ok(item),
285                Err(true) => Err(RecvTimeoutError::Disconnected),
286                Err(false) => Err(RecvTimeoutError::Timeout),
287            },
288            None => self.try_recv().map_err(|e| match e {
289                TryRecvError::Disconnected => RecvTimeoutError::Disconnected,
290                TryRecvError::Empty => RecvTimeoutError::Timeout,
291            }),
292        }
293    }
294
295    #[inline(always)]
296    fn _try_select_cached<const FINAL: bool>(&self) -> Result<F::Item, usize> {
297        let last_idx = self.last_idx.get();
298        let handle = unsafe { self.handlers.get_unchecked(last_idx) };
299        let count = self.count.get();
300        let loop_count;
301        if count > 0 {
302            if let Some(msg) = handle.shared.inner.try_recv_cached() {
303                handle.shared.on_recv();
304                self.count.set(count - 1);
305                return Ok(msg);
306            }
307            loop_count = self.handlers.len() - 1;
308        } else {
309            loop_count = self.handlers.len();
310        };
311        if let Some(item) = self._try_select_all::<FINAL>(last_idx, loop_count) {
312            return Ok(item);
313        }
314        Err(last_idx)
315    }
316
317    #[inline(always)]
318    fn _try_select_all<const FINAL: bool>(
319        &self, mut idx: usize, loop_count: usize,
320    ) -> Option<F::Item> {
321        let len = self.handlers.len();
322        for _ in 0..loop_count {
323            idx = if idx + 1 >= len { 0 } else { idx + 1 };
324            let handle = unsafe { self.handlers.get_unchecked(idx) };
325            if let Some(msg) = if FINAL {
326                handle.shared.inner.try_recv_final()
327            } else {
328                handle.shared.inner.try_recv()
329            } {
330                handle.shared.on_recv();
331                self.count.set(handle.weight);
332                self.last_idx.set(idx);
333                return Some(msg);
334            }
335        }
336        None
337    }
338
339    /// Internal method to perform blocking receive with optional timeout
340    ///
341    /// # Parameters
342    ///
343    /// * `deadline` - Optional deadline for the operation; if None, blocks indefinitely
344    ///
345    /// # Returns
346    ///
347    /// Returns `Ok(item)` on successful receive, `Err(true)` if disconnected, `Err(false)` if timed out
348    #[inline]
349    fn _recv_blocking(&self, deadline: Option<Instant>) -> Result<F::Item, bool> {
350        let mut start_idx;
351        match self._try_select_cached::<false>() {
352            Ok(item) => return Ok(item),
353            Err(idx) => {
354                start_idx = idx;
355            }
356        }
357        let mut backoff = Backoff::from(BackoffConfig::detect());
358        backoff.snooze();
359        let len = self.handlers.len();
360        loop {
361            loop {
362                if let Some(item) = self._try_select_all::<false>(start_idx, len) {
363                    return Ok(item);
364                }
365                if backoff.snooze() {
366                    break;
367                }
368            }
369            // TODO For thread, actually the waker can be reuse and not change
370            self.waker.init_blocking();
371            if let Some(item) = self._try_select_all::<true>(start_idx, len) {
372                return Ok(item);
373            }
374            if self.waker.get_opened_count() == 0 {
375                return Err(true);
376            }
377            let mut state = WakerState::Init as u8;
378            while state < WakerState::Woken as u8 {
379                match check_timeout(deadline) {
380                    Ok(None) => {
381                        thread::park();
382                    }
383                    Ok(Some(dur)) => {
384                        thread::park_timeout(dur);
385                    }
386                    Err(_) => {
387                        // As sc don't need to abandon
388                        return Err(false);
389                    }
390                }
391                state = self.waker.get_waker_state(Ordering::SeqCst);
392            }
393            backoff.reset();
394            start_idx = self.waker.get_hint();
395        }
396    }
397}
398
399impl<F: Flavor> Drop for Multiplex<F> {
400    #[inline]
401    fn drop(&mut self) {
402        for handle in &self.handlers {
403            handle.shared.close_rx();
404        }
405    }
406}
407
408impl<F: Flavor> fmt::Debug for Multiplex<F> {
409    #[inline]
410    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411        write!(f, "Multiplex<{}>", std::any::type_name::<F>())
412    }
413}
414
415impl<F: Flavor> fmt::Display for Multiplex<F> {
416    #[inline]
417    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418        fmt::Debug::fmt(self, f)
419    }
420}
421
422impl<F: Flavor> BlockingRxTrait<F::Item> for Multiplex<F> {
423    #[inline(always)]
424    fn recv(&self) -> Result<F::Item, RecvError> {
425        Self::recv(self)
426    }
427
428    #[inline(always)]
429    fn try_recv(&self) -> Result<F::Item, TryRecvError> {
430        Self::try_recv(self)
431    }
432
433    #[inline(always)]
434    fn recv_timeout(&self, timeout: Duration) -> Result<F::Item, RecvTimeoutError> {
435        Self::recv_timeout(self, timeout)
436    }
437
438    /// The number of messages in the channel at the moment
439    #[inline(always)]
440    fn len(&self) -> usize {
441        0
442    }
443
444    /// always return None
445    #[inline(always)]
446    fn capacity(&self) -> Option<usize> {
447        None
448    }
449
450    /// Returns true when all the channel's empty
451    #[inline(always)]
452    fn is_empty(&self) -> bool {
453        for handle in &self.handlers {
454            if !handle.shared.is_empty() {
455                return false;
456            }
457        }
458        true
459    }
460
461    /// Not practical to impl
462    #[inline(always)]
463    fn is_full(&self) -> bool {
464        false
465    }
466
467    /// Return true if all sender has been close
468    #[inline(always)]
469    fn is_disconnected(&self) -> bool {
470        self.get_tx_count() == 0
471    }
472
473    /// NOTE: it does not count all the clones to the senders
474    #[inline(always)]
475    fn get_tx_count(&self) -> usize {
476        self.waker.get_opened_count()
477    }
478
479    /// This is single consumer
480    #[inline(always)]
481    fn get_rx_count(&self) -> usize {
482        1
483    }
484
485    fn get_wakers_count(&self) -> (usize, usize) {
486        (0, 0)
487    }
488
489    fn clone_to_vec(self, _count: usize) -> Vec<Self> {
490        unimplemented!();
491    }
492}