rust_rsm/common/
atomicqueue.rs1
2#![allow(non_camel_case_types)]
3#![allow(non_snake_case)]
4#![allow(non_upper_case_globals)]
5use std::sync::{Mutex,Arc};
7use std::sync::Condvar;
8use crate::common::{spin_lock::spin_lock_t,errcode};
9use std::alloc::{self,Layout};
10use std::{mem,ptr};
11use std::iter::{Iterator};
12
13const INVALID_INDEX:usize = usize::MAX;
14pub struct AtomicDequeue<T> {
15 inner: *mut T,
16 length:usize,
17 limit:usize,
18 head:usize,
19 tail:usize,
20 locked:spin_lock_t,
21 cond:Condvar,
22 has_data:Arc<Mutex<bool>>,
23}
24impl<T> AtomicDequeue<T> {
26 pub fn new(capacity:usize) -> Self {
27 use crate::alg;
28 let new_cap = 1 << (alg::log2(capacity as u64) + 1);
29 let pdata = unsafe { alloc::alloc(Layout::from_size_align_unchecked(new_cap*mem::size_of::<T>(), 1)) as *mut T};
30 return Self {
31 inner: pdata,
32 length:0,
33 limit:new_cap,
34 head:INVALID_INDEX,
35 tail:INVALID_INDEX,
36 locked:spin_lock_t::new(),
37 cond:Condvar::new(),
38 has_data:Arc::new(Mutex::new(false)),
39 };
40 }
41 pub fn push_back(&mut self, v: T)->errcode::RESULT {
42 self.locked.lock();
43 let res = if self.length<self.limit {
44 self.get_next_tail_index();
45 self.buffer_write(self.tail, v);
46 self.length+=1;
47 errcode::RESULT_SUCCESS
48 } else {
49 errcode::ERROR_OUTOF_MEM
50 };
51
52 self.locked.unlock();
53 return res
54 }
55 pub fn push_front(&mut self, v: T)->errcode::RESULT {
56 self.locked.lock();
57 let res = if self.length<self.limit {
58 self.get_prev_head_index();
59 self.buffer_write(self.head, v);
60 self.length+=1;
61 errcode::RESULT_SUCCESS
62 } else {
63 errcode::ERROR_OUTOF_MEM
64 };
65 self.locked.unlock();
66 return res
67 }
68
69 pub fn pop_back(&mut self) -> Option<T> {
70 self.locked.lock();
71 if self.length>0 {
72 let data = self.buffer_read(self.tail);
73 self.length-=1;
74 self.get_prev_tail_index();
75 self.locked.unlock();
76 return Some(data);
77
78 } else {
79 self.locked.unlock();
80 return None
81 }
82 }
83
84 pub fn pop_front(&mut self) -> Option<T> {
85 self.locked.lock();
86 if self.length>0 {
87 let data = self.buffer_read(self.head);
88 self.length-=1;
89 self.get_next_head_index();
90 self.locked.unlock();
91 return Some(data);
92
93 } else {
94 self.locked.unlock();
95 return None
96 }
97 }
98
99 #[inline(always)]
100 fn buffer_read(&mut self, off: usize) -> T {
101 unsafe { ptr::read(self.inner.add(off)) }
102 }
103
104 #[inline(always)]
106 fn buffer_write(&mut self, off: usize, value: T) {
107 unsafe {
108 ptr::write(self.inner.add(off), value);
109 }
110 }
111
112 #[inline(always)]
113 fn get_next_tail_index(&mut self)->usize {
114 if self.tail==INVALID_INDEX {
115 self.tail=0;
116 self.head=0;
117 return 0;
118 }
119 self.tail = (self.tail + 1) & !self.limit;
120 self.tail
121 }
122 #[inline(always)]
123 fn get_next_head_index(&mut self)->usize {
124 if self.head==INVALID_INDEX {
125 self.tail=0;
126 self.head=0;
127 return 0;
128 }
129 self.head = (self.head + 1) & !self.limit;
130 self.head
131 }
132 #[inline(always)]
133 fn get_prev_tail_index(&mut self)->usize {
134 if self.tail==INVALID_INDEX {
135 self.tail=0;
136 self.head=0;
137 return 0;
138 }
139 if self.tail==0 {
140 self.tail = self.limit-1;
141 } else {
142 self.tail-=1;
143 }
144 self.tail
145
146 }
147 #[inline(always)]
148 fn get_prev_head_index(&mut self)->usize {
149 if self.tail==INVALID_INDEX {
150 self.tail=0;
151 self.head=0;
152 return 0;
153 }
154 if self.head==0 {
155 self.head = self.limit-1;
156 } else {
157 self.head-=1;
158 }
159 self.head
160 }
161
162
163 pub fn iter(&self) -> Iter<T> {
165 self.locked.lock();
166 return Iter{cur_idx:0,inner:self}
167 }
168
169 pub fn end_iter(&self) {
170 self.locked.unlock();
171 }
172 pub fn iter_mut(&mut self) -> Iter<T> {
174 self.locked.lock();
175 return Iter{cur_idx:0,inner:self}
176 }
177
178 fn is_index_valid(&self,index:usize)->bool {
179 if self.head < self.tail {
180 return index>=self.head && index<=self.tail;
181 }
182 return index<=self.head || (index>=self.tail && index <self.limit)
183 }
184
185 pub fn len(&self)->usize {
186 return self.length;
187 }
188
189 pub fn capacity(&self)->usize {
190 self.limit
191 }
192
193 pub fn notify(&self) {
194 let mut has_data = self.has_data.lock().unwrap();
195 *has_data = true;
196 self.cond.notify_all();
197 }
198 pub fn wait(&self) {
199 let mut l = self.has_data.lock().unwrap();
200 while !(*l) {
201 l = self.cond.wait(l).unwrap();
202 }
203 *l=false;
204 }
205}
206
207impl<T> Drop for AtomicDequeue<T> {
208 fn drop(&mut self) {
209 let pdata = self.inner as *mut u8;
210 if pdata!=std::ptr::null_mut() {
211 unsafe {
212 alloc::dealloc(pdata, Layout::from_size_align_unchecked(self.limit*mem::size_of::<T>(), 1));
213 }
214
215 }
216 }
217}
218
219pub struct Iter<'a,T> {
220 cur_idx:usize,
221 inner:&'a AtomicDequeue<T>,
222}
223impl <'a,T> Iterator for Iter<'a,T> {
224 type Item = &'a T;
225 fn next(&mut self)->Option<Self::Item> {
226 if self.inner.length==0 {
227 return None;
228 }
229 if self.cur_idx==INVALID_INDEX {
230 self.cur_idx=0;
231 } else {
232 self.cur_idx=(self.cur_idx+1) % self.inner.limit;
233 }
234 if !self.inner.is_index_valid(self.cur_idx) {
235 return None;
236 }
237 let p = unsafe { &mut *((self.inner.inner as usize+self.cur_idx*mem::size_of::<T>()) as *mut T) };
238
239 return Some(p);
240 }
241
242}