crossfire/select/multiplex.rs
1use crate::backoff::*;
2use crate::flavor::{Flavor, FlavorBounded, FlavorImpl, FlavorNew, FlavorWrap};
3use crate::shared::{check_timeout, ChannelShared};
4use crate::waker::WakerState;
5use crate::waker_registry::{RegistrySend, SelectWaker, SelectWakerWrapper};
6use crate::BlockingRxTrait;
7use crate::SenderType;
8use crate::{RecvError, RecvTimeoutError, TryRecvError};
9use std::cell::Cell;
10use std::fmt;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::thread;
14use std::time::{Duration, Instant};
15
16pub const DEFAULT_WEIGHT: u32 = 128;
17
18/// Type alias for multiplexed channel flavor
19pub type Mux<F> = FlavorWrap<F, <F as Flavor>::Send, SelectWakerWrapper>;
20
21/// A multiplexer that owns multi channel receivers of the same Flavor type.
22///
23/// Unlike select, it focus on round-robin mode, allow to specified weight on each channel.
24/// It maintains a count of message received for each channel.
25/// That means if the last message recv on the `idx` channel, it will keep trying the same channel
26/// until the number equals to weight has been received. If the channel is empty, it will try the
27/// next one without touching the count. This strategy improves the hit rate of cpu cache and ensures no starvation.
28///
29/// NOTE: The default weight is 128. (When the weight of all channel set to 1, the performance is
30/// the worst because of cpu cache thrashing)
31///
32/// ## Capability and limitation:
33/// - New channel may be added on the fly
34/// - This abstraction is only designed for stable channels for most efficient select.
35/// - If channel close by sender, the receiver will be automatically close inside the Multiplex,
36/// user will not be notify until all its channels closed.
37/// - Due to it binds on Flavor interface, it cannot be use between different type.
38/// If you want to multiplex between list and array, can use the
39/// [CompatFlavor](crate::compat::CompatFlavor)
40/// - **NOTE** : It has internal mutability because it need to impl [BlockingRxTrait](crate::BlockingRxTrait),
41/// the adding channel process remains `&mut self`. Because `Multiplex` is a single consumer just
42/// like [Rx](crate::Rx), it does not have `Sync`. If you can guarantee no concurrent access you
43/// can manutally add the `Sync` back in parent struct.
44///
45///
46/// # Examples
47///
48/// Basic usage with multiple senders:
49///
50/// ```
51/// use crossfire::{mpsc::Array, MTx, select::{Multiplex, Mux}};
52/// use std::thread;
53///
54/// // Create a multiplexer with Array flavor
55/// let mut mp = Multiplex::<Array<i32>>::new();
56///
57/// // Create multiple senders through the multiplexer
58/// let tx1: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
59/// let tx2: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
60///
61/// // Send values from different threads
62/// let h1 = thread::spawn(move || {
63/// tx1.send(1).unwrap();
64/// });
65/// let h2 = thread::spawn(move || {
66/// tx2.send(2).unwrap();
67/// });
68///
69/// // Receive values through the multiplexer (order may vary)
70/// let val1 = mp.recv().unwrap();
71/// let val2 = mp.recv().unwrap();
72///
73/// h1.join().unwrap();
74/// h2.join().unwrap();
75/// ```
76pub struct Multiplex<F: Flavor> {
77 waker: Arc<SelectWaker>,
78 handlers: Vec<MultiplexHandle<F>>,
79 last_idx: Cell<usize>,
80 count: Cell<u32>,
81}
82
83unsafe impl<F: Flavor> Send for Multiplex<F> {}
84
85struct MultiplexHandle<F: Flavor> {
86 shared: Arc<ChannelShared<Mux<F>>>,
87 weight: u32,
88}
89
90impl<F: Flavor> Multiplex<F> {
91 /// Initialize Select with fair, round-robin strategy
92 pub fn new() -> Self {
93 Self {
94 waker: Arc::new(SelectWaker::new()),
95 handlers: Vec::with_capacity(10),
96 count: Cell::new(0),
97 last_idx: Cell::new(0),
98 }
99 }
100
101 #[inline]
102 fn _add_item(&mut self, flavor: F, weight: u32) -> Arc<ChannelShared<Mux<F>>> {
103 self.waker.add_opened();
104 let recvs = self.waker.clone().to_wrapper(self.handlers.len());
105 let shared = ChannelShared::new(Mux::<F>::from_inner(flavor), F::Send::new(), recvs);
106 self.handlers.push(MultiplexHandle { shared: shared.clone(), weight: weight - 1 });
107 self.last_idx.set(self.handlers.len() - 1);
108 shared
109 }
110
111 /// Add a new channels with a new() method to multiplex, return its sender.
112 ///
113 /// # Type Parameters
114 ///
115 /// * `S` - The sender type that implements SenderType with the appropriate Flavor,
116 /// may be async or blocking sender, MP or SP that match the `Flavor` type.
117 ///
118 /// # Note
119 ///
120 /// This method is only available for flavors that implement `FlavorNew` trait,
121 /// such as `List` / `One` flavor. For flavors like Array that don't implement `FlavorNew`,
122 /// use `bounded_tx` instead.
123 ///
124 /// # Example
125 ///
126 /// with mpsc::List (which sender type is [MTx](crate::MTx) and allow to clone)
127 ///
128 /// ```
129 /// use crossfire::{mpsc::List, MTx, select::{Multiplex, Mux}};
130 /// use tokio;
131 ///
132 /// let mut mp = Multiplex::<List<i32>>::new();
133 /// let tx1: MTx<Mux<List<i32>>> = mp.new_tx();
134 /// let tx2: MTx<Mux<List<i32>>> = mp.new_tx();
135 /// tx1.send(42).expect("send");
136 /// tx2.send(42).expect("send");
137 /// let value = mp.recv().unwrap();
138 /// assert_eq!(value, 42);
139 /// let value = mp.recv().unwrap();
140 /// assert_eq!(value, 42);
141 /// ```
142 ///
143 /// with spsc::One (which sender type is [Tx](crate::Tx) and not cloneable)
144 /// ```
145 /// use crossfire::{spsc::One, Tx, select::{Multiplex, Mux}};
146 /// use tokio;
147 ///
148 /// let mut mp = Multiplex::<One<i32>>::new();
149 /// // Creates an size-1 channel
150 /// let tx1: Tx<Mux<One<i32>>> = mp.new_tx();
151 /// // Creates another size-1 channel
152 /// let tx2: Tx<Mux<One<i32>>> = mp.new_tx();
153 /// std::thread::spawn(move ||{
154 /// tx2.send(42).expect("send");
155 /// });
156 /// let value = mp.recv().unwrap();
157 /// assert_eq!(value, 42);
158 /// ```
159 pub fn new_tx<S>(&mut self) -> S
160 where
161 F: FlavorNew,
162 S: SenderType<Flavor = Mux<F>>,
163 {
164 let shared = self._add_item(F::new(), DEFAULT_WEIGHT);
165 return S::new(shared);
166 }
167
168 /// Add a channel of flavor (impl FlavorNew), with custom weight instead of default
169 /// (the default weight is 128)
170 pub fn new_tx_with_weight<S>(&mut self, weight: u32) -> S
171 where
172 F: FlavorNew,
173 S: SenderType<Flavor = Mux<F>>,
174 {
175 let shared = self._add_item(F::new(), weight);
176 return S::new(shared);
177 }
178
179 /// Creates a new bounded sender for the multiplexer
180 ///
181 /// # Arguments
182 ///
183 /// * `size` - The maximum capacity of the channel
184 ///
185 /// # Type Parameters
186 ///
187 /// * `S` - The sender type that implements SenderType with the appropriate Flavor
188 ///
189 /// # Example
190 ///
191 /// ```
192 /// use crossfire::{mpsc::Array, *, select::{Multiplex, Mux}};
193 ///
194 /// let mut mp = Multiplex::<Array<i32>>::new();
195 /// // Creates a bounded channel with capacity 10
196 /// let tx1: MTx<Mux<Array<i32>>> = mp.bounded_tx(10);
197 /// // Creates another bounded channel with capacity 20
198 /// let tx2: MTx<Mux<Array<i32>>> = mp.bounded_tx(20);
199 /// tx1.send(42).expect("send");
200 /// std::thread::spawn(move || {
201 /// tx2.send(42).expect("send");
202 /// });
203 /// let value = mp.recv().unwrap();
204 /// assert_eq!(value, 42);
205 /// let value = mp.recv().unwrap();
206 /// assert_eq!(value, 42);
207 /// ```
208 pub fn bounded_tx<S>(&mut self, size: usize) -> S
209 where
210 F: FlavorBounded,
211 S: SenderType<Flavor = Mux<F>>,
212 {
213 let shared = self._add_item(F::new_with_bound(size), DEFAULT_WEIGHT);
214 return S::new(shared);
215 }
216
217 /// Add a bounded channel to the multiplex, with custom weight (the default is 128)
218 pub fn bounded_tx_with_weight<S>(&mut self, size: usize, weight: u32) -> S
219 where
220 F: FlavorBounded,
221 S: SenderType<Flavor = Mux<F>>,
222 {
223 let shared = self._add_item(F::new_with_bound(size), weight);
224 return S::new(shared);
225 }
226
227 /// Attempts to receive a message from any of the multiplexed channels without blocking.
228 ///
229 /// Returns `Ok(item)` if a message is available on any of the channels.
230 /// Returns `Err(TryRecvError::Empty)` if no messages are available.
231 /// Returns `Err(TryRecvError::Disconnected)` if all senders have been dropped.
232 ///
233 /// # Example
234 ///
235 /// ```
236 /// use crossfire::{mpsc::Array, select::{Multiplex, Mux}, MTx, TryRecvError};
237 ///
238 /// let mut mp = Multiplex::<Array<i32>>::new();
239 /// let tx1: MTx<Mux<_>> = mp.bounded_tx(10);
240 /// let _tx2: MTx<Mux<_>> = mp.bounded_tx(10);
241 /// // No message available yet
242 /// assert_eq!(mp.try_recv(), Err(TryRecvError::Empty));
243 /// tx1.send(42).unwrap();
244 /// // Now a message is available
245 /// assert_eq!(mp.try_recv(), Ok(42));
246 /// ```
247 #[inline]
248 pub fn try_recv(&self) -> Result<F::Item, TryRecvError> {
249 if let Ok(item) = self._try_select_cached::<true>() {
250 return Ok(item);
251 }
252 if self.waker.get_opened_count() == 0 {
253 return Err(TryRecvError::Disconnected);
254 }
255 Err(TryRecvError::Empty)
256 }
257
258 /// Receives a message from any of the multiplexed channels, blocking if necessary.
259 ///
260 /// This method will block the current thread until a message is available on any of the channels,
261 /// or until all senders are dropped.
262 #[inline]
263 pub fn recv(&self) -> Result<F::Item, RecvError> {
264 match self._recv_blocking(None) {
265 Ok(item) => Ok(item),
266 Err(_) => Err(RecvError),
267 }
268 }
269
270 /// Receives a message from any of the multiplexed channels with a timeout.
271 /// Will block when channel is empty.
272 ///
273 /// The behavior is atomic: the message is either received successfully or the operation is canceled due to a timeout.
274 ///
275 /// Returns `Ok(T)` when successful.
276 ///
277 /// Returns Err([RecvTimeoutError::Timeout]) when a message could not be received because the channel is empty and the operation timed out.
278 ///
279 /// Returns Err([RecvTimeoutError::Disconnected]) if the sender has been dropped and the channel is empty.
280 #[inline]
281 pub fn recv_timeout(&self, timeout: Duration) -> Result<F::Item, RecvTimeoutError> {
282 match Instant::now().checked_add(timeout) {
283 Some(deadline) => match self._recv_blocking(Some(deadline)) {
284 Ok(item) => Ok(item),
285 Err(true) => Err(RecvTimeoutError::Disconnected),
286 Err(false) => Err(RecvTimeoutError::Timeout),
287 },
288 None => self.try_recv().map_err(|e| match e {
289 TryRecvError::Disconnected => RecvTimeoutError::Disconnected,
290 TryRecvError::Empty => RecvTimeoutError::Timeout,
291 }),
292 }
293 }
294
295 #[inline(always)]
296 fn _try_select_cached<const FINAL: bool>(&self) -> Result<F::Item, usize> {
297 let last_idx = self.last_idx.get();
298 let handle = unsafe { self.handlers.get_unchecked(last_idx) };
299 let count = self.count.get();
300 let loop_count;
301 if count > 0 {
302 if let Some(msg) = handle.shared.inner.try_recv_cached() {
303 handle.shared.on_recv();
304 self.count.set(count - 1);
305 return Ok(msg);
306 }
307 loop_count = self.handlers.len() - 1;
308 } else {
309 loop_count = self.handlers.len();
310 };
311 if let Some(item) = self._try_select_all::<FINAL>(last_idx, loop_count) {
312 return Ok(item);
313 }
314 Err(last_idx)
315 }
316
317 #[inline(always)]
318 fn _try_select_all<const FINAL: bool>(
319 &self, mut idx: usize, loop_count: usize,
320 ) -> Option<F::Item> {
321 let len = self.handlers.len();
322 for _ in 0..loop_count {
323 idx = if idx + 1 >= len { 0 } else { idx + 1 };
324 let handle = unsafe { self.handlers.get_unchecked(idx) };
325 if let Some(msg) = if FINAL {
326 handle.shared.inner.try_recv_final()
327 } else {
328 handle.shared.inner.try_recv()
329 } {
330 handle.shared.on_recv();
331 self.count.set(handle.weight);
332 self.last_idx.set(idx);
333 return Some(msg);
334 }
335 }
336 None
337 }
338
339 /// Internal method to perform blocking receive with optional timeout
340 ///
341 /// # Parameters
342 ///
343 /// * `deadline` - Optional deadline for the operation; if None, blocks indefinitely
344 ///
345 /// # Returns
346 ///
347 /// Returns `Ok(item)` on successful receive, `Err(true)` if disconnected, `Err(false)` if timed out
348 #[inline]
349 fn _recv_blocking(&self, deadline: Option<Instant>) -> Result<F::Item, bool> {
350 let mut start_idx;
351 match self._try_select_cached::<false>() {
352 Ok(item) => return Ok(item),
353 Err(idx) => {
354 start_idx = idx;
355 }
356 }
357 let mut backoff = Backoff::from(BackoffConfig::detect());
358 backoff.snooze();
359 let len = self.handlers.len();
360 loop {
361 loop {
362 if let Some(item) = self._try_select_all::<false>(start_idx, len) {
363 return Ok(item);
364 }
365 if backoff.snooze() {
366 break;
367 }
368 }
369 // TODO For thread, actually the waker can be reuse and not change
370 self.waker.init_blocking();
371 if let Some(item) = self._try_select_all::<true>(start_idx, len) {
372 return Ok(item);
373 }
374 if self.waker.get_opened_count() == 0 {
375 return Err(true);
376 }
377 let mut state = WakerState::Init as u8;
378 while state < WakerState::Woken as u8 {
379 match check_timeout(deadline) {
380 Ok(None) => {
381 thread::park();
382 }
383 Ok(Some(dur)) => {
384 thread::park_timeout(dur);
385 }
386 Err(_) => {
387 // As sc don't need to abandon
388 return Err(false);
389 }
390 }
391 state = self.waker.get_waker_state(Ordering::SeqCst);
392 }
393 backoff.reset();
394 start_idx = self.waker.get_hint();
395 }
396 }
397}
398
399impl<F: Flavor> Drop for Multiplex<F> {
400 #[inline]
401 fn drop(&mut self) {
402 for handle in &self.handlers {
403 handle.shared.close_rx();
404 }
405 }
406}
407
408impl<F: Flavor> fmt::Debug for Multiplex<F> {
409 #[inline]
410 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
411 write!(f, "Multiplex<{}>", std::any::type_name::<F>())
412 }
413}
414
415impl<F: Flavor> fmt::Display for Multiplex<F> {
416 #[inline]
417 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418 fmt::Debug::fmt(self, f)
419 }
420}
421
422impl<F: Flavor> BlockingRxTrait<F::Item> for Multiplex<F> {
423 #[inline(always)]
424 fn recv(&self) -> Result<F::Item, RecvError> {
425 Self::recv(self)
426 }
427
428 #[inline(always)]
429 fn try_recv(&self) -> Result<F::Item, TryRecvError> {
430 Self::try_recv(self)
431 }
432
433 #[inline(always)]
434 fn recv_timeout(&self, timeout: Duration) -> Result<F::Item, RecvTimeoutError> {
435 Self::recv_timeout(self, timeout)
436 }
437
438 /// The number of messages in the channel at the moment
439 #[inline(always)]
440 fn len(&self) -> usize {
441 0
442 }
443
444 /// always return None
445 #[inline(always)]
446 fn capacity(&self) -> Option<usize> {
447 None
448 }
449
450 /// Returns true when all the channel's empty
451 #[inline(always)]
452 fn is_empty(&self) -> bool {
453 for handle in &self.handlers {
454 if !handle.shared.is_empty() {
455 return false;
456 }
457 }
458 true
459 }
460
461 /// Not practical to impl
462 #[inline(always)]
463 fn is_full(&self) -> bool {
464 false
465 }
466
467 /// Return true if all sender has been close
468 #[inline(always)]
469 fn is_disconnected(&self) -> bool {
470 self.get_tx_count() == 0
471 }
472
473 /// NOTE: it does not count all the clones to the senders
474 #[inline(always)]
475 fn get_tx_count(&self) -> usize {
476 self.waker.get_opened_count()
477 }
478
479 /// This is single consumer
480 #[inline(always)]
481 fn get_rx_count(&self) -> usize {
482 1
483 }
484
485 fn get_wakers_count(&self) -> (usize, usize) {
486 (0, 0)
487 }
488
489 fn clone_to_vec(self, _count: usize) -> Vec<Self> {
490 unimplemented!();
491 }
492}