1#![allow(unused_assignments)]
2
3use std::pin::Pin;
4use std::alloc::Global;
5use std::cell::UnsafeCell;
6use std::io::{Error, ErrorKind};
7use std::task::{Poll, Context, Waker};
8use std::sync::{Arc, atomic::{AtomicBool, AtomicU8, Ordering}};
9
10use futures::{stream::{Stream, FusedStream},
11 sink::Sink};
12use crossbeam_queue::ArrayQueue;
13
14use pi_async_rt::lock::spin;
15
16pub type BoxSender<'a, T, E = Error> = Pin<Box<dyn AsyncSender<T, E> + Send + 'a, Global>>;
21
22pub trait AsyncSender<T, Err = Error>: Sink<T, Error = Err> {
26 fn current_len(&self) -> Option<usize> {
28 None
29 }
30}
31
32pub trait AsyncSenderExt<T>: AsyncSender<T> {
36 fn pin_boxed<'a>(self) -> BoxSender<'a, T>
38 where Self: Sized + Send + 'a {
39 Box::pin(self)
40 }
41}
42
43impl<T, U> AsyncSenderExt<U> for T where T: AsyncSender<U> {}
44
45pub struct PipeSender<T> {
46 is_terminated: Arc<AtomicBool>, queue: Arc<ArrayQueue<T>>, waker: Arc<UnsafeCell<Option<Waker>>>, status: Arc<AtomicU8>, }
51
52unsafe impl<T> Send for PipeSender<T> {}
53unsafe impl<T> Sync for PipeSender<T> {}
54
55impl<T> Sink<T> for PipeSender<T> {
56 type Error = Error;
57
58 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
60 if self
61 .is_terminated
62 .load(Ordering::Acquire) {
63 return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll ready failed, reason: pipeline already terminated"))));
65 }
66
67 if self.queue.is_full() {
68 let mut spin_len = 1;
70 loop {
71 match self.status.compare_exchange(0,
72 1,
73 Ordering::AcqRel,
74 Ordering::Relaxed) {
75 Err(current) => {
76 if spin_len >= 3 {
78 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll ready failed, current: {}, reason: pipeline busy", current))));
80 }
81
82 spin_len = spin(spin_len);
84 continue;
85 },
86 Ok(_) => {
87 unsafe {
89 *self.waker.get() = Some(cx.waker().clone()); }
91 self.status.store(3, Ordering::Release); return Poll::Pending;
94 },
95 }
96 }
97 } else {
98 Poll::Ready(Ok(()))
101 }
102 }
103
104 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
106 if self
107 .is_terminated
108 .load(Ordering::Acquire) {
109 return Err(Error::new(ErrorKind::BrokenPipe, format!("Start send failed, reason: pipeline already terminated")));
111 }
112
113 if let Err(_) = self.queue.push(item) {
114 return Err(Error::new(ErrorKind::WouldBlock, format!("Start send failed, reason: pipeline already full")));
116 }
117
118 Ok(())
119 }
120
121 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123 if self
124 .is_terminated
125 .load(Ordering::Acquire) {
126 return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll flush failed, reason: pipelinne already terminated"))));
128 }
129
130 let mut spin_len = 1;
131 loop {
132 match self.status.compare_exchange(0,
133 1,
134 Ordering::AcqRel,
135 Ordering::Relaxed) {
136 Err(1) => {
137 if spin_len >= 3 {
139 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll flush failed, current: 1, reason: pipeline busy"))));
141 }
142
143 spin_len = spin(spin_len);
145 continue;
146 },
147 Err(2) => {
148 unsafe {
150 if let Some(waker) = (*self.waker.get()).take() {
151 waker.wake();
153 self.status.store(0, Ordering::Release); }
155 }
156
157 return Poll::Ready(Ok(()));
158 },
159 Err(3) => {
160 return Poll::Ready(Ok(()));
162 },
163 _ => {
164 self.status.store(0, Ordering::Release); return Poll::Ready(Ok(()));
167 },
168 }
169 }
170 }
171
172 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174 if self
175 .is_terminated
176 .load(Ordering::Acquire) {
177 return Poll::Ready(Ok(()));
179 }
180
181 let mut spin_len = 1;
182 loop {
183 match self.status.compare_exchange(0,
184 1,
185 Ordering::AcqRel,
186 Ordering::Relaxed) {
187 Err(1) => {
188 if spin_len >= 3 {
190 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll close failed, current: 1, reason: pipeline busy"))));
192 }
193
194 spin_len = spin(spin_len);
196 continue;
197 },
198 Err(_) => {
199 let _ = self
202 .is_terminated
203 .compare_exchange(false,
204 true,
205 Ordering::AcqRel,
206 Ordering::Relaxed);
207
208 unsafe {
209 if let Some(waker) = (*self.waker.get()).take() {
210 waker.wake();
212 self.status.store(0, Ordering::Release); }
214 }
215
216 return Poll::Ready(Ok(()));
217 },
218 Ok(_) => {
219 let _ = self
221 .is_terminated
222 .compare_exchange(false,
223 true,
224 Ordering::AcqRel,
225 Ordering::Relaxed);
226 self.status.store(0, Ordering::Release); return Poll::Ready(Ok(()));
229 },
230 }
231 }
232 }
233}
234
235impl<T> AsyncSender<T> for PipeSender<T> {
236 fn current_len(&self) -> Option<usize> {
237 Some(self.queue.len())
238 }
239}
240
241pub type BoxReceiver<'a, T> = Pin<Box<dyn AsyncReceiver<T> + Send + 'a, Global>>;
246
247pub trait AsyncReceiver<T>: Stream<Item = T> + FusedStream {
251 fn current_len(&self) -> Option<usize> {
253 None
254 }
255}
256
257pub trait AsyncReceiverExt<T>: AsyncReceiver<T> {
261 fn pin_boxed<'a>(self) -> BoxReceiver<'a, T>
263 where Self: Sized + Send + 'a {
264 Box::pin(self)
265 }
266}
267
268impl<T, U> AsyncReceiverExt<U> for T where T: AsyncReceiver<U> {}
269
270pub struct PipeReceiver<T> {
271 is_terminated: Arc<AtomicBool>, queue: Arc<ArrayQueue<T>>, waker: Arc<UnsafeCell<Option<Waker>>>, status: Arc<AtomicU8>, }
276
277unsafe impl<T> Send for PipeReceiver<T> {}
278unsafe impl<T> Sync for PipeReceiver<T> {}
279
280impl<T> Stream for PipeReceiver<T> {
281 type Item = T;
282
283 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
284 if self.is_terminated() {
285 return Poll::Ready(None);
287 }
288
289 let mut spin_len = 1;
290 loop {
291 if let Some(frame) = self.queue.pop() {
292 return Poll::Ready(Some(frame));
294 } else {
295 match self.status.compare_exchange(0,
297 1,
298 Ordering::AcqRel,
299 Ordering::Relaxed) {
300 Err(3) => {
301 unsafe {
303 if let Some(waker) = (*self.waker.get()).take() {
304 waker.wake();
306 self.status.store(0, Ordering::Release); }
308 }
309 continue;
310 },
311 Err(_) => {
312 spin_len = spin(spin_len);
314 continue;
315 },
316 Ok(_) => {
317 unsafe {
319 *self.waker.get() = Some(cx.waker().clone()); }
321 self.status.store(2, Ordering::Release); return Poll::Pending;
324 },
325 }
326 }
327 }
328 }
329}
330
331impl<T> FusedStream for PipeReceiver<T> {
332 #[inline]
334 fn is_terminated(&self) -> bool {
335 if self
336 .is_terminated
337 .load(Ordering::Acquire) {
338 self.queue.is_empty()
340 } else {
341 false
343 }
344 }
345}
346
347impl<T> AsyncReceiver<T> for PipeReceiver<T> {
348 fn current_len(&self) -> Option<usize> {
349 Some(self.queue.len())
350 }
351}
352
353pub fn channel<T>(capacity: usize) -> (PipeSender<T>, PipeReceiver<T>) {
357 let is_terminated = Arc::new(AtomicBool::new(false));
358 let queue = Arc::new(ArrayQueue::new(capacity));
359 let waker = Arc::new(UnsafeCell::new(None));
360 let status = Arc::new(AtomicU8::new(0));
361
362 let sender = PipeSender {
363 is_terminated: is_terminated.clone(),
364 queue: queue.clone(),
365 waker: waker.clone(),
366 status: status.clone(),
367 };
368
369 let receiver = PipeReceiver {
370 is_terminated,
371 queue,
372 waker,
373 status,
374 };
375
376 (sender, receiver)
377}
378
379pub type BoxPipeline<'a, T, U = T, E = Error> = Pin<Box<dyn AsyncPipeLine<T, U, E> + Send + 'a, Global>>;
384
385pub trait AsyncPipeLine<
389 StreamFrame,
390 SinkFrame = StreamFrame,
391 Err = Error
392>: Sink<SinkFrame, Error = Err> + Stream<Item = StreamFrame> + FusedStream {}
393
394pub trait AsyncPipeLineExt<
398 StreamFrame,
399 SinkFrame = StreamFrame,
400 Err = Error
401>: AsyncPipeLine<StreamFrame, SinkFrame, Err> {
402 fn pin_boxed<'a>(self) -> BoxPipeline<'a, StreamFrame, SinkFrame, Err>
404 where Self: Sized + Send + 'a {
405 Box::pin(self)
406 }
407}
408
409impl<
410 StreamFrame,
411 SinkFrame,
412 Err,
413 T: ?Sized,
414> AsyncPipeLineExt<StreamFrame, SinkFrame, Err> for T where T: AsyncPipeLine<StreamFrame, SinkFrame, Err> {}
415
416pub fn pipeline<T, U>(capacity: usize) -> (AsyncDownStream<T, U>, AsyncUpStream<U, T>) {
420 let is_terminated_down_stream = Arc::new(AtomicBool::new(false));
421 let down_stream = Arc::new(ArrayQueue::new(capacity));
422 let down_stream_waker = Arc::new(UnsafeCell::new(None));
423 let down_stream_status = Arc::new(AtomicU8::new(0));
424 let is_terminated_down_sink = Arc::new(AtomicBool::new(false));
425 let down_sink = Arc::new(ArrayQueue::new(capacity));
426 let down_sink_waker = Arc::new(UnsafeCell::new(None));
427 let down_sink_status = Arc::new(AtomicU8::new(0));
428 let down_inner = InnerAsyncFlow {
429 is_terminated_stream: is_terminated_down_stream.clone(),
430 stream: down_stream.clone(),
431 stream_waker: down_stream_waker.clone(),
432 stream_status: down_stream_status.clone(),
433 is_terminated_sink: is_terminated_down_sink.clone(),
434 sink: down_sink.clone(),
435 sink_waker: down_sink_waker.clone(),
436 sink_status: down_sink_status.clone(),
437 };
438 let async_down_stream = AsyncDownStream(Arc::new(down_inner));
439
440 let is_terminated_stream = is_terminated_down_sink;
441 let up_stream = down_sink;
442 let stream_waker = down_sink_waker;
443 let stream_status = down_sink_status;
444 let is_terminated_sink = is_terminated_down_stream;
445 let up_sink = down_stream;
446 let sink_waker = down_stream_waker;
447 let sink_status = down_stream_status;
448 let up_inner = InnerAsyncFlow {
449 is_terminated_stream,
450 stream: up_stream,
451 stream_waker,
452 stream_status,
453 is_terminated_sink,
454 sink: up_sink,
455 sink_waker,
456 sink_status,
457 };
458 let async_up_stream = AsyncUpStream(Arc::new(up_inner));
459
460 (async_down_stream, async_up_stream)
461}
462
463pub struct AsyncDownStream<T, U = T>(Arc<InnerAsyncFlow<T, U>>);
467
468unsafe impl<T, U> Send for AsyncDownStream<T, U> {}
469unsafe impl<T, U> Sync for AsyncDownStream<T, U> {}
470
471impl<T, U> Clone for AsyncDownStream<T, U> {
472 fn clone(&self) -> Self {
473 AsyncDownStream(self.0.clone())
474 }
475}
476
477impl<T, U> Stream for AsyncDownStream<T, U> {
478 type Item = T;
479
480 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
481 if self.is_terminated() {
482 return Poll::Ready(None);
484 }
485
486 let mut spin_len = 1;
487 loop {
488 if let Some(frame) = self.0.stream.pop() {
489 return Poll::Ready(Some(frame));
491 } else {
492 match self.0.stream_status.compare_exchange(0,
494 1,
495 Ordering::AcqRel,
496 Ordering::Relaxed) {
497 Err(3) => {
498 unsafe {
500 if let Some(waker) = (*self.0.stream_waker.get()).take() {
501 waker.wake();
503 self.0.stream_status.store(0, Ordering::Release); }
505 }
506 continue;
507 },
508 Err(_) => {
509 spin_len = spin(spin_len);
511 continue;
512 },
513 Ok(_) => {
514 unsafe {
516 *self.0.stream_waker.get() = Some(cx.waker().clone()); }
518 self.0.stream_status.store(2, Ordering::Release); return Poll::Pending;
521 },
522 }
523 }
524 }
525 }
526}
527
528impl<T, U> FusedStream for AsyncDownStream<T, U> {
529 #[inline]
531 fn is_terminated(&self) -> bool {
532 if self.
533 0
534 .is_terminated_stream
535 .load(Ordering::Acquire) {
536 self.0.stream.is_empty()
538 } else {
539 false
541 }
542 }
543}
544
545impl<T, U> Sink<U> for AsyncDownStream<T, U> {
546 type Error = Error;
547
548 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
550 if self
551 .0
552 .is_terminated_sink
553 .load(Ordering::Acquire) {
554 return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll ready failed, reason: sink already terminated"))));
556 }
557
558 if self.0.sink.is_full() {
559 let mut spin_len = 1;
561 loop {
562 match self.0.sink_status.compare_exchange(0,
563 1,
564 Ordering::AcqRel,
565 Ordering::Relaxed) {
566 Err(current) => {
567 if spin_len >= 3 {
569 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll ready failed, current: {}, reason: sink or peer busy", current))));
571 }
572
573 spin_len = spin(spin_len);
575 continue;
576 },
577 Ok(_) => {
578 unsafe {
580 *self.0.sink_waker.get() = Some(cx.waker().clone()); }
582 self.0.sink_status.store(3, Ordering::Release); return Poll::Pending;
585 },
586 }
587 }
588 } else {
589 Poll::Ready(Ok(()))
592 }
593 }
594
595 fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> {
597 if self
598 .0
599 .is_terminated_sink
600 .load(Ordering::Acquire) {
601 return Err(Error::new(ErrorKind::BrokenPipe, format!("Start send failed, reason: sink already terminated")));
603 }
604
605 if let Err(_) = self.0.sink.push(item) {
606 return Err(Error::new(ErrorKind::WouldBlock, format!("Start send failed, reason: buffer already full")));
608 }
609
610 Ok(())
611 }
612
613 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
615 if self
616 .0
617 .is_terminated_sink
618 .load(Ordering::Acquire) {
619 return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll flush failed, reason: sink already terminated"))));
621 }
622
623 let mut spin_len = 1;
624 loop {
625 match self.0.sink_status.compare_exchange(0,
626 1,
627 Ordering::AcqRel,
628 Ordering::Relaxed) {
629 Err(1) => {
630 if spin_len >= 3 {
632 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll flush failed, current: 1, reason: sink or peer busy"))));
634 }
635
636 spin_len = spin(spin_len);
638 continue;
639 },
640 Err(2) => {
641 unsafe {
643 if let Some(waker) = (*self.0.sink_waker.get()).take() {
644 waker.wake();
646 self.0.sink_status.store(0, Ordering::Release); }
648 }
649
650 return Poll::Ready(Ok(()));
651 },
652 Err(3) => {
653 return Poll::Ready(Ok(()));
655 },
656 _ => {
657 self.0.sink_status.store(0, Ordering::Release); return Poll::Ready(Ok(()));
660 },
661 }
662 }
663 }
664
665 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
667 if self
668 .0
669 .is_terminated_sink
670 .load(Ordering::Acquire) {
671 return Poll::Ready(Ok(()));
673 }
674
675 let mut spin_len = 1;
676 loop {
677 match self.0.sink_status.compare_exchange(0,
678 1,
679 Ordering::AcqRel,
680 Ordering::Relaxed) {
681 Err(1) => {
682 if spin_len >= 3 {
684 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll close failed, current: 1, reason: sink or peer busy"))));
686 }
687
688 spin_len = spin(spin_len);
690 continue;
691 },
692 Err(_) => {
693 let _ = self
696 .0
697 .is_terminated_sink
698 .compare_exchange(false,
699 true,
700 Ordering::AcqRel,
701 Ordering::Relaxed);
702
703 unsafe {
704 if let Some(waker) = (*self.0.sink_waker.get()).take() {
705 waker.wake();
707 self.0.sink_status.store(0, Ordering::Release); }
709 }
710
711 return Poll::Ready(Ok(()));
712 },
713 Ok(_) => {
714 let _ = self
716 .0
717 .is_terminated_sink
718 .compare_exchange(false,
719 true,
720 Ordering::AcqRel,
721 Ordering::Relaxed);
722 self.0.sink_status.store(0, Ordering::Release); return Poll::Ready(Ok(()));
725 },
726 }
727 }
728 }
729}
730
731impl<T, U> AsyncPipeLine<T, U> for AsyncDownStream<T, U> {}
732
733pub struct AsyncUpStream<U, T = U>(Arc<InnerAsyncFlow<U, T>>);
737
738unsafe impl<U, T> Send for AsyncUpStream<U, T> {}
739unsafe impl<U, T> Sync for AsyncUpStream<U, T> {}
740
741impl<U, T> Clone for AsyncUpStream<U, T> {
742 fn clone(&self) -> Self {
743 AsyncUpStream(self.0.clone())
744 }
745}
746
747impl<U, T> Stream for AsyncUpStream<U, T> {
748 type Item = U;
749
750 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
751 if self.is_terminated() {
752 return Poll::Ready(None);
754 }
755
756 let mut spin_len = 1;
757 loop {
758 if let Some(frame) = self.0.stream.pop() {
759 return Poll::Ready(Some(frame));
761 } else {
762 match self.0.stream_status.compare_exchange(0,
764 1,
765 Ordering::AcqRel,
766 Ordering::Relaxed) {
767 Err(3) => {
768 unsafe {
770 if let Some(waker) = (*self.0.stream_waker.get()).take() {
771 waker.wake();
773 self.0.stream_status.store(0, Ordering::Release); }
775 }
776 continue;
777 },
778 Err(_) => {
779 spin_len = spin(spin_len);
781 continue;
782 },
783 Ok(_) => {
784 unsafe {
786 *self.0.stream_waker.get() = Some(cx.waker().clone()); }
788 self.0.stream_status.store(2, Ordering::Release); return Poll::Pending;
791 },
792 }
793 }
794 }
795 }
796}
797
798impl<T, U> FusedStream for AsyncUpStream<U, T> {
799 #[inline]
801 fn is_terminated(&self) -> bool {
802 if self.
803 0
804 .is_terminated_stream
805 .load(Ordering::Acquire) {
806 self.0.stream.is_empty()
808 } else {
809 false
811 }
812 }
813}
814
815impl<U, T> Sink<T> for AsyncUpStream<U, T> {
816 type Error = Error;
817
818 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
820 if self
821 .0
822 .is_terminated_sink
823 .load(Ordering::Acquire) {
824 return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll ready failed, reason: sink already terminated"))));
826 }
827
828 if self.0.sink.is_full() {
829 let mut spin_len = 1;
831 loop {
832 match self.0.sink_status.compare_exchange(0,
833 1,
834 Ordering::AcqRel,
835 Ordering::Relaxed) {
836 Err(current) => {
837 if spin_len >= 3 {
839 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll ready failed, current: {}, reason: sink or peer busy", current))));
841 }
842
843 spin_len = spin(spin_len);
845 continue;
846 },
847 Ok(_) => {
848 unsafe {
850 *self.0.sink_waker.get() = Some(cx.waker().clone()); }
852 self.0.sink_status.store(3, Ordering::Release); return Poll::Pending;
855 },
856 }
857 }
858 } else {
859 Poll::Ready(Ok(()))
862 }
863 }
864
865 fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
867 if self
868 .0
869 .is_terminated_sink
870 .load(Ordering::Acquire) {
871 return Err(Error::new(ErrorKind::BrokenPipe, format!("Start send failed, reason: sink already terminated")));
873 }
874
875 if let Err(_) = self.0.sink.push(item) {
876 return Err(Error::new(ErrorKind::WouldBlock, format!("Start send failed, reason: buffer already full")));
878 }
879
880 Ok(())
881 }
882
883 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
885 if self
886 .0
887 .is_terminated_sink
888 .load(Ordering::Acquire) {
889 return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll flush failed, reason: sink already terminated"))));
891 }
892
893 let mut spin_len = 1;
894 loop {
895 match self.0.sink_status.compare_exchange(0,
896 1,
897 Ordering::AcqRel,
898 Ordering::Relaxed) {
899 Err(1) => {
900 if spin_len >= 3 {
902 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll flush failed, current: 1, reason: sink or peer busy"))));
904 }
905
906 spin_len = spin(spin_len);
908 continue;
909 },
910 Err(2) => {
911 unsafe {
913 if let Some(waker) = (*self.0.sink_waker.get()).take() {
914 waker.wake();
916 self.0.sink_status.store(0, Ordering::Release); }
918 }
919
920 return Poll::Ready(Ok(()));
921 },
922 Err(3) => {
923 return Poll::Ready(Ok(()));
925 },
926 _ => {
927 self.0.sink_status.store(0, Ordering::Release); return Poll::Ready(Ok(()));
930 },
931 }
932 }
933 }
934
935 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
937 if self
938 .0
939 .is_terminated_sink
940 .load(Ordering::Acquire) {
941 return Poll::Ready(Ok(()));
943 }
944
945 let mut spin_len = 1;
946 loop {
947 match self.0.sink_status.compare_exchange(0,
948 1,
949 Ordering::AcqRel,
950 Ordering::Relaxed) {
951 Err(1) => {
952 if spin_len >= 3 {
954 return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll close failed, current: 1, reason: sink or peer busy"))));
956 }
957
958 spin_len = spin(spin_len);
960 continue;
961 },
962 Err(_) => {
963 let _ = self
966 .0
967 .is_terminated_sink
968 .compare_exchange(false,
969 true,
970 Ordering::AcqRel,
971 Ordering::Relaxed);
972
973 unsafe {
974 if let Some(waker) = (*self.0.sink_waker.get()).take() {
975 waker.wake();
977 self.0.sink_status.store(0, Ordering::Release); }
979 }
980
981 return Poll::Ready(Ok(()));
982 },
983 Ok(_) => {
984 let _ = self
986 .0
987 .is_terminated_sink
988 .compare_exchange(false,
989 true,
990 Ordering::AcqRel,
991 Ordering::Relaxed);
992 self.0.sink_status.store(0, Ordering::Release); return Poll::Ready(Ok(()));
995 },
996 }
997 }
998 }
999}
1000
1001impl<U, T> AsyncPipeLine<U, T> for AsyncUpStream<U, T> {}
1002
1003struct InnerAsyncFlow<X, Y = X> {
1005 is_terminated_stream: Arc<AtomicBool>, stream: Arc<ArrayQueue<X>>, stream_waker: Arc<UnsafeCell<Option<Waker>>>, stream_status: Arc<AtomicU8>, is_terminated_sink: Arc<AtomicBool>, sink: Arc<ArrayQueue<Y>>, sink_waker: Arc<UnsafeCell<Option<Waker>>>, sink_status: Arc<AtomicU8>, }
1014