parallel_processor/execution_manager/
packets_channel.rs

1#![allow(dead_code)]
2
3use std::sync::{
4    atomic::{AtomicBool, Ordering},
5    Arc,
6};
7
/// Receiving half of a packets channel: a cheap handle around the shared queue `Q`.
pub(crate) struct PacketsChannelReceiver<Q> {
    /// Queue shared with the other handles (receivers and senders) of this channel.
    queue: Arc<Q>,
}

// Written by hand rather than derived so that cloning does not require
// `Q: Clone`: only the `Arc` handle is duplicated, the queue stays shared.
impl<Q> Clone for PacketsChannelReceiver<Q> {
    fn clone(&self) -> Self {
        let queue = Arc::clone(&self.queue);
        Self { queue }
    }
}
18
19pub(crate) struct PacketsChannelSender<Q: PacketsQueue> {
20    queue: Arc<Q>,
21    is_disposed: AtomicBool,
22}
23impl<Q: PacketsQueue> Clone for PacketsChannelSender<Q> {
24    fn clone(&self) -> Self {
25        self.queue.incr_senders_count();
26        Self {
27            queue: self.queue.clone(),
28            is_disposed: AtomicBool::new(false),
29        }
30    }
31}
32
33impl<Q: PacketsQueue> Drop for PacketsChannelSender<Q> {
34    fn drop(&mut self) {
35        if !self.is_disposed.load(Ordering::Relaxed) {
36            self.queue.decr_senders_count();
37        }
38    }
39}
40
/// Queue backend shared by the sending and receiving halves of a packets channel.
///
/// Every method takes `&self`, so a single queue can be used through several
/// sender/receiver handles at once.
pub(crate) trait PacketsQueue {
    /// Type of the packets carried by the queue.
    type Item;
    /// Adds `value` to the queue.
    fn push(&self, value: Self::Item);
    /// Removes and returns an item if one is available, `None` otherwise.
    fn try_pop(&self) -> Option<Self::Item>;
    /// Removes and returns an item. The implementations in this file wait for
    /// one to arrive and return `None` only when the queue is empty and no
    /// senders are registered.
    fn pop(&self) -> Option<Self::Item>;
    /// Number of items currently in the queue.
    fn len(&self) -> usize;
    /// Registers one more sender.
    fn incr_senders_count(&self);
    /// Unregisters one sender.
    fn decr_senders_count(&self);
    /// Number of currently registered senders.
    fn get_senders_count(&self) -> usize;
}
51
52impl<Q: PacketsQueue> PacketsChannelSender<Q> {
53    #[inline(always)]
54    pub fn send(&self, value: Q::Item) {
55        self.queue.push(value);
56    }
57
58    #[inline(always)]
59    pub fn len(&self) -> usize {
60        self.queue.len()
61    }
62
63    pub fn dispose(&self) {
64        if !self.is_disposed.swap(true, Ordering::Relaxed) {
65            self.queue.decr_senders_count();
66        }
67    }
68}
69
70impl<Q: PacketsQueue> PacketsChannelReceiver<Q> {
71    #[inline(always)]
72    pub fn try_recv(&self) -> Option<Q::Item> {
73        self.queue.try_pop()
74    }
75
76    #[inline(always)]
77    pub fn recv(&self) -> Option<Q::Item> {
78        self.queue.pop()
79    }
80
81    #[inline(always)]
82    pub fn is_active(&self) -> bool {
83        self.queue.get_senders_count() > 0 || self.queue.len() > 0
84    }
85
86    #[inline(always)]
87    pub fn make_sender(&self) -> PacketsChannelSender<Q> {
88        self.queue.incr_senders_count();
89        PacketsChannelSender {
90            queue: self.queue.clone(),
91            is_disposed: AtomicBool::new(false),
92        }
93    }
94
95    #[inline(always)]
96    pub fn len(&self) -> usize {
97        self.queue.len()
98    }
99}
100
101pub mod bounded {
102    use std::{
103        mem::{forget, ManuallyDrop},
104        sync::{
105            atomic::{AtomicBool, AtomicUsize, Ordering},
106            Arc,
107        },
108    };
109
110    use crossbeam::queue::ArrayQueue;
111
112    use crate::execution_manager::{
113        notifier::Notifier,
114        objects_pool::PoolObjectTrait,
115        packets_channel::{PacketsChannelReceiver, PacketsChannelSender, PacketsQueue},
116        scheduler::run_blocking_op,
117    };
118
    /// State shared by all senders and receivers of a bounded packets channel.
    pub(crate) struct BoundedQueue<T> {
        /// Fixed-capacity storage for the queued packets.
        queue: ArrayQueue<T>,
        /// Senders blocked in `push` on a full queue wait here; signalled after an item is popped.
        senders_waiting: Notifier,
        /// Receivers blocked in `pop` on an empty queue wait here; signalled
        /// after a push and when the last sender is unregistered.
        receivers_waiting: Notifier,
        /// Number of currently registered senders.
        senders_count: AtomicUsize,
    }
125
126    impl<T> PacketsQueue for BoundedQueue<T> {
127        type Item = T;
128
129        fn push(&self, value: Self::Item) {
130            if let Err(value) = self.queue.push(value) {
131                let mut value = ManuallyDrop::new(value);
132                run_blocking_op(|| {
133                    self.senders_waiting.wait_for_condition(|| {
134                        match self.queue.push(unsafe { ManuallyDrop::take(&mut value) }) {
135                            Ok(_) => true,
136                            Err(value) => {
137                                // Forget this value as it is only a copy
138                                forget(value);
139
140                                // If we failed to push, it means the queue is full
141                                // and we need to wait for a receiver to pop an item
142                                false
143                            }
144                        }
145                    });
146                });
147            }
148            self.receivers_waiting.notify_one();
149        }
150
151        fn try_pop(&self) -> Option<Self::Item> {
152            let value = self.queue.pop()?;
153            self.senders_waiting.notify_one();
154            Some(value)
155        }
156
157        fn pop(&self) -> Option<Self::Item> {
158            let value = if let Some(value) = self.queue.pop() {
159                Some(value)
160            } else {
161                let mut value = None;
162
163                run_blocking_op(|| {
164                    self.receivers_waiting
165                        .wait_for_condition(|| match self.queue.pop() {
166                            Some(v) => {
167                                value = Some(v);
168                                true
169                            }
170                            None => self.senders_count.load(Ordering::Relaxed) == 0,
171                        });
172                });
173                value
174            };
175            self.senders_waiting.notify_one();
176            value
177        }
178
179        #[inline(always)]
180        fn len(&self) -> usize {
181            self.queue.len()
182        }
183
184        #[inline(always)]
185        fn incr_senders_count(&self) {
186            self.senders_count.fetch_add(1, Ordering::Relaxed);
187        }
188
189        #[inline(always)]
190        fn decr_senders_count(&self) {
191            let senders_count = self.senders_count.fetch_sub(1, Ordering::Relaxed);
192            if senders_count == 1 {
193                self.receivers_waiting.notify_all();
194            }
195        }
196
197        #[inline(always)]
198        fn get_senders_count(&self) -> usize {
199            self.senders_count.load(Ordering::Relaxed)
200        }
201    }
202
    /// Receiver of a channel backed by [`BoundedQueue`].
    pub(crate) type PacketsChannelReceiverBounded<T> = PacketsChannelReceiver<BoundedQueue<T>>;
    /// Sender of a channel backed by [`BoundedQueue`].
    pub(crate) type PacketsChannelSenderBounded<T> = PacketsChannelSender<BoundedQueue<T>>;

    // Currently empty: no bounded-specific methods are defined on either half.
    impl<T> PacketsChannelReceiverBounded<T> {}
    impl<T> PacketsChannelSenderBounded<T> {}
208
209    impl<T: Send + 'static> PoolObjectTrait for PacketsChannelReceiverBounded<T> {
210        type InitData = usize; // max_size
211
212        fn allocate_new(init_data: &Self::InitData) -> Self {
213            Self {
214                queue: Arc::new(BoundedQueue {
215                    queue: ArrayQueue::new(*init_data),
216                    senders_waiting: Notifier::new(),
217                    receivers_waiting: Notifier::new(),
218                    senders_count: AtomicUsize::new(0),
219                }),
220            }
221        }
222
223        fn reset(&mut self) {
224            assert_eq!(
225                self.queue.senders_count.load(Ordering::Relaxed),
226                0,
227                "Cannot reset PacketsChannelReceiver while senders are active"
228            );
229            assert_eq!(
230                self.queue.len(),
231                0,
232                "Cannot reset PacketsChannelReceiver while there are items in the queue"
233            );
234        }
235    }
236
237    pub(crate) fn packets_channel_bounded<T: Send + 'static>(
238        max_size: usize,
239    ) -> (
240        PacketsChannelSenderBounded<T>,
241        PacketsChannelReceiverBounded<T>,
242    ) {
243        let internal = Arc::new(BoundedQueue {
244            queue: ArrayQueue::new(max_size),
245            receivers_waiting: Notifier::new(),
246            senders_waiting: Notifier::new(),
247            senders_count: AtomicUsize::new(1),
248        });
249        (
250            PacketsChannelSender {
251                queue: internal.clone(),
252                is_disposed: AtomicBool::new(false),
253            },
254            PacketsChannelReceiver { queue: internal },
255        )
256    }
257}
258
259pub mod unbounded {
260    use crate::execution_manager::{
261        packets_channel::{PacketsChannelReceiver, PacketsChannelSender, PacketsQueue},
262        scheduler::run_blocking_op,
263    };
264    use parking_lot::{Condvar, Mutex};
265    use std::{
266        collections::VecDeque,
267        sync::{
268            atomic::{AtomicBool, AtomicUsize},
269            Arc,
270        },
271        time::Duration,
272    };
273
    /// State shared by all senders and receivers of an unbounded packets channel.
    pub(crate) struct UnboundedQueue<T> {
        /// Queued packets, protected by a mutex that both condition variables are used with.
        queue: Mutex<VecDeque<T>>,
        /// Receivers blocked in `pop` on an empty queue wait here; signalled
        /// after a push and when the last sender is unregistered.
        receivers_waiting: Condvar,
        /// Senders throttled by a `max_in_queue` limit wait here; signalled by `pop`.
        senders_waiting: Condvar,
        /// Number of currently registered senders.
        senders_count: AtomicUsize,
    }
280
281    impl<T> PacketsQueue for UnboundedQueue<T> {
282        type Item = T;
283
284        fn push(&self, value: Self::Item) {
285            let mut queue = self.queue.lock();
286            queue.push_back(value);
287            drop(queue);
288            self.receivers_waiting.notify_one();
289        }
290
291        fn try_pop(&self) -> Option<Self::Item> {
292            self.queue.lock().pop_front()
293        }
294
295        fn pop(&self) -> Option<Self::Item> {
296            let mut queue = self.queue.lock();
297            let value = if let Some(value) = queue.pop_front() {
298                drop(queue);
299                Some(value)
300            } else {
301                run_blocking_op(|| loop {
302                    match queue.pop_front() {
303                        Some(v) => {
304                            drop(queue);
305                            return Some(v);
306                        }
307                        None => {
308                            if self
309                                .senders_count
310                                .load(std::sync::atomic::Ordering::Relaxed)
311                                == 0
312                            {
313                                drop(queue);
314                                return None;
315                            } else {
316                                self.receivers_waiting.wait(&mut queue);
317                            }
318                        }
319                    }
320                })
321            };
322            self.senders_waiting.notify_one();
323            value
324        }
325
326        #[inline(always)]
327        fn len(&self) -> usize {
328            self.queue.lock().len()
329        }
330
331        #[inline(always)]
332        fn incr_senders_count(&self) {
333            self.senders_count
334                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
335        }
336
337        #[inline(always)]
338        fn decr_senders_count(&self) {
339            let senders_count = self
340                .senders_count
341                .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
342            if senders_count == 1 {
343                let _lock = self.queue.lock();
344                self.receivers_waiting.notify_all();
345            }
346        }
347
348        #[inline(always)]
349        fn get_senders_count(&self) -> usize {
350            self.senders_count
351                .load(std::sync::atomic::Ordering::Relaxed)
352        }
353    }
354
355    impl<T> PacketsChannelSenderUnbounded<T> {
356        fn wait_for_space(&self, max_in_queue: usize) {
357            run_blocking_op(|| {
358                let mut queue = self.queue.queue.lock();
359                while queue.len() > max_in_queue {
360                    self.queue
361                        .senders_waiting
362                        .wait_for(&mut queue, Duration::from_millis(50));
363                }
364            });
365        }
366
367        #[inline(always)]
368        pub fn send_batch(
369            &self,
370            values: impl Iterator<Item = T>,
371            max_in_queue: Option<usize>,
372            high_priority: bool,
373        ) {
374            if let Some(max_in_queue) = max_in_queue {
375                self.wait_for_space(max_in_queue);
376            }
377
378            let mut queue = self.queue.queue.lock();
379            if high_priority {
380                for value in values {
381                    queue.push_front(value);
382                }
383            } else {
384                for value in values {
385                    queue.push_back(value);
386                }
387            }
388            drop(queue);
389            self.queue.receivers_waiting.notify_all();
390        }
391
392        pub fn send_with_priority(
393            &self,
394            value: T,
395            high_priority: bool,
396            max_in_queue: Option<usize>,
397        ) {
398            if let Some(max_in_queue) = max_in_queue {
399                self.wait_for_space(max_in_queue);
400            }
401
402            let mut queue = self.queue.queue.lock();
403            if high_priority {
404                queue.push_front(value);
405            } else {
406                queue.push_back(value);
407            }
408            drop(queue);
409            self.queue.receivers_waiting.notify_one();
410        }
411    }
412
    /// Receiver of a channel backed by [`UnboundedQueue`].
    pub(crate) type PacketsChannelReceiverUnbounded<T> = PacketsChannelReceiver<UnboundedQueue<T>>;
    /// Sender of a channel backed by [`UnboundedQueue`].
    pub(crate) type PacketsChannelSenderUnbounded<T> = PacketsChannelSender<UnboundedQueue<T>>;
415
416    pub(crate) fn packets_channel_unbounded<T: Send + 'static>() -> (
417        PacketsChannelSenderUnbounded<T>,
418        PacketsChannelReceiverUnbounded<T>,
419    ) {
420        let internal = Arc::new(UnboundedQueue {
421            queue: Mutex::new(VecDeque::with_capacity(64)),
422            receivers_waiting: Condvar::new(),
423            senders_waiting: Condvar::new(),
424            senders_count: AtomicUsize::new(1),
425        });
426        (
427            PacketsChannelSender {
428                queue: internal.clone(),
429                is_disposed: AtomicBool::new(false),
430            },
431            PacketsChannelReceiver { queue: internal },
432        )
433    }
434}