pi_async_buffer/
async_pipeline.rs

1#![allow(unused_assignments)]
2
3use std::pin::Pin;
4use std::alloc::Global;
5use std::cell::UnsafeCell;
6use std::io::{Error, ErrorKind};
7use std::task::{Poll, Context, Waker};
8use std::sync::{Arc, atomic::{AtomicBool, AtomicU8, Ordering}};
9
10use futures::{stream::{Stream, FusedStream},
11              sink::Sink};
12use crossbeam_queue::ArrayQueue;
13
14use pi_async_rt::lock::spin;
15
16///
17/// 允许跨线程的动态异步发送器
18/// 动态异步发送器的发送不要求线程安全,所以如果动态异步发送器同时在多个线程中发送,则需要由外部保证动态异步发送器的线程安全
19///
20pub type BoxSender<'a, T, E = Error> = Pin<Box<dyn AsyncSender<T, E> + Send + 'a, Global>>;
21
22///
23/// 异步发送器
24///
25pub trait AsyncSender<T, Err = Error>: Sink<T, Error = Err> {
26    /// 获取已发送未接收的队列长度
27    fn current_len(&self) -> Option<usize> {
28        None
29    }
30}
31
32///
33/// 扩展的异步发送器
34///
35pub trait AsyncSenderExt<T>: AsyncSender<T> {
36    /// 封装AsyncSender为Box,并将它固定
37    fn pin_boxed<'a>(self) -> BoxSender<'a, T>
38        where Self: Sized + Send + 'a {
39        Box::pin(self)
40    }
41}
42
43impl<T, U> AsyncSenderExt<U> for T where T: AsyncSender<U> {}
44
45pub struct PipeSender<T> {
46    is_terminated:  Arc<AtomicBool>,                //是否已终止管道
47    queue:          Arc<ArrayQueue<T>>,             //发送队列
48    waker:          Arc<UnsafeCell<Option<Waker>>>, //发送器的唤醒器
49    status:         Arc<AtomicU8>,                  //发送器的状态
50}
51
52unsafe impl<T> Send for PipeSender<T> {}
53unsafe impl<T> Sync for PipeSender<T> {}
54
55impl<T> Sink<T> for PipeSender<T> {
56    type Error = Error;
57
58    // 用户检查写流是否已满,如果当前写流已满,则异步等待对端读流消耗了写流的帧以后,再唤醒异步等待
59    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
60        if self
61            .is_terminated
62            .load(Ordering::Acquire) {
63            //当前的写流已终止
64            return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll ready failed, reason: pipeline already terminated"))));
65        }
66
67        if self.queue.is_full() {
68            //当前的写流已满
69            let mut spin_len = 1;
70            loop {
71                match self.status.compare_exchange(0,
72                                                   1,
73                                                   Ordering::AcqRel,
74                                                   Ordering::Relaxed) {
75                    Err(current) => {
76                        //对端读流或当前写流已经在异步等待唤醒,或者已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
77                        if spin_len >= 3 {
78                            //内部重试次数已达限制,则立即返回失败
79                            return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll ready failed, current: {}, reason: pipeline busy", current))));
80                        }
81
82                        //内部重试次数未达限制,则休眠后继续内部重试
83                        spin_len = spin(spin_len);
84                        continue;
85                    },
86                    Ok(_) => {
87                        //对端读流或当前写流未异步等待唤醒,且已被当前调用锁住
88                        unsafe {
89                            *self.waker.get() = Some(cx.waker().clone()); //设置写流的唤醒器
90                        }
91                        self.status.store(3, Ordering::Release); //设置等待写流未满的唤醒器状态为已就绪
92
93                        return Poll::Pending;
94                    },
95                }
96            }
97        } else {
98            //当前的写流未满,则立即返回写流已就绪
99            //对端读流或当前写流即使已经在异步等待唤醒,也不会唤醒正在等待唤醒的对端读流或当前写流
100            Poll::Ready(Ok(()))
101        }
102    }
103
104    // 同步向写流写入帧,外部调用必须保证在每次调用start_send前调用poll_ready且返回成功
105    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
106        if self
107            .is_terminated
108            .load(Ordering::Acquire) {
109            //当前的写流已终止
110            return Err(Error::new(ErrorKind::BrokenPipe, format!("Start send failed, reason: pipeline already terminated")));
111        }
112
113        if let Err(_) = self.queue.push(item) {
114            //写流已满,则立即返回失败,但允许重试
115            return Err(Error::new(ErrorKind::WouldBlock, format!("Start send failed, reason: pipeline already full")));
116        }
117
118        Ok(())
119    }
120
121    // 刷新只会唤醒对端读流的异步等待,即通知对端读流有新帧,刷新不会异步等待唤醒
122    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123        if self
124            .is_terminated
125            .load(Ordering::Acquire) {
126            //当前的写流已终止
127            return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll flush failed, reason: pipelinne already terminated"))));
128        }
129
130        let mut spin_len = 1;
131        loop {
132            match self.status.compare_exchange(0,
133                                               1,
134                                               Ordering::AcqRel,
135                                               Ordering::Relaxed) {
136                Err(1) => {
137                    //已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
138                    if spin_len >= 3 {
139                        //内部重试次数已达限制,则立即返回失败
140                        return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll flush failed, current: 1, reason: pipeline busy"))));
141                    }
142
143                    //内部重试次数未达限制,则休眠后继续内部重试
144                    spin_len = spin(spin_len);
145                    continue;
146                },
147                Err(2) => {
148                    //对端读流已经在异步等待唤醒,则立即唤醒对端的读流,并返回刷新成功
149                    unsafe {
150                        if let Some(waker) = (*self.waker.get()).take() {
151                            //对端设置的唤醒器存在,则立即唤醒对端的读流
152                            waker.wake();
153                            self.status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
154                        }
155                    }
156
157                    return Poll::Ready(Ok(()));
158                },
159                Err(3) => {
160                    //当前写流已经在异步等待唤醒,则忽略刷新,并立即返回刷新成功
161                    return Poll::Ready(Ok(()));
162                },
163                _ => {
164                    //已被当前调用锁住,则忽略刷新,并立即返回刷新成功
165                    self.status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
166                    return Poll::Ready(Ok(()));
167                },
168            }
169        }
170    }
171
172    // 终止当前写流,在终止前会尝试刷新一次写流,并唤醒对端读流或当前写流的任何异步等待
173    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
174        if self
175            .is_terminated
176            .load(Ordering::Acquire) {
177            //当前的写流已终止
178            return Poll::Ready(Ok(()));
179        }
180
181        let mut spin_len = 1;
182        loop {
183            match self.status.compare_exchange(0,
184                                               1,
185                                               Ordering::AcqRel,
186                                               Ordering::Relaxed) {
187                Err(1) => {
188                    //已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
189                    if spin_len >= 3 {
190                        //内部重试次数已达限制,则立即返回失败
191                        return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll close failed, current: 1, reason: pipeline busy"))));
192                    }
193
194                    //内部重试次数未达限制,则休眠后继续内部重试
195                    spin_len = spin(spin_len);
196                    continue;
197                },
198                Err(_) => {
199                    //对端读流或当前写流已经在异步等待唤醒,则立即终止对端读流和当前写流,然后唤醒对端读流或当前写流,并立即返回成功
200                    //对端需要从对端读流获取所有剩余帧后,再终止对端读流
201                    let _ = self
202                        .is_terminated
203                        .compare_exchange(false,
204                                          true,
205                                          Ordering::AcqRel,
206                                          Ordering::Relaxed);
207
208                    unsafe {
209                        if let Some(waker) = (*self.waker.get()).take() {
210                            //对端设置的唤醒器存在,则立即唤醒对端的读流
211                            waker.wake();
212                            self.status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
213                        }
214                    }
215
216                    return Poll::Ready(Ok(()));
217                },
218                Ok(_) => {
219                    //已被当前调用锁住,则立即终止对端读流和当前写流,并立即返回成功
220                    let _ = self
221                        .is_terminated
222                        .compare_exchange(false,
223                                          true,
224                                          Ordering::AcqRel,
225                                          Ordering::Relaxed);
226                    self.status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
227
228                    return Poll::Ready(Ok(()));
229                },
230            }
231        }
232    }
233}
234
235impl<T> AsyncSender<T> for PipeSender<T> {
236    fn current_len(&self) -> Option<usize> {
237        Some(self.queue.len())
238    }
239}
240
241///
242/// 允许跨线程的动态异步接收器
243/// 动态异步接收器的接收不要求线程安全,所以如果动态异步接收器同时在多个线程中接收,则需要由外部保证动态异步接收器的线程安全
244///
245pub type BoxReceiver<'a, T> = Pin<Box<dyn AsyncReceiver<T> + Send + 'a, Global>>;
246
247///
248/// 异步接收器
249///
250pub trait AsyncReceiver<T>: Stream<Item = T> + FusedStream {
251    /// 获取已发送未接收的队列长度
252    fn current_len(&self) -> Option<usize> {
253        None
254    }
255}
256
257///
258/// 扩展的异步接收器
259///
260pub trait AsyncReceiverExt<T>: AsyncReceiver<T> {
261    /// 封装AsyncReceiver为Box,并将它固定
262    fn pin_boxed<'a>(self) -> BoxReceiver<'a, T>
263        where Self: Sized + Send + 'a {
264        Box::pin(self)
265    }
266}
267
268impl<T, U> AsyncReceiverExt<U> for T where T: AsyncReceiver<U> {}
269
270pub struct PipeReceiver<T> {
271    is_terminated:  Arc<AtomicBool>,                //是否已终止管道
272    queue:          Arc<ArrayQueue<T>>,             //接收队列
273    waker:          Arc<UnsafeCell<Option<Waker>>>, //接收器的唤醒器
274    status:         Arc<AtomicU8>,                  //接收器的状态
275}
276
277unsafe impl<T> Send for PipeReceiver<T> {}
278unsafe impl<T> Sync for PipeReceiver<T> {}
279
280impl<T> Stream for PipeReceiver<T> {
281    type Item = T;
282
283    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
284        if self.is_terminated() {
285            //当前的接收器已终止
286            return Poll::Ready(None);
287        }
288
289        let mut spin_len = 1;
290        loop {
291            if let Some(frame) = self.queue.pop() {
292                //当前的读流有数据
293                return Poll::Ready(Some(frame));
294            } else {
295                //当前的读流无数据
296                match self.status.compare_exchange(0,
297                                                   1,
298                                                   Ordering::AcqRel,
299                                                   Ordering::Relaxed) {
300                    Err(3) => {
301                        //对端写流已经在异步等待唤醒,则立即唤醒对端写流,并继续内部重试
302                        unsafe {
303                            if let Some(waker) = (*self.waker.get()).take() {
304                                //对端设置的唤醒器存在,则立即唤醒对端的写流
305                                waker.wake();
306                                self.status.store(0, Ordering::Release); //设置对端写流和当前读流的状态为已初始化
307                            }
308                        }
309                        continue;
310                    },
311                    Err(_) => {
312                        //当前读流已经在异步等待唤醒,或者已被非当前调用锁住,则休眠后继续内部重试
313                        spin_len = spin(spin_len);
314                        continue;
315                    },
316                    Ok(_) => {
317                        //对端写流或当前读流未异步等待唤醒,且已被当前调用锁住
318                        unsafe {
319                            *self.waker.get() = Some(cx.waker().clone()); //设置读流的唤醒器
320                        }
321                        self.status.store(2, Ordering::Release); //设置对端写流和当前读流的唤醒器状态为已就绪
322
323                        return Poll::Pending;
324                    },
325                }
326            }
327        }
328    }
329}
330
331impl<T> FusedStream for PipeReceiver<T> {
332    // 判断接收器是否真的已终止
333    #[inline]
334    fn is_terminated(&self) -> bool {
335        if self
336            .is_terminated
337            .load(Ordering::Acquire) {
338            //读流已终止,则继续判断当前读流是否为空
339            self.queue.is_empty()
340        } else {
341            //读流未终止
342            false
343        }
344    }
345}
346
347impl<T> AsyncReceiver<T> for PipeReceiver<T> {
348    fn current_len(&self) -> Option<usize> {
349        Some(self.queue.len())
350    }
351}
352
353///
354/// 创建一对指定容量的异步通道
355///
356pub fn channel<T>(capacity: usize) -> (PipeSender<T>, PipeReceiver<T>) {
357    let is_terminated = Arc::new(AtomicBool::new(false));
358    let queue = Arc::new(ArrayQueue::new(capacity));
359    let waker = Arc::new(UnsafeCell::new(None));
360    let status = Arc::new(AtomicU8::new(0));
361
362    let sender = PipeSender {
363        is_terminated: is_terminated.clone(),
364        queue: queue.clone(),
365        waker: waker.clone(),
366        status: status.clone(),
367    };
368
369    let receiver = PipeReceiver {
370        is_terminated,
371        queue,
372        waker,
373        status,
374    };
375
376    (sender, receiver)
377}
378
379///
380/// 允许跨线程的动态异步管道
381/// 动态异步管道的获取和写入不要求线程安全,所以如果动态异步管道同时在多个线程中处理获取和写入,则需要由外部保证动态异步管道的线程安全
382///
383pub type BoxPipeline<'a, T, U = T, E = Error> = Pin<Box<dyn AsyncPipeLine<T, U, E> + Send + 'a, Global>>;
384
385///
386/// 异步管道,可以从异步管道中获取帧,也可以向异步管道中写入帧
387///
388pub trait AsyncPipeLine<
389    StreamFrame,
390    SinkFrame = StreamFrame,
391    Err = Error
392>: Sink<SinkFrame, Error = Err> + Stream<Item = StreamFrame> + FusedStream {}
393
394///
395/// 扩展的异步管道
396///
397pub trait AsyncPipeLineExt<
398    StreamFrame,
399    SinkFrame = StreamFrame,
400    Err = Error
401>: AsyncPipeLine<StreamFrame, SinkFrame, Err> {
402    /// 封装AsyncPipeLine为Box,并将它固定
403    fn pin_boxed<'a>(self) -> BoxPipeline<'a, StreamFrame, SinkFrame, Err>
404        where Self: Sized + Send + 'a {
405        Box::pin(self)
406    }
407}
408
409impl<
410    StreamFrame,
411    SinkFrame,
412    Err,
413    T: ?Sized,
414> AsyncPipeLineExt<StreamFrame, SinkFrame, Err> for T where T: AsyncPipeLine<StreamFrame, SinkFrame, Err> {}
415
416///
417/// 创建一对指定容量的异步管道
418///
419pub fn pipeline<T, U>(capacity: usize) -> (AsyncDownStream<T, U>, AsyncUpStream<U, T>) {
420    let is_terminated_down_stream = Arc::new(AtomicBool::new(false));
421    let down_stream = Arc::new(ArrayQueue::new(capacity));
422    let down_stream_waker = Arc::new(UnsafeCell::new(None));
423    let down_stream_status = Arc::new(AtomicU8::new(0));
424    let is_terminated_down_sink = Arc::new(AtomicBool::new(false));
425    let down_sink = Arc::new(ArrayQueue::new(capacity));
426    let down_sink_waker = Arc::new(UnsafeCell::new(None));
427    let down_sink_status = Arc::new(AtomicU8::new(0));
428    let down_inner = InnerAsyncFlow {
429        is_terminated_stream: is_terminated_down_stream.clone(),
430        stream: down_stream.clone(),
431        stream_waker: down_stream_waker.clone(),
432        stream_status: down_stream_status.clone(),
433        is_terminated_sink: is_terminated_down_sink.clone(),
434        sink: down_sink.clone(),
435        sink_waker: down_sink_waker.clone(),
436        sink_status: down_sink_status.clone(),
437    };
438    let async_down_stream = AsyncDownStream(Arc::new(down_inner));
439
440    let is_terminated_stream = is_terminated_down_sink;
441    let up_stream = down_sink;
442    let stream_waker = down_sink_waker;
443    let stream_status = down_sink_status;
444    let is_terminated_sink = is_terminated_down_stream;
445    let up_sink = down_stream;
446    let sink_waker = down_stream_waker;
447    let sink_status = down_stream_status;
448    let up_inner = InnerAsyncFlow {
449        is_terminated_stream,
450        stream: up_stream,
451        stream_waker,
452        stream_status,
453        is_terminated_sink,
454        sink: up_sink,
455        sink_waker,
456        sink_status,
457    };
458    let async_up_stream = AsyncUpStream(Arc::new(up_inner));
459
460    (async_down_stream, async_up_stream)
461}
462
463///
464/// 线程安全的异步下游
465///
466pub struct AsyncDownStream<T, U = T>(Arc<InnerAsyncFlow<T, U>>);
467
468unsafe impl<T, U> Send for AsyncDownStream<T, U> {}
469unsafe impl<T, U> Sync for AsyncDownStream<T, U> {}
470
471impl<T, U> Clone for AsyncDownStream<T, U> {
472    fn clone(&self) -> Self {
473        AsyncDownStream(self.0.clone())
474    }
475}
476
477impl<T, U> Stream for AsyncDownStream<T, U> {
478    type Item = T;
479
480    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
481        if self.is_terminated() {
482            //当前的读流已终止
483            return Poll::Ready(None);
484        }
485
486        let mut spin_len = 1;
487        loop {
488            if let Some(frame) = self.0.stream.pop() {
489                //当前的读流有数据
490                return Poll::Ready(Some(frame));
491            } else {
492                //当前的读流无数据
493                match self.0.stream_status.compare_exchange(0,
494                                                            1,
495                                                            Ordering::AcqRel,
496                                                            Ordering::Relaxed) {
497                    Err(3) => {
498                        //对端写流已经在异步等待唤醒,则立即唤醒对端写流,并继续内部重试
499                        unsafe {
500                            if let Some(waker) = (*self.0.stream_waker.get()).take() {
501                                //对端设置的唤醒器存在,则立即唤醒对端的写流
502                                waker.wake();
503                                self.0.stream_status.store(0, Ordering::Release); //设置对端写流和当前读流的状态为已初始化
504                            }
505                        }
506                        continue;
507                    },
508                    Err(_) => {
509                        //当前读流已经在异步等待唤醒,或者已被非当前调用锁住,则休眠后继续内部重试
510                        spin_len = spin(spin_len);
511                        continue;
512                    },
513                    Ok(_) => {
514                        //对端写流或当前读流未异步等待唤醒,且已被当前调用锁住
515                        unsafe {
516                            *self.0.stream_waker.get() = Some(cx.waker().clone()); //设置读流的唤醒器
517                        }
518                        self.0.stream_status.store(2, Ordering::Release); //设置对端写流和当前读流的唤醒器状态为已就绪
519
520                        return Poll::Pending;
521                    },
522                }
523            }
524        }
525    }
526}
527
528impl<T, U> FusedStream for AsyncDownStream<T, U> {
529    // 判断读流是否真的已终止
530    #[inline]
531    fn is_terminated(&self) -> bool {
532        if self.
533            0
534            .is_terminated_stream
535            .load(Ordering::Acquire) {
536            //读流已终止,则继续判断当前读流是否为空
537            self.0.stream.is_empty()
538        } else {
539            //读流未终止
540            false
541        }
542    }
543}
544
545impl<T, U> Sink<U> for AsyncDownStream<T, U> {
546    type Error = Error;
547
548    // 用户检查写流是否已满,如果当前写流已满,则异步等待对端读流消耗了写流的帧以后,再唤醒异步等待
549    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
550        if self
551            .0
552            .is_terminated_sink
553            .load(Ordering::Acquire) {
554            //当前的写流已终止
555            return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll ready failed, reason: sink already terminated"))));
556        }
557
558        if self.0.sink.is_full() {
559            //当前的写流已满
560            let mut spin_len = 1;
561            loop {
562                match self.0.sink_status.compare_exchange(0,
563                                                          1,
564                                                          Ordering::AcqRel,
565                                                          Ordering::Relaxed) {
566                    Err(current) => {
567                        //对端读流或当前写流已经在异步等待唤醒,或者已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
568                        if spin_len >= 3 {
569                            //内部重试次数已达限制,则立即返回失败
570                            return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll ready failed, current: {}, reason: sink or peer busy", current))));
571                        }
572
573                        //内部重试次数未达限制,则休眠后继续内部重试
574                        spin_len = spin(spin_len);
575                        continue;
576                    },
577                    Ok(_) => {
578                        //对端读流或当前写流未异步等待唤醒,且已被当前调用锁住
579                        unsafe {
580                            *self.0.sink_waker.get() = Some(cx.waker().clone()); //设置写流的唤醒器
581                        }
582                        self.0.sink_status.store(3, Ordering::Release); //设置等待写流未满的唤醒器状态为已就绪
583
584                        return Poll::Pending;
585                    },
586                }
587            }
588        } else {
589            //当前的写流未满,则立即返回写流已就绪
590            //对端读流或当前写流即使已经在异步等待唤醒,也不会唤醒正在等待唤醒的对端读流或当前写流
591            Poll::Ready(Ok(()))
592        }
593    }
594
595    // 同步向写流写入帧,外部调用必须保证在每次调用start_send前调用poll_ready且返回成功
596    fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> {
597        if self
598            .0
599            .is_terminated_sink
600            .load(Ordering::Acquire) {
601            //当前的写流已终止
602            return Err(Error::new(ErrorKind::BrokenPipe, format!("Start send failed, reason: sink already terminated")));
603        }
604
605        if let Err(_) = self.0.sink.push(item) {
606            //写流已满,则立即返回失败,但允许重试
607            return Err(Error::new(ErrorKind::WouldBlock, format!("Start send failed, reason: buffer already full")));
608        }
609
610        Ok(())
611    }
612
613    // 刷新只会唤醒对端读流的异步等待,即通知对端读流有新帧,刷新不会异步等待唤醒
614    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
615        if self
616            .0
617            .is_terminated_sink
618            .load(Ordering::Acquire) {
619            //当前的写流已终止
620            return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll flush failed, reason: sink already terminated"))));
621        }
622
623        let mut spin_len = 1;
624        loop {
625            match self.0.sink_status.compare_exchange(0,
626                                                      1,
627                                                      Ordering::AcqRel,
628                                                      Ordering::Relaxed) {
629                Err(1) => {
630                    //已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
631                    if spin_len >= 3 {
632                        //内部重试次数已达限制,则立即返回失败
633                        return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll flush failed, current: 1, reason: sink or peer busy"))));
634                    }
635
636                    //内部重试次数未达限制,则休眠后继续内部重试
637                    spin_len = spin(spin_len);
638                    continue;
639                },
640                Err(2) => {
641                    //对端读流已经在异步等待唤醒,则立即唤醒对端的读流,并返回刷新成功
642                    unsafe {
643                        if let Some(waker) = (*self.0.sink_waker.get()).take() {
644                            //对端设置的唤醒器存在,则立即唤醒对端的读流
645                            waker.wake();
646                            self.0.sink_status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
647                        }
648                    }
649
650                    return Poll::Ready(Ok(()));
651                },
652                Err(3) => {
653                    //当前写流已经在异步等待唤醒,则忽略刷新,并立即返回刷新成功
654                    return Poll::Ready(Ok(()));
655                },
656                _ => {
657                    //已被当前调用锁住,则忽略刷新,并立即返回刷新成功
658                    self.0.sink_status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
659                    return Poll::Ready(Ok(()));
660                },
661            }
662        }
663    }
664
665    // 终止当前写流,在终止前会尝试刷新一次写流,并唤醒对端读流或当前写流的任何异步等待
666    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
667        if self
668            .0
669            .is_terminated_sink
670            .load(Ordering::Acquire) {
671            //当前的写流已终止
672            return Poll::Ready(Ok(()));
673        }
674
675        let mut spin_len = 1;
676        loop {
677            match self.0.sink_status.compare_exchange(0,
678                                                      1,
679                                                      Ordering::AcqRel,
680                                                      Ordering::Relaxed) {
681                Err(1) => {
682                    //已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
683                    if spin_len >= 3 {
684                        //内部重试次数已达限制,则立即返回失败
685                        return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll close failed, current: 1, reason: sink or peer busy"))));
686                    }
687
688                    //内部重试次数未达限制,则休眠后继续内部重试
689                    spin_len = spin(spin_len);
690                    continue;
691                },
692                Err(_) => {
693                    //对端读流或当前写流已经在异步等待唤醒,则立即终止对端读流和当前写流,然后唤醒对端读流或当前写流,并立即返回成功
694                    //对端需要从对端读流获取所有剩余帧后,再终止对端读流
695                    let _ = self
696                        .0
697                        .is_terminated_sink
698                        .compare_exchange(false,
699                                          true,
700                                          Ordering::AcqRel,
701                                          Ordering::Relaxed);
702
703                    unsafe {
704                        if let Some(waker) = (*self.0.sink_waker.get()).take() {
705                            //对端设置的唤醒器存在,则立即唤醒对端的读流
706                            waker.wake();
707                            self.0.sink_status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
708                        }
709                    }
710
711                    return Poll::Ready(Ok(()));
712                },
713                Ok(_) => {
714                    //已被当前调用锁住,则立即终止对端读流和当前写流,并立即返回成功
715                    let _ = self
716                        .0
717                        .is_terminated_sink
718                        .compare_exchange(false,
719                                          true,
720                                          Ordering::AcqRel,
721                                          Ordering::Relaxed);
722                    self.0.sink_status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
723
724                    return Poll::Ready(Ok(()));
725                },
726            }
727        }
728    }
729}
730
731impl<T, U> AsyncPipeLine<T, U> for AsyncDownStream<T, U> {}
732
733///
734/// 线程安全的异步上游
735///
736pub struct AsyncUpStream<U, T = U>(Arc<InnerAsyncFlow<U, T>>);
737
738unsafe impl<U, T> Send for AsyncUpStream<U, T> {}
739unsafe impl<U, T> Sync for AsyncUpStream<U, T> {}
740
741impl<U, T> Clone for AsyncUpStream<U, T> {
742    fn clone(&self) -> Self {
743        AsyncUpStream(self.0.clone())
744    }
745}
746
747impl<U, T> Stream for AsyncUpStream<U, T> {
748    type Item = U;
749
750    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
751        if self.is_terminated() {
752            //当前的读流已终止
753            return Poll::Ready(None);
754        }
755
756        let mut spin_len = 1;
757        loop {
758            if let Some(frame) = self.0.stream.pop() {
759                //当前的读流有数据
760                return Poll::Ready(Some(frame));
761            } else {
762                //当前的读流无数据
763                match self.0.stream_status.compare_exchange(0,
764                                                            1,
765                                                            Ordering::AcqRel,
766                                                            Ordering::Relaxed) {
767                    Err(3) => {
768                        //对端写流已经在异步等待唤醒,则立即唤醒对端写流,并继续内部重试
769                        unsafe {
770                            if let Some(waker) = (*self.0.stream_waker.get()).take() {
771                                //对端设置的唤醒器存在,则立即唤醒对端的写流
772                                waker.wake();
773                                self.0.stream_status.store(0, Ordering::Release); //设置对端写流和当前读流的状态为已初始化
774                            }
775                        }
776                        continue;
777                    },
778                    Err(_) => {
779                        //当前读流已经在异步等待唤醒,或者已被非当前调用锁住,则休眠后继续内部重试
780                        spin_len = spin(spin_len);
781                        continue;
782                    },
783                    Ok(_) => {
784                        //对端写流或当前读流未异步等待唤醒,且已被当前调用锁住
785                        unsafe {
786                            *self.0.stream_waker.get() = Some(cx.waker().clone()); //设置读流的唤醒器
787                        }
788                        self.0.stream_status.store(2, Ordering::Release); //设置对端写流和当前读流的唤醒器状态为已就绪
789
790                        return Poll::Pending;
791                    },
792                }
793            }
794        }
795    }
796}
797
798impl<T, U> FusedStream for AsyncUpStream<U, T> {
799    // 判断读流是否真的已终止
800    #[inline]
801    fn is_terminated(&self) -> bool {
802        if self.
803            0
804            .is_terminated_stream
805            .load(Ordering::Acquire) {
806            //读流已终止,则继续判断当前读流是否为空
807            self.0.stream.is_empty()
808        } else {
809            //读流未终止
810            false
811        }
812    }
813}
814
815impl<U, T> Sink<T> for AsyncUpStream<U, T> {
816    type Error = Error;
817
818    // 用户检查写流是否已满,如果当前写流已满,则异步等待对端读流消耗了写流的帧以后,再唤醒异步等待
819    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
820        if self
821            .0
822            .is_terminated_sink
823            .load(Ordering::Acquire) {
824            //当前的写流已终止
825            return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll ready failed, reason: sink already terminated"))));
826        }
827
828        if self.0.sink.is_full() {
829            //当前的写流已满
830            let mut spin_len = 1;
831            loop {
832                match self.0.sink_status.compare_exchange(0,
833                                                          1,
834                                                          Ordering::AcqRel,
835                                                          Ordering::Relaxed) {
836                    Err(current) => {
837                        //对端读流或当前写流已经在异步等待唤醒,或者已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
838                        if spin_len >= 3 {
839                            //内部重试次数已达限制,则立即返回失败
840                            return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll ready failed, current: {}, reason: sink or peer busy", current))));
841                        }
842
843                        //内部重试次数未达限制,则休眠后继续内部重试
844                        spin_len = spin(spin_len);
845                        continue;
846                    },
847                    Ok(_) => {
848                        //对端读流或当前写流未异步等待唤醒,且已被当前调用锁住
849                        unsafe {
850                            *self.0.sink_waker.get() = Some(cx.waker().clone()); //设置写流的唤醒器
851                        }
852                        self.0.sink_status.store(3, Ordering::Release); //设置等待写流未满的唤醒器状态为已就绪
853
854                        return Poll::Pending;
855                    },
856                }
857            }
858        } else {
859            //当前的写流未满,则立即返回写流已就绪
860            //对端读流或当前写流即使已经在异步等待唤醒,也不会唤醒正在等待唤醒的对端读流或当前写流
861            Poll::Ready(Ok(()))
862        }
863    }
864
865    // 同步向写流写入帧,外部调用必须保证在每次调用start_send前调用poll_ready且返回成功
866    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
867        if self
868            .0
869            .is_terminated_sink
870            .load(Ordering::Acquire) {
871            //当前的写流已终止
872            return Err(Error::new(ErrorKind::BrokenPipe, format!("Start send failed, reason: sink already terminated")));
873        }
874
875        if let Err(_) = self.0.sink.push(item) {
876            //写流已满,则立即返回失败,但允许重试
877            return Err(Error::new(ErrorKind::WouldBlock, format!("Start send failed, reason: buffer already full")));
878        }
879
880        Ok(())
881    }
882
883    // 刷新只会唤醒对端读流的异步等待,即通知对端读流有新帧,刷新不会异步等待唤醒
884    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
885        if self
886            .0
887            .is_terminated_sink
888            .load(Ordering::Acquire) {
889            //当前的写流已终止
890            return Poll::Ready(Err(Error::new(ErrorKind::BrokenPipe, format!("Poll flush failed, reason: sink already terminated"))));
891        }
892
893        let mut spin_len = 1;
894        loop {
895            match self.0.sink_status.compare_exchange(0,
896                                                      1,
897                                                      Ordering::AcqRel,
898                                                      Ordering::Relaxed) {
899                Err(1) => {
900                    //已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
901                    if spin_len >= 3 {
902                        //内部重试次数已达限制,则立即返回失败
903                        return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll flush failed, current: 1, reason: sink or peer busy"))));
904                    }
905
906                    //内部重试次数未达限制,则休眠后继续内部重试
907                    spin_len = spin(spin_len);
908                    continue;
909                },
910                Err(2) => {
911                    //对端读流已经在异步等待唤醒,则立即唤醒对端的读流,并返回刷新成功
912                    unsafe {
913                        if let Some(waker) = (*self.0.sink_waker.get()).take() {
914                            //对端设置的唤醒器存在,则立即唤醒对端的读流
915                            waker.wake();
916                            self.0.sink_status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
917                        }
918                    }
919
920                    return Poll::Ready(Ok(()));
921                },
922                Err(3) => {
923                    //当前写流已经在异步等待唤醒,则忽略刷新,并立即返回刷新成功
924                    return Poll::Ready(Ok(()));
925                },
926                _ => {
927                    //已被当前调用锁住,则忽略刷新,并立即返回刷新成功
928                    self.0.sink_status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
929                    return Poll::Ready(Ok(()));
930                },
931            }
932        }
933    }
934
935    // 终止当前写流,在终止前会尝试刷新一次写流,并唤醒对端读流或当前写流的任何异步等待
936    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
937        if self
938            .0
939            .is_terminated_sink
940            .load(Ordering::Acquire) {
941            //当前的写流已终止
942            return Poll::Ready(Ok(()));
943        }
944
945        let mut spin_len = 1;
946        loop {
947            match self.0.sink_status.compare_exchange(0,
948                                                      1,
949                                                      Ordering::AcqRel,
950                                                      Ordering::Relaxed) {
951                Err(1) => {
952                    //已被非当前调用锁住,则尝试指定次数的内部重试,重试失败则立即返回失败,但允许外部重试
953                    if spin_len >= 3 {
954                        //内部重试次数已达限制,则立即返回失败
955                        return Poll::Ready(Err(Error::new(ErrorKind::WouldBlock, format!("Poll close failed, current: 1, reason: sink or peer busy"))));
956                    }
957
958                    //内部重试次数未达限制,则休眠后继续内部重试
959                    spin_len = spin(spin_len);
960                    continue;
961                },
962                Err(_) => {
963                    //对端读流或当前写流已经在异步等待唤醒,则立即终止对端读流和当前写流,然后唤醒对端读流或当前写流,并立即返回成功
964                    //对端需要从对端读流获取所有剩余帧后,再终止对端读流
965                    let _ = self
966                        .0
967                        .is_terminated_sink
968                        .compare_exchange(false,
969                                          true,
970                                          Ordering::AcqRel,
971                                          Ordering::Relaxed);
972
973                    unsafe {
974                        if let Some(waker) = (*self.0.sink_waker.get()).take() {
975                            //对端设置的唤醒器存在,则立即唤醒对端的读流
976                            waker.wake();
977                            self.0.sink_status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
978                        }
979                    }
980
981                    return Poll::Ready(Ok(()));
982                },
983                Ok(_) => {
984                    //已被当前调用锁住,则立即终止对端读流和当前写流,并立即返回成功
985                    let _ = self
986                        .0
987                        .is_terminated_sink
988                        .compare_exchange(false,
989                                          true,
990                                          Ordering::AcqRel,
991                                          Ordering::Relaxed);
992                    self.0.sink_status.store(0, Ordering::Release); //设置对端读流和当前写流的状态为已初始化
993
994                    return Poll::Ready(Ok(()));
995                },
996            }
997        }
998    }
999}
1000
1001impl<U, T> AsyncPipeLine<U, T> for AsyncUpStream<U, T> {}
1002
1003// 内部线程安全的异步流
1004struct InnerAsyncFlow<X, Y = X> {
1005    is_terminated_stream:   Arc<AtomicBool>,                //是否已终止读流
1006    stream:                 Arc<ArrayQueue<X>>,             //读流
1007    stream_waker:           Arc<UnsafeCell<Option<Waker>>>, //读流唤醒器
1008    stream_status:          Arc<AtomicU8>,                  //读流的唤醒器状态
1009    is_terminated_sink:     Arc<AtomicBool>,                //是否已终止写流
1010    sink:                   Arc<ArrayQueue<Y>>,             //写流
1011    sink_waker:             Arc<UnsafeCell<Option<Waker>>>, //写流唤醒器
1012    sink_status:            Arc<AtomicU8>,                  //写流的唤醒器状态
1013}
1014