// parallel_processor/execution_manager/packets_channel.rs
#![allow(dead_code)]
2
3use std::sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6};
7
/// Receiving half of a packets channel, generic over the queue
/// implementation `Q` (bounded or unbounded, see the submodules below).
pub(crate) struct PacketsChannelReceiver<Q> {
    // Shared with every sender attached to the same channel.
    queue: Arc<Q>,
}
11impl<Q> Clone for PacketsChannelReceiver<Q> {
12 fn clone(&self) -> Self {
13 Self {
14 queue: self.queue.clone(),
15 }
16 }
17}
18
/// Sending half of a packets channel.
///
/// Each live sender is counted on the shared queue; the count is
/// decremented exactly once per handle, either by `dispose()` or by `Drop`.
pub(crate) struct PacketsChannelSender<Q: PacketsQueue> {
    queue: Arc<Q>,
    // Set by `dispose()`; checked in `Drop` to avoid a double decrement.
    is_disposed: AtomicBool,
}
23impl<Q: PacketsQueue> Clone for PacketsChannelSender<Q> {
24 fn clone(&self) -> Self {
25 self.queue.incr_senders_count();
26 Self {
27 queue: self.queue.clone(),
28 is_disposed: AtomicBool::new(false),
29 }
30 }
31}
32
33impl<Q: PacketsQueue> Drop for PacketsChannelSender<Q> {
34 fn drop(&mut self) {
35 if !self.is_disposed.load(Ordering::Relaxed) {
36 self.queue.decr_senders_count();
37 }
38 }
39}
40
/// Minimal queue contract shared by the bounded and unbounded channel
/// backends, including the sender-count bookkeeping used to detect
/// channel shutdown.
pub(crate) trait PacketsQueue {
    type Item;
    /// Enqueue a value (implementations may block when at capacity).
    fn push(&self, value: Self::Item);
    /// Non-blocking pop; `None` when the queue is currently empty.
    fn try_pop(&self) -> Option<Self::Item>;
    /// Blocking pop; `None` once the queue is empty and no senders remain.
    fn pop(&self) -> Option<Self::Item>;
    fn len(&self) -> usize;
    fn incr_senders_count(&self);
    fn decr_senders_count(&self);
    fn get_senders_count(&self) -> usize;
}
51
52impl<Q: PacketsQueue> PacketsChannelSender<Q> {
53 #[inline(always)]
54 pub fn send(&self, value: Q::Item) {
55 self.queue.push(value);
56 }
57
58 #[inline(always)]
59 pub fn len(&self) -> usize {
60 self.queue.len()
61 }
62
63 pub fn dispose(&self) {
64 if !self.is_disposed.swap(true, Ordering::Relaxed) {
65 self.queue.decr_senders_count();
66 }
67 }
68}
69
70impl<Q: PacketsQueue> PacketsChannelReceiver<Q> {
71 #[inline(always)]
72 pub fn try_recv(&self) -> Option<Q::Item> {
73 self.queue.try_pop()
74 }
75
76 #[inline(always)]
77 pub fn recv(&self) -> Option<Q::Item> {
78 self.queue.pop()
79 }
80
81 #[inline(always)]
82 pub fn is_active(&self) -> bool {
83 self.queue.get_senders_count() > 0 || self.queue.len() > 0
84 }
85
86 #[inline(always)]
87 pub fn make_sender(&self) -> PacketsChannelSender<Q> {
88 self.queue.incr_senders_count();
89 PacketsChannelSender {
90 queue: self.queue.clone(),
91 is_disposed: AtomicBool::new(false),
92 }
93 }
94
95 #[inline(always)]
96 pub fn len(&self) -> usize {
97 self.queue.len()
98 }
99}
100
101pub mod bounded {
102 use std::{
103 mem::{forget, ManuallyDrop},
104 sync::{
105 atomic::{AtomicBool, AtomicUsize, Ordering},
106 Arc,
107 },
108 };
109
110 use crossbeam::queue::ArrayQueue;
111
112 use crate::execution_manager::{
113 notifier::Notifier,
114 objects_pool::PoolObjectTrait,
115 packets_channel::{PacketsChannelReceiver, PacketsChannelSender, PacketsQueue},
116 scheduler::run_blocking_op,
117 };
118
    /// Fixed-capacity MPMC queue backed by crossbeam's lock-free
    /// `ArrayQueue`, with notifiers to park senders while the queue is full
    /// and receivers while it is empty.
    pub(crate) struct BoundedQueue<T> {
        queue: ArrayQueue<T>,
        // Woken when a pop frees a slot.
        senders_waiting: Notifier,
        // Woken when an item is pushed or the last sender goes away.
        receivers_waiting: Notifier,
        senders_count: AtomicUsize,
    }
125
126 impl<T> PacketsQueue for BoundedQueue<T> {
127 type Item = T;
128
129 fn push(&self, value: Self::Item) {
130 if let Err(value) = self.queue.push(value) {
131 let mut value = ManuallyDrop::new(value);
132 run_blocking_op(|| {
133 self.senders_waiting.wait_for_condition(|| {
134 match self.queue.push(unsafe { ManuallyDrop::take(&mut value) }) {
135 Ok(_) => true,
136 Err(value) => {
137 forget(value);
139
140 false
143 }
144 }
145 });
146 });
147 }
148 self.receivers_waiting.notify_one();
149 }
150
151 fn try_pop(&self) -> Option<Self::Item> {
152 let value = self.queue.pop()?;
153 self.senders_waiting.notify_one();
154 Some(value)
155 }
156
157 fn pop(&self) -> Option<Self::Item> {
158 let value = if let Some(value) = self.queue.pop() {
159 Some(value)
160 } else {
161 let mut value = None;
162
163 run_blocking_op(|| {
164 self.receivers_waiting
165 .wait_for_condition(|| match self.queue.pop() {
166 Some(v) => {
167 value = Some(v);
168 true
169 }
170 None => self.senders_count.load(Ordering::Relaxed) == 0,
171 });
172 });
173 value
174 };
175 self.senders_waiting.notify_one();
176 value
177 }
178
179 #[inline(always)]
180 fn len(&self) -> usize {
181 self.queue.len()
182 }
183
184 #[inline(always)]
185 fn incr_senders_count(&self) {
186 self.senders_count.fetch_add(1, Ordering::Relaxed);
187 }
188
189 #[inline(always)]
190 fn decr_senders_count(&self) {
191 let senders_count = self.senders_count.fetch_sub(1, Ordering::Relaxed);
192 if senders_count == 1 {
193 self.receivers_waiting.notify_all();
194 }
195 }
196
197 #[inline(always)]
198 fn get_senders_count(&self) -> usize {
199 self.senders_count.load(Ordering::Relaxed)
200 }
201 }
202
203 pub(crate) type PacketsChannelReceiverBounded<T> = PacketsChannelReceiver<BoundedQueue<T>>;
204 pub(crate) type PacketsChannelSenderBounded<T> = PacketsChannelSender<BoundedQueue<T>>;
205
206 impl<T> PacketsChannelReceiverBounded<T> {}
207 impl<T> PacketsChannelSenderBounded<T> {}
208
209 impl<T: Send + 'static> PoolObjectTrait for PacketsChannelReceiverBounded<T> {
210 type InitData = usize; fn allocate_new(init_data: &Self::InitData) -> Self {
213 Self {
214 queue: Arc::new(BoundedQueue {
215 queue: ArrayQueue::new(*init_data),
216 senders_waiting: Notifier::new(),
217 receivers_waiting: Notifier::new(),
218 senders_count: AtomicUsize::new(0),
219 }),
220 }
221 }
222
223 fn reset(&mut self) {
224 assert_eq!(
225 self.queue.senders_count.load(Ordering::Relaxed),
226 0,
227 "Cannot reset PacketsChannelReceiver while senders are active"
228 );
229 assert_eq!(
230 self.queue.len(),
231 0,
232 "Cannot reset PacketsChannelReceiver while there are items in the queue"
233 );
234 }
235 }
236
237 pub(crate) fn packets_channel_bounded<T: Send + 'static>(
238 max_size: usize,
239 ) -> (
240 PacketsChannelSenderBounded<T>,
241 PacketsChannelReceiverBounded<T>,
242 ) {
243 let internal = Arc::new(BoundedQueue {
244 queue: ArrayQueue::new(max_size),
245 receivers_waiting: Notifier::new(),
246 senders_waiting: Notifier::new(),
247 senders_count: AtomicUsize::new(1),
248 });
249 (
250 PacketsChannelSender {
251 queue: internal.clone(),
252 is_disposed: AtomicBool::new(false),
253 },
254 PacketsChannelReceiver { queue: internal },
255 )
256 }
257}
258
259pub mod unbounded {
260 use crate::execution_manager::{
261 packets_channel::{PacketsChannelReceiver, PacketsChannelSender, PacketsQueue},
262 scheduler::run_blocking_op,
263 };
264 use parking_lot::{Condvar, Mutex};
265 use std::{
266 collections::VecDeque,
267 sync::{
268 atomic::{AtomicBool, AtomicUsize},
269 Arc,
270 },
271 time::Duration,
272 };
273
    /// Growable MPMC queue protected by a parking_lot mutex, with condvars
    /// for parked receivers (queue empty) and for senders applying soft
    /// backpressure via `wait_for_space`.
    pub(crate) struct UnboundedQueue<T> {
        queue: Mutex<VecDeque<T>>,
        // Signalled on push and on channel close.
        receivers_waiting: Condvar,
        // Waited on with a timeout by `wait_for_space`; signalled by `pop`.
        senders_waiting: Condvar,
        senders_count: AtomicUsize,
    }
280
    impl<T> PacketsQueue for UnboundedQueue<T> {
        type Item = T;

        /// Unbounded push: never blocks on capacity; wakes one parked receiver.
        fn push(&self, value: Self::Item) {
            let mut queue = self.queue.lock();
            queue.push_back(value);
            // Release the lock before notifying so the woken receiver does
            // not immediately contend on the mutex.
            drop(queue);
            self.receivers_waiting.notify_one();
        }

        fn try_pop(&self) -> Option<Self::Item> {
            // NOTE(review): unlike the bounded backend, this does not notify
            // `senders_waiting`; senders parked in `wait_for_space` rely on
            // their 50ms wait timeout to re-check — confirm this is intended.
            self.queue.lock().pop_front()
        }

        /// Blocking pop: waits on `receivers_waiting` while the queue is
        /// empty and senders remain; returns `None` once the channel is
        /// drained and closed.
        fn pop(&self) -> Option<Self::Item> {
            let mut queue = self.queue.lock();
            let value = if let Some(value) = queue.pop_front() {
                drop(queue);
                Some(value)
            } else {
                // The mutex guard is moved into the closure and held across
                // the condvar wait (parking_lot re-acquires it on wakeup).
                run_blocking_op(|| loop {
                    match queue.pop_front() {
                        Some(v) => {
                            drop(queue);
                            return Some(v);
                        }
                        None => {
                            if self
                                .senders_count
                                .load(std::sync::atomic::Ordering::Relaxed)
                                == 0
                            {
                                // Channel closed and empty: end of stream.
                                drop(queue);
                                return None;
                            } else {
                                self.receivers_waiting.wait(&mut queue);
                            }
                        }
                    }
                })
            };
            self.senders_waiting.notify_one();
            value
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.queue.lock().len()
        }

        #[inline(always)]
        fn incr_senders_count(&self) {
            self.senders_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }

        #[inline(always)]
        fn decr_senders_count(&self) {
            // fetch_sub returns the previous value: 1 means this was the
            // last sender.
            let senders_count = self
                .senders_count
                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
            if senders_count == 1 {
                // Take the lock so this notify cannot slot between a
                // receiver's empty-check and its wait, which would lose
                // the close signal.
                let _lock = self.queue.lock();
                self.receivers_waiting.notify_all();
            }
        }

        #[inline(always)]
        fn get_senders_count(&self) -> usize {
            self.senders_count
                .load(std::sync::atomic::Ordering::Relaxed)
        }
    }
354
    impl<T> PacketsChannelSenderUnbounded<T> {
        /// Soft backpressure: block (in 50ms timed waits, via
        /// `run_blocking_op`) while the queue holds more than `max_in_queue`
        /// items. Timed, because `try_pop` never signals `senders_waiting`.
        fn wait_for_space(&self, max_in_queue: usize) {
            run_blocking_op(|| {
                let mut queue = self.queue.queue.lock();
                while queue.len() > max_in_queue {
                    self.queue
                        .senders_waiting
                        .wait_for(&mut queue, Duration::from_millis(50));
                }
            });
        }

        /// Enqueue a whole batch under a single lock acquisition, optionally
        /// waiting for the queue to shrink below `max_in_queue` first, then
        /// wake all parked receivers.
        #[inline(always)]
        pub fn send_batch(
            &self,
            values: impl Iterator<Item = T>,
            max_in_queue: Option<usize>,
            high_priority: bool,
        ) {
            if let Some(max_in_queue) = max_in_queue {
                self.wait_for_space(max_in_queue);
            }

            let mut queue = self.queue.queue.lock();
            if high_priority {
                // NOTE(review): pushing each value to the front reverses the
                // batch's relative order — confirm this is the intended
                // priority semantics.
                for value in values {
                    queue.push_front(value);
                }
            } else {
                for value in values {
                    queue.push_back(value);
                }
            }
            drop(queue);
            self.queue.receivers_waiting.notify_all();
        }

        /// Enqueue one value at the front (high priority) or back of the
        /// queue, optionally applying `wait_for_space` backpressure first.
        pub fn send_with_priority(
            &self,
            value: T,
            high_priority: bool,
            max_in_queue: Option<usize>,
        ) {
            if let Some(max_in_queue) = max_in_queue {
                self.wait_for_space(max_in_queue);
            }

            let mut queue = self.queue.queue.lock();
            if high_priority {
                queue.push_front(value);
            } else {
                queue.push_back(value);
            }
            drop(queue);
            self.queue.receivers_waiting.notify_one();
        }
    }
412
    /// Convenience aliases for channels backed by the unbounded queue.
    pub(crate) type PacketsChannelReceiverUnbounded<T> = PacketsChannelReceiver<UnboundedQueue<T>>;
    pub(crate) type PacketsChannelSenderUnbounded<T> = PacketsChannelSender<UnboundedQueue<T>>;
415
416 pub(crate) fn packets_channel_unbounded<T: Send + 'static>() -> (
417 PacketsChannelSenderUnbounded<T>,
418 PacketsChannelReceiverUnbounded<T>,
419 ) {
420 let internal = Arc::new(UnboundedQueue {
421 queue: Mutex::new(VecDeque::with_capacity(64)),
422 receivers_waiting: Condvar::new(),
423 senders_waiting: Condvar::new(),
424 senders_count: AtomicUsize::new(1),
425 });
426 (
427 PacketsChannelSender {
428 queue: internal.clone(),
429 is_disposed: AtomicBool::new(false),
430 },
431 PacketsChannelReceiver { queue: internal },
432 )
433 }
434}