parallel_processor/execution_manager/
async_channel.rs

1#![allow(dead_code)]
2
3use crossbeam::queue::SegQueue;
4use parking_lot::{Condvar, Mutex};
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::task::{Context, Poll, Waker};
10
11pub struct ReceiverFuture<'a, T: Sync + Send + 'static, const CHANNELS_COUNT: usize> {
12    internal: &'a AsyncChannelInternal<T, CHANNELS_COUNT>,
13    offset: usize,
14    stream_index: u64,
15}
16
17#[inline(always)]
18fn try_get_item<T: Sync + Send + 'static, const CHANNELS_COUNT: usize>(
19    internal: &AsyncChannelInternal<T, CHANNELS_COUNT>,
20    offset: usize,
21) -> Option<T> {
22    if let Some(packet) = internal.packets[offset..]
23        .iter()
24        .map(|ch| ch.pop())
25        .filter(|p| p.is_some())
26        .next()
27        .flatten()
28    {
29        return Some(packet);
30    }
31    internal.packets[..offset]
32        .iter()
33        .map(|ch| ch.pop())
34        .filter(|p| p.is_some())
35        .next()
36        .flatten()
37}
38
39impl<'a, T: Sync + Send + 'static, const CHANNELS_COUNT: usize> Future
40    for ReceiverFuture<'a, T, CHANNELS_COUNT>
41{
42    type Output = Result<T, ()>;
43
44    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45        match try_get_item(self.internal, self.offset) {
46            None => {
47                if self.internal.stream_index.load(Ordering::SeqCst) != self.stream_index {
48                    Poll::Ready(try_get_item(self.internal, self.offset).ok_or(()))
49                } else {
50                    self.internal.waiting_list.push(cx.waker().clone());
51                    if let Some(value) = try_get_item(self.internal, self.offset) {
52                        Poll::Ready(Ok(value))
53                    } else if self.internal.stream_index.load(Ordering::SeqCst) != self.stream_index
54                    {
55                        Poll::Ready(try_get_item(self.internal, self.offset).ok_or(()))
56                    } else {
57                        Poll::Pending
58                    }
59                }
60            }
61            Some(value) => Poll::Ready(Ok(value)),
62        }
63    }
64}
65
66struct AsyncChannelInternal<T: Sync + Send + 'static, const CHANNELS_COUNT: usize> {
67    packets: [SegQueue<T>; CHANNELS_COUNT],
68    waiting_list: SegQueue<Waker>,
69    blocking_mutex: Mutex<()>,
70    blocking_condvar: Condvar,
71    max_capacity: usize,
72    stream_index: AtomicU64,
73}
74
75pub(crate) struct MultiplePriorityAsyncChannel<
76    T: Sync + Send + 'static,
77    const CHANNELS_COUNT: usize,
78> {
79    internal: Arc<AsyncChannelInternal<T, CHANNELS_COUNT>>,
80    stream_index: AtomicU64,
81}
82
83impl<T: Sync + Send + 'static, const CHANNELS_COUNT: usize> Clone
84    for MultiplePriorityAsyncChannel<T, CHANNELS_COUNT>
85{
86    fn clone(&self) -> Self {
87        Self {
88            internal: self.internal.clone(),
89            stream_index: AtomicU64::new(self.stream_index.load(Ordering::SeqCst)),
90        }
91    }
92}
93
94impl<T: Sync + Send + 'static, const CHANNELS_COUNT: usize>
95    MultiplePriorityAsyncChannel<T, CHANNELS_COUNT>
96{
97    pub fn new(max_capacity: usize) -> Self {
98        Self {
99            internal: Arc::new(AsyncChannelInternal {
100                packets: [(); CHANNELS_COUNT].map(|_| SegQueue::new()),
101                waiting_list: SegQueue::new(),
102                blocking_mutex: Mutex::new(()),
103                blocking_condvar: Condvar::new(),
104                max_capacity,
105                stream_index: AtomicU64::new(0),
106            }),
107            stream_index: AtomicU64::new(0),
108        }
109    }
110
111    pub fn recv(&self) -> ReceiverFuture<T, CHANNELS_COUNT> {
112        self.recv_offset(0)
113    }
114
115    pub fn recv_offset(&self, offset: usize) -> ReceiverFuture<T, CHANNELS_COUNT> {
116        ReceiverFuture {
117            internal: &self.internal,
118            offset,
119            stream_index: self.stream_index.load(Ordering::SeqCst),
120        }
121    }
122
123    pub fn try_recv(&self) -> Option<T> {
124        try_get_item(&self.internal, 0)
125    }
126
127    pub fn recv_blocking(&self) -> Result<T, ()> {
128        match try_get_item(&self.internal, 0) {
129            None => {
130                let stream_index = self.stream_index.load(Ordering::SeqCst);
131                let mut lock_mutex = self.internal.blocking_mutex.lock();
132                loop {
133                    if self.internal.stream_index.load(Ordering::SeqCst) != stream_index {
134                        return Err(());
135                    }
136                    if let Some(packet) = try_get_item(&self.internal, 0) {
137                        return Ok(packet);
138                    }
139                    self.internal.blocking_condvar.wait(&mut lock_mutex);
140                }
141            }
142            Some(packet) => Ok(packet),
143        }
144    }
145
146    pub fn reopen(&self) {
147        self.stream_index.store(
148            self.internal.stream_index.load(Ordering::SeqCst),
149            Ordering::SeqCst,
150        );
151    }
152
153    pub fn release(&self) {
154        self.internal.stream_index.fetch_add(1, Ordering::SeqCst);
155        while let Some(waker) = self.internal.waiting_list.pop() {
156            waker.wake();
157        }
158        self.internal.blocking_condvar.notify_all();
159    }
160
161    pub fn send_with_priority(&self, value: T, priority: usize, limit_size: bool) {
162        let packets_len: usize = self.internal.packets.iter().map(|p| p.len()).sum();
163        if !limit_size || packets_len < self.internal.max_capacity {
164            self.internal.packets[priority].push(value);
165
166            for _ in 0..self.internal.packets.len() {
167                if let Some(waker) = self.internal.waiting_list.pop() {
168                    waker.wake();
169                } else {
170                    break;
171                }
172            }
173            if packets_len == 0 {
174                self.internal.blocking_condvar.notify_one();
175            } else {
176                self.internal.blocking_condvar.notify_all();
177            }
178        }
179    }
180
181    pub fn len(&self) -> usize {
182        self.internal.packets.len()
183    }
184}
185
186pub(crate) type AsyncChannel<T> = MultiplePriorityAsyncChannel<T, 1>;
187
188impl<T: Sync + Send + 'static> AsyncChannel<T> {
189    pub fn send(&self, value: T, limit_size: bool) {
190        self.send_with_priority(value, 0, limit_size);
191    }
192}
193
194pub(crate) type DoublePriorityAsyncChannel<T> = MultiplePriorityAsyncChannel<T, 2>;
195
196impl<T: Sync + Send + 'static> DoublePriorityAsyncChannel<T> {
197    pub fn send(&self, value: T, limit_size: bool, high_priority: bool) {
198        self.send_with_priority(value, if high_priority { 0 } else { 1 }, limit_size);
199    }
200}