1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use async_std::sync::{Mutex, MutexGuard};
use crate::collections::CircularBuffer;
use crate::sync::Condition;
use crate::zasynclock;
/// A capacity-limited queue intended to be shared between asynchronous tasks.
///
/// Elements are stored in a [`CircularBuffer`] guarded by an async [`Mutex`].
/// Two [`Condition`]s let tasks wait until the buffer reaches the state they
/// need instead of spinning on the lock.
pub struct FifoQueue<T> {
    /// Element storage; every operation locks this before touching the buffer.
    buffer: Mutex<CircularBuffer<T>>,
    /// Waited on by `pull` and `drain` when the buffer is empty; notified by `push`.
    not_empty: Condition,
    /// Waited on by `push` when the buffer is full; notified by `pull` and `Drain::drop`.
    not_full: Condition,
}
impl<T> FifoQueue<T> {
    /// Builds an empty queue.
    ///
    /// `capacity` is passed to `CircularBuffer::new`, and `concurrency_level`
    /// is passed unchanged to the constructor of both conditions (its exact
    /// meaning is defined by `Condition::new`).
    pub fn new(capacity: usize, concurrency_level: usize) -> FifoQueue<T> {
        let buffer = Mutex::new(CircularBuffer::new(capacity));
        let not_empty = Condition::new(concurrency_level);
        let not_full = Condition::new(concurrency_level);
        Self {
            buffer,
            not_empty,
            not_full,
        }
    }

    /// Inserts `x` into the queue, waiting for as long as the buffer is full.
    ///
    /// Once the element is stored, a task registered on `not_empty` (if any)
    /// is notified; the lock guard is handed over to `notify` in that case.
    pub async fn push(&self, x: T) {
        loop {
            let mut buffer = zasynclock!(self.buffer);
            if buffer.is_full() {
                // No room yet: give the guard to `not_full.wait` and retry
                // from scratch once the wait completes.
                self.not_full.wait(buffer).await;
                continue;
            }

            buffer.push(x);
            if self.not_empty.has_waiting_list() {
                self.not_empty.notify(buffer).await;
            }
            return;
        }
    }

    /// Removes and returns one element, waiting for as long as the buffer is empty.
    ///
    /// After an element has been taken, a task registered on `not_full`
    /// (if any) is notified; the lock guard is handed over to `notify`.
    pub async fn pull(&self) -> T {
        loop {
            let mut buffer = zasynclock!(self.buffer);
            match buffer.pull() {
                Some(item) => {
                    if self.not_full.has_waiting_list() {
                        self.not_full.notify(buffer).await;
                    }
                    return item;
                }
                None => {
                    // Nothing to take: give the guard to `not_empty.wait`,
                    // then loop around and lock again.
                    self.not_empty.wait(buffer).await;
                }
            }
        }
    }

    /// Waits until the buffer holds at least one element and returns a
    /// [`Drain`] that owns the buffer lock.
    ///
    /// The returned value starts with `drained == false`.
    pub async fn drain(&self) -> Drain<'_, T> {
        let mut guard = zasynclock!(self.buffer);
        while guard.is_empty() {
            // The guard is consumed by `wait`, so the lock must be re-acquired
            // before the emptiness check is repeated.
            self.not_empty.wait(guard).await;
            guard = zasynclock!(self.buffer);
        }

        Drain {
            queue: self,
            drained: false,
            guard,
        }
    }

    /// Locks the buffer and returns a [`Drain`] over it without waiting for
    /// any element to be present; the resulting iterator may be empty.
    pub async fn try_drain(&self) -> Drain<'_, T> {
        let guard = zasynclock!(self.buffer);
        Drain {
            queue: self,
            drained: false,
            guard,
        }
    }
}
/// Handle over the locked buffer of a [`FifoQueue`], produced by
/// `FifoQueue::drain` and `FifoQueue::try_drain`.
///
/// It owns the buffer's lock guard, so the buffer stays locked for as long as
/// this value is alive.
pub struct Drain<'a, T> {
    /// Queue being drained; used to reach its `not_full` condition on `drop`.
    queue: &'a FifoQueue<T>,
    /// Becomes `true` once `next` has yielded at least one element.
    drained: bool,
    /// Guard over the queue's buffer, from which elements are pulled.
    guard: MutexGuard<'a, CircularBuffer<T>>,
}
impl<'a, T> Drain<'a, T> {
pub async fn drop(self) {
if self.drained && self.queue.not_full.has_waiting_list() {
self.queue.not_full.notify(self.guard).await;
} else {
drop(self.guard);
}
}
}
/// Yields the elements currently stored in the locked buffer, one `pull` at a time.
///
/// The impl header no longer declares a lifetime parameter: the original
/// `'a` was never used because the self type already uses `'_`.
impl<T> Iterator for Drain<'_, T> {
    type Item = T;

    /// Pulls the next element from the buffer.
    ///
    /// Returns `None` when `pull` returns `None`; otherwise records that the
    /// drain removed something (`drained = true`) and returns the element.
    #[inline]
    fn next(&mut self) -> Option<T> {
        let item = self.guard.pull()?;
        self.drained = true;
        Some(item)
    }

    /// Reports the buffer's current `len()` as both the lower and upper bound.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.guard.len();
        (remaining, Some(remaining))
    }
}