1use crate::backoff::*;
2use crate::flavor::FlavorMP;
3use crate::{shared::*, trace_log, AsyncTx, MAsyncTx, NotCloneable, SenderType};
4use std::cell::Cell;
5use std::fmt;
6use std::marker::PhantomData;
7use std::mem::MaybeUninit;
8use std::ops::Deref;
9use std::sync::atomic::Ordering;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
/// Blocking sender handle of a channel, generic over the channel flavor `F`.
///
/// No `Clone` impl for `Tx` appears in this file (it is tagged `NotCloneable`); the
/// cloneable counterpart defined here is `MTx`, a newtype around `Tx`.
13pub struct Tx<F: Flavor> {
/// Channel state shared through an `Arc` with the other handles of the same channel.
47 pub(crate) shared: Arc<ChannelShared<F>>,
/// Zero-sized marker: `Cell<()>` is `!Sync`, so `Tx` never auto-implements `Sync`.
48 _phan: PhantomData<Cell<()>>,
/// Passed to `reg_waker_blocking` / `cache_waker` by the blocking send path.
/// NOTE(review): presumably lets a waker be reused between sends — confirm in `WakerCache`.
50 waker_cache: WakerCache<*const F::Item>,
51}
52
// Hand-written assertion that a `Tx` may be moved to another thread.
// SAFETY: NOTE(review): the justification is not visible in this file. The raw-pointer
// parameter of `WakerCache<*const F::Item>` is the likely reason the auto trait is missing;
// confirm that `ChannelShared<F>` and the waker cache are sound to transfer across threads.
53unsafe impl<F: Flavor> Send for Tx<F> {}
54
55impl<F: Flavor> fmt::Debug for Tx<F> {
56 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57 write!(f, "Tx{:p}", self)
58 }
59}
60
61impl<F: Flavor> fmt::Display for Tx<F> {
62 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
63 write!(f, "Tx{:p}", self)
64 }
65}
66
67impl<F: Flavor> Drop for Tx<F> {
68 #[inline(always)]
69 fn drop(&mut self) {
70 self.shared.close_tx();
71 }
72}
73
74impl<F: Flavor> From<AsyncTx<F>> for Tx<F> {
75 fn from(value: AsyncTx<F>) -> Self {
76 value.add_tx();
77 Self::new(value.shared.clone())
78 }
79}
80
81impl<F: Flavor> Tx<F> {
82 #[inline]
83 pub(crate) fn new(shared: Arc<ChannelShared<F>>) -> Self {
84 Self { shared, waker_cache: WakerCache::new(), _phan: Default::default() }
85 }
86
/// Slow path shared by `send` and `send_timeout`: retries `try_send` with back-off, then
/// registers a waker and parks the current thread until the item is sent, the channel is
/// closed, or `deadline` passes.
///
/// * `item` - the value to send, still held in the caller's `MaybeUninit` slot. It is read
///   back out with `assume_init_read` only on the two error paths.
/// * `deadline` - forwarded to `check_timeout`; `Ok(None)` from it leads to an unbounded
///   `thread::park`, `Ok(Some(dur))` to `thread::park_timeout(dur)`.
///
/// Returns `Ok(())` after a successful `try_send`, when the waker state is `Done`, or when a
/// timed-out waker could not be abandoned; `SendTimeoutError::Timeout(item)` when the deadline
/// passed and the waker was abandoned; `SendTimeoutError::Disconnected(item)` when the final
/// waker state is `Closed`.
87 #[inline(always)]
88 pub(crate) fn _send_bounded(
89 &self, item: &MaybeUninit<F::Item>, deadline: Option<Instant>,
90 ) -> Result<(), SendTimeoutError<F::Item>> {
91 let shared = &self.shared;
92 let large = shared.large;
// Back-off limit comes from the channel's `backoff_limit`.
93 let backoff_cfg = BackoffConfig::detect().spin(2).limit(shared.backoff_limit);
94 let mut backoff = Backoff::from(backoff_cfg);
// `congest` is only consulted in the waiting loop, to snooze before parking.
95 let congest = shared.sender_direct_copy();
// Hard-wired off, so the `try_send_oneshot` branch below is never taken.
96 let direct_copy = false;
98 if large {
100 backoff.set_step(2);
101 }
// Phase 1: bounded retry. `r` is the back-off result; once it is true a failed attempt
// stops retrying and falls through to the waker phase.
102 loop {
103 let r = if large { backoff.yield_now() } else { backoff.spin() };
104 if direct_copy && large {
105 match shared.inner.try_send_oneshot(item.as_ptr()) {
106 Some(false) => break,
107 None => {
108 if r {
109 break;
110 }
111 continue;
112 }
113 _ => {
114 shared.on_send();
115 trace_log!("tx: send");
116 std::thread::yield_now();
117 return Ok(());
118 }
119 }
120 } else {
121 if false == shared.inner.try_send(&item) {
122 if r {
123 break;
124 }
125 continue;
126 }
127 shared.on_send();
128 trace_log!("tx: send");
129 return Ok(());
130 }
131 }
// A null pointer is what gets passed to `reg_waker_blocking` below.
132 let direct_copy_ptr: *const F::Item = std::ptr::null();
133 let mut state: u8;
136 let mut o_waker: Option<<F::Send as Registry>::Waker> = None;
// Common success exit used by the `Woken` and `Done` branches.
137 macro_rules! return_ok {
138 () => {
139 trace_log!("tx: send {:?}", o_waker);
// Only while the channel is still full: yield, then hand the waker to the cache.
// NOTE(review): the waker is cached only inside this branch — confirm that skipping
// the cache when the channel is not full is intended.
140 if shared.is_full() {
141 std::thread::yield_now();
143 self.senders.cache_waker(o_waker, &self.waker_cache);
144 }
145 return Ok(())
146 };
147 }
// Phase 2: register a waker, re-check the channel, then wait for a state change.
148 loop {
149 self.senders.reg_waker_blocking(&mut o_waker, &self.waker_cache, direct_copy_ptr);
150 state = shared.sender_double_check::<false>(&item, &mut o_waker);
154 trace_log!("tx: sender_double_check {:?} state={}", o_waker, state);
// Keep waiting while the waker state is below `Woken`.
155 while state < WakerState::Woken as u8 {
156 if congest {
157 state = shared.sender_snooze(&o_waker, &mut backoff);
158 }
159 if state <= WakerState::Waiting as u8 {
160 match check_timeout(deadline) {
161 Ok(None) => {
162 std::thread::park();
163 }
164 Ok(Some(dur)) => {
165 std::thread::park_timeout(dur);
166 }
// `check_timeout` reported an error: try to withdraw the registered waker.
167 Err(_) => {
168 if shared.abandon_send_waker(o_waker.as_ref().unwrap()) {
// SAFETY: NOTE(review): relies on `item` having been initialised by the caller
// (`MaybeUninit::new` in `send` / `send_timeout`) and not consumed by the channel
// on this path — confirm against `abandon_send_waker`.
169 return Err(SendTimeoutError::Timeout(unsafe {
170 item.assume_init_read()
171 }));
172 } else {
// NOTE(review): presumably the send completed concurrently when the waker could not
// be abandoned — confirm.
173 return Ok(());
176 }
177 }
178 }
// Parking can return for any reason, so the state is re-read before looping.
179 state = self.senders.get_waker_state(&o_waker, Ordering::SeqCst);
180 trace_log!("tx: after park state={}", state);
181 }
182 }
183 if state == WakerState::Woken as u8 {
// Woken: retry `try_send` with a fresh back-off; if the back-off completes without
// success, fall out to the outer loop and register a waker again.
184 backoff.reset();
185 loop {
186 if shared.inner.try_send(&item) {
187 shared.on_send();
188 return_ok!();
189 }
190 if backoff.is_completed() {
191 break;
192 }
193 backoff.snooze();
194 }
195 } else if state == WakerState::Done as u8 {
// `Done` is treated as success without another `try_send`.
196 return_ok!();
197 } else {
198 debug_assert_eq!(state, WakerState::Closed as u8);
// SAFETY: NOTE(review): same assumption as the timeout path — the slot was initialised
// by the caller and the value was not taken by the channel.
199 return Err(SendTimeoutError::Disconnected(unsafe { item.assume_init_read() }));
200 }
201 }
202 }
203
204 #[inline]
211 pub fn send(&self, item: F::Item) -> Result<(), SendError<F::Item>> {
212 let shared = &self.shared;
213 if shared.is_rx_closed() {
214 return Err(SendError(item));
215 }
216 let _item = MaybeUninit::new(item);
217 if shared.inner.try_send(&_item) {
218 shared.on_send();
219 return Ok(());
220 }
221 match self._send_bounded(&_item, None) {
222 Ok(_) => return Ok(()),
223 Err(SendTimeoutError::Disconnected(e)) => Err(SendError(e)),
224 Err(SendTimeoutError::Timeout(_)) => unreachable!(),
225 }
226 }
227
228 #[inline]
236 pub fn try_send(&self, item: F::Item) -> Result<(), TrySendError<F::Item>> {
237 let shared = &self.shared;
238 if shared.is_rx_closed() {
239 return Err(TrySendError::Disconnected(item));
240 }
241 let _item = MaybeUninit::new(item);
242 if shared.inner.try_send(&_item) {
243 shared.on_send();
244 return Ok(());
245 } else {
246 return Err(TrySendError::Full(unsafe { _item.assume_init_read() }));
247 }
248 }
249
250 #[inline]
261 pub fn send_timeout(
262 &self, item: F::Item, timeout: Duration,
263 ) -> Result<(), SendTimeoutError<F::Item>> {
264 let shared = &self.shared;
265 if shared.is_rx_closed() {
266 return Err(SendTimeoutError::Disconnected(item));
267 }
268 match Instant::now().checked_add(timeout) {
269 None => self.try_send(item).map_err(|e| match e {
270 TrySendError::Disconnected(t) => SendTimeoutError::Disconnected(t),
271 TrySendError::Full(t) => SendTimeoutError::Timeout(t),
272 }),
273 Some(deadline) => {
274 let _item = MaybeUninit::new(item);
275 if shared.inner.try_send(&_item) {
276 shared.on_send();
277 return Ok(());
278 }
279 match self._send_bounded(&_item, Some(deadline)) {
280 Ok(_) => return Ok(()),
281 Err(e) => return Err(e),
282 }
283 }
284 }
285 }
286
287 #[inline(always)]
289 pub fn is_disconnected(&self) -> bool {
290 self.shared.is_rx_closed()
291 }
292
293 #[inline]
294 pub fn into_async(self) -> AsyncTx<F> {
295 self.into()
296 }
297}
298
/// Cloneable sender handle: a newtype around `Tx` that, in this file, implements `Clone`,
/// `Sync` and `Deref<Target = Tx<F>>`, so all `Tx` methods are reachable through it.
299pub struct MTx<F: Flavor>(pub(crate) Tx<F>);
306
307impl<F: Flavor> fmt::Debug for MTx<F> {
308 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
309 write!(f, "MTx{:p}", self)
310 }
311}
312
313impl<F: Flavor> fmt::Display for MTx<F> {
314 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
315 write!(f, "MTx{:p}", self)
316 }
317}
318
319impl<F: Flavor> From<MTx<F>> for Tx<F> {
320 fn from(tx: MTx<F>) -> Self {
321 tx.0
322 }
323}
324
325impl<F: Flavor> From<MAsyncTx<F>> for MTx<F> {
326 fn from(value: MAsyncTx<F>) -> Self {
327 value.add_tx();
328 Self(Tx::new(value.shared.clone()))
329 }
330}
331
// Hand-written assertion that `&MTx` may be shared between threads. The wrapped `Tx` is
// deliberately `!Sync` (its `PhantomData<Cell<()>>` marker), so this impl overrides that.
// SAFETY: NOTE(review): the justification is not visible in this file; confirm that every
// `&self` method reachable through `MTx` (including the use of `waker_cache`) is sound when
// called concurrently on a multi-producer flavor.
332unsafe impl<F: Flavor> Sync for MTx<F> {}
333
334impl<F: Flavor + FlavorMP> MTx<F> {
335 #[inline]
336 pub(crate) fn new(shared: Arc<ChannelShared<F>>) -> Self {
337 Self(Tx::new(shared))
338 }
339
340 #[inline]
341 pub fn into_async(self) -> MAsyncTx<F> {
342 self.into()
343 }
344}
345
346impl<F: Flavor> Clone for MTx<F> {
347 #[inline]
348 fn clone(&self) -> Self {
349 let inner = &self.0;
350 inner.shared.add_tx();
351 Self(Tx::new(inner.shared.clone()))
352 }
353}
354
355impl<F: Flavor> Deref for MTx<F> {
356 type Target = Tx<F>;
357
358 #[inline(always)]
360 fn deref(&self) -> &Self::Target {
361 &self.0
362 }
363}
364
/// Common interface of the blocking sender handles; implemented in this file for `Tx` and
/// `MTx`. Every method except `clone_to_vec` takes `&self`; `clone_to_vec` is restricted to
/// `Self: Sized`.
365pub trait BlockingTxTrait<T: Send + 'static>: Send + 'static + fmt::Debug + fmt::Display {
/// Sends a message, waiting without a deadline when it cannot be queued immediately.
/// On failure the message is handed back inside `SendError`.
367 fn send(&self, _item: T) -> Result<(), SendError<T>>;
373
/// Attempts to send without waiting. On failure the message is handed back inside
/// `TrySendError` (`Full` or `Disconnected` in the implementations of this file).
374 fn try_send(&self, _item: T) -> Result<(), TrySendError<T>>;
382
/// Sends a message, waiting at most `timeout`. On failure the message is handed back
/// inside `SendTimeoutError`.
383 fn send_timeout(&self, item: T, timeout: Duration) -> Result<(), SendTimeoutError<T>>;
392
/// Forwards to `ChannelShared::len` in the implementations of this file.
/// NOTE(review): presumably the number of messages currently queued — confirm.
393 fn len(&self) -> usize;
395
/// Forwards to `ChannelShared::capacity` in the implementations of this file.
/// NOTE(review): presumably `None` means the channel is unbounded — confirm.
396 fn capacity(&self) -> Option<usize>;
398
/// Forwards to `ChannelShared::is_empty` in the implementations of this file.
399 fn is_empty(&self) -> bool;
401
/// Forwards to `ChannelShared::is_full` in the implementations of this file.
402 fn is_full(&self) -> bool;
404
/// In the implementations of this file: true when `get_rx_count()` is zero.
405 fn is_disconnected(&self) -> bool;
407
/// Forwards to `ChannelShared::get_tx_count` in the implementations of this file.
408 fn get_tx_count(&self) -> usize;
410
/// Forwards to `ChannelShared::get_rx_count` in the implementations of this file.
411 fn get_rx_count(&self) -> usize;
413
/// Consumes the handle and returns a vector of handles. `Tx` asserts `count == 1` and
/// returns only itself; `MTx` pushes `count - 1` clones followed by itself.
414 fn clone_to_vec(self, count: usize) -> Vec<Self>
415 where
416 Self: Sized;
417
/// Forwards to `ChannelShared::get_wakers_count` in the implementations of this file.
/// NOTE(review): the meaning of the two counts is not visible here — confirm.
418 fn get_wakers_count(&self) -> (usize, usize);
419}
420
421impl<F: Flavor> BlockingTxTrait<F::Item> for Tx<F> {
422 #[inline(always)]
423 fn clone_to_vec(self, _count: usize) -> Vec<Self> {
424 assert_eq!(_count, 1);
425 vec![self]
426 }
427
428 #[inline(always)]
429 fn send(&self, item: F::Item) -> Result<(), SendError<F::Item>> {
430 Tx::send(self, item)
431 }
432
433 #[inline(always)]
434 fn try_send(&self, item: F::Item) -> Result<(), TrySendError<F::Item>> {
435 Tx::try_send(self, item)
436 }
437
438 #[inline(always)]
439 fn send_timeout(
440 &self, item: F::Item, timeout: Duration,
441 ) -> Result<(), SendTimeoutError<F::Item>> {
442 Tx::send_timeout(&self, item, timeout)
443 }
444
445 #[inline(always)]
447 fn len(&self) -> usize {
448 self.as_ref().len()
449 }
450
451 #[inline(always)]
453 fn capacity(&self) -> Option<usize> {
454 self.as_ref().capacity()
455 }
456
457 #[inline(always)]
459 fn is_empty(&self) -> bool {
460 self.as_ref().is_empty()
461 }
462
463 #[inline(always)]
465 fn is_full(&self) -> bool {
466 self.as_ref().is_full()
467 }
468
469 #[inline(always)]
471 fn is_disconnected(&self) -> bool {
472 self.as_ref().get_rx_count() == 0
473 }
474
475 #[inline(always)]
476 fn get_tx_count(&self) -> usize {
477 self.as_ref().get_tx_count()
478 }
479
480 #[inline(always)]
481 fn get_rx_count(&self) -> usize {
482 self.as_ref().get_rx_count()
483 }
484
485 fn get_wakers_count(&self) -> (usize, usize) {
486 self.as_ref().get_wakers_count()
487 }
488}
489
490impl<F: Flavor + FlavorMP> BlockingTxTrait<F::Item> for MTx<F> {
491 #[inline(always)]
492 fn clone_to_vec(self, count: usize) -> Vec<Self> {
493 let mut v = Vec::with_capacity(count);
494 for _ in 0..count - 1 {
495 v.push(self.clone());
496 }
497 v.push(self);
498 v
499 }
500
501 #[inline(always)]
502 fn send(&self, item: F::Item) -> Result<(), SendError<F::Item>> {
503 self.0.send(item)
504 }
505
506 #[inline(always)]
507 fn try_send(&self, item: F::Item) -> Result<(), TrySendError<F::Item>> {
508 self.0.try_send(item)
509 }
510
511 #[inline(always)]
512 fn send_timeout(
513 &self, item: F::Item, timeout: Duration,
514 ) -> Result<(), SendTimeoutError<F::Item>> {
515 self.0.send_timeout(item, timeout)
516 }
517
518 #[inline(always)]
520 fn len(&self) -> usize {
521 self.as_ref().len()
522 }
523
524 #[inline(always)]
526 fn capacity(&self) -> Option<usize> {
527 self.as_ref().capacity()
528 }
529
530 #[inline(always)]
532 fn is_empty(&self) -> bool {
533 self.as_ref().is_empty()
534 }
535
536 #[inline(always)]
538 fn is_full(&self) -> bool {
539 self.as_ref().is_full()
540 }
541
542 #[inline(always)]
544 fn is_disconnected(&self) -> bool {
545 self.as_ref().get_rx_count() == 0
546 }
547
548 #[inline(always)]
549 fn get_tx_count(&self) -> usize {
550 self.as_ref().get_tx_count()
551 }
552
553 #[inline(always)]
554 fn get_rx_count(&self) -> usize {
555 self.as_ref().get_rx_count()
556 }
557
558 fn get_wakers_count(&self) -> (usize, usize) {
559 self.as_ref().get_wakers_count()
560 }
561}
562
563impl<F: Flavor> Deref for Tx<F> {
564 type Target = ChannelShared<F>;
565 #[inline(always)]
566 fn deref(&self) -> &ChannelShared<F> {
567 &self.shared
568 }
569}
570
571impl<F: Flavor> AsRef<ChannelShared<F>> for Tx<F> {
572 #[inline(always)]
573 fn as_ref(&self) -> &ChannelShared<F> {
574 &self.shared
575 }
576}
577
578impl<F: Flavor> AsRef<ChannelShared<F>> for MTx<F> {
579 #[inline(always)]
580 fn as_ref(&self) -> &ChannelShared<F> {
581 &self.0.shared
582 }
583}
584
585impl<T: Send + Unpin + 'static, F: Flavor<Item = T>> SenderType for Tx<F> {
586 type Flavor = F;
587 #[inline(always)]
588 fn new(shared: Arc<ChannelShared<F>>) -> Self {
589 Self::new(shared)
590 }
591}
592
// Marker impl tagging `Tx` with the crate's `NotCloneable` trait; no `Clone` impl for `Tx`
// appears in this file (the cloneable handle is `MTx`).
593impl<F: Flavor> NotCloneable for Tx<F> {}
594
595impl<T: Send + Unpin + 'static, F: Flavor<Item = T> + FlavorMP> SenderType for MTx<F> {
596 type Flavor = F;
597 #[inline(always)]
598 fn new(shared: Arc<ChannelShared<F>>) -> Self {
599 MTx::new(shared)
600 }
601}