// parallel_processor/execution_manager/async_channel.rs
#![allow(dead_code)]
2
3use crossbeam::queue::SegQueue;
4use parking_lot::{Condvar, Mutex};
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::task::{Context, Poll, Waker};
10
11pub struct ReceiverFuture<'a, T: Sync + Send + 'static, const CHANNELS_COUNT: usize> {
12 internal: &'a AsyncChannelInternal<T, CHANNELS_COUNT>,
13 offset: usize,
14 stream_index: u64,
15}
16
17#[inline(always)]
18fn try_get_item<T: Sync + Send + 'static, const CHANNELS_COUNT: usize>(
19 internal: &AsyncChannelInternal<T, CHANNELS_COUNT>,
20 offset: usize,
21) -> Option<T> {
22 if let Some(packet) = internal.packets[offset..]
23 .iter()
24 .map(|ch| ch.pop())
25 .filter(|p| p.is_some())
26 .next()
27 .flatten()
28 {
29 return Some(packet);
30 }
31 internal.packets[..offset]
32 .iter()
33 .map(|ch| ch.pop())
34 .filter(|p| p.is_some())
35 .next()
36 .flatten()
37}
38
39impl<'a, T: Sync + Send + 'static, const CHANNELS_COUNT: usize> Future
40 for ReceiverFuture<'a, T, CHANNELS_COUNT>
41{
42 type Output = Result<T, ()>;
43
44 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
45 match try_get_item(self.internal, self.offset) {
46 None => {
47 if self.internal.stream_index.load(Ordering::SeqCst) != self.stream_index {
48 Poll::Ready(try_get_item(self.internal, self.offset).ok_or(()))
49 } else {
50 self.internal.waiting_list.push(cx.waker().clone());
51 if let Some(value) = try_get_item(self.internal, self.offset) {
52 Poll::Ready(Ok(value))
53 } else if self.internal.stream_index.load(Ordering::SeqCst) != self.stream_index
54 {
55 Poll::Ready(try_get_item(self.internal, self.offset).ok_or(()))
56 } else {
57 Poll::Pending
58 }
59 }
60 }
61 Some(value) => Poll::Ready(Ok(value)),
62 }
63 }
64}
65
66struct AsyncChannelInternal<T: Sync + Send + 'static, const CHANNELS_COUNT: usize> {
67 packets: [SegQueue<T>; CHANNELS_COUNT],
68 waiting_list: SegQueue<Waker>,
69 blocking_mutex: Mutex<()>,
70 blocking_condvar: Condvar,
71 max_capacity: usize,
72 stream_index: AtomicU64,
73}
74
75pub(crate) struct MultiplePriorityAsyncChannel<
76 T: Sync + Send + 'static,
77 const CHANNELS_COUNT: usize,
78> {
79 internal: Arc<AsyncChannelInternal<T, CHANNELS_COUNT>>,
80 stream_index: AtomicU64,
81}
82
83impl<T: Sync + Send + 'static, const CHANNELS_COUNT: usize> Clone
84 for MultiplePriorityAsyncChannel<T, CHANNELS_COUNT>
85{
86 fn clone(&self) -> Self {
87 Self {
88 internal: self.internal.clone(),
89 stream_index: AtomicU64::new(self.stream_index.load(Ordering::SeqCst)),
90 }
91 }
92}
93
94impl<T: Sync + Send + 'static, const CHANNELS_COUNT: usize>
95 MultiplePriorityAsyncChannel<T, CHANNELS_COUNT>
96{
97 pub fn new(max_capacity: usize) -> Self {
98 Self {
99 internal: Arc::new(AsyncChannelInternal {
100 packets: [(); CHANNELS_COUNT].map(|_| SegQueue::new()),
101 waiting_list: SegQueue::new(),
102 blocking_mutex: Mutex::new(()),
103 blocking_condvar: Condvar::new(),
104 max_capacity,
105 stream_index: AtomicU64::new(0),
106 }),
107 stream_index: AtomicU64::new(0),
108 }
109 }
110
111 pub fn recv(&self) -> ReceiverFuture<T, CHANNELS_COUNT> {
112 self.recv_offset(0)
113 }
114
115 pub fn recv_offset(&self, offset: usize) -> ReceiverFuture<T, CHANNELS_COUNT> {
116 ReceiverFuture {
117 internal: &self.internal,
118 offset,
119 stream_index: self.stream_index.load(Ordering::SeqCst),
120 }
121 }
122
123 pub fn try_recv(&self) -> Option<T> {
124 try_get_item(&self.internal, 0)
125 }
126
127 pub fn recv_blocking(&self) -> Result<T, ()> {
128 match try_get_item(&self.internal, 0) {
129 None => {
130 let stream_index = self.stream_index.load(Ordering::SeqCst);
131 let mut lock_mutex = self.internal.blocking_mutex.lock();
132 loop {
133 if self.internal.stream_index.load(Ordering::SeqCst) != stream_index {
134 return Err(());
135 }
136 if let Some(packet) = try_get_item(&self.internal, 0) {
137 return Ok(packet);
138 }
139 self.internal.blocking_condvar.wait(&mut lock_mutex);
140 }
141 }
142 Some(packet) => Ok(packet),
143 }
144 }
145
146 pub fn reopen(&self) {
147 self.stream_index.store(
148 self.internal.stream_index.load(Ordering::SeqCst),
149 Ordering::SeqCst,
150 );
151 }
152
153 pub fn release(&self) {
154 self.internal.stream_index.fetch_add(1, Ordering::SeqCst);
155 while let Some(waker) = self.internal.waiting_list.pop() {
156 waker.wake();
157 }
158 self.internal.blocking_condvar.notify_all();
159 }
160
161 pub fn send_with_priority(&self, value: T, priority: usize, limit_size: bool) {
162 let packets_len: usize = self.internal.packets.iter().map(|p| p.len()).sum();
163 if !limit_size || packets_len < self.internal.max_capacity {
164 self.internal.packets[priority].push(value);
165
166 for _ in 0..self.internal.packets.len() {
167 if let Some(waker) = self.internal.waiting_list.pop() {
168 waker.wake();
169 } else {
170 break;
171 }
172 }
173 if packets_len == 0 {
174 self.internal.blocking_condvar.notify_one();
175 } else {
176 self.internal.blocking_condvar.notify_all();
177 }
178 }
179 }
180
181 pub fn len(&self) -> usize {
182 self.internal.packets.len()
183 }
184}
185
186pub(crate) type AsyncChannel<T> = MultiplePriorityAsyncChannel<T, 1>;
187
188impl<T: Sync + Send + 'static> AsyncChannel<T> {
189 pub fn send(&self, value: T, limit_size: bool) {
190 self.send_with_priority(value, 0, limit_size);
191 }
192}
193
194pub(crate) type DoublePriorityAsyncChannel<T> = MultiplePriorityAsyncChannel<T, 2>;
195
196impl<T: Sync + Send + 'static> DoublePriorityAsyncChannel<T> {
197 pub fn send(&self, value: T, limit_size: bool, high_priority: bool) {
198 self.send_with_priority(value, if high_priority { 0 } else { 1 }, limit_size);
199 }
200}