rust_rsm/common/atomicqueue.rs

#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(non_upper_case_globals)]
// Thread-safe deque (double-ended queue) over a fixed-size ring buffer.
use std::sync::{Mutex, Arc};
use std::sync::Condvar;
use crate::common::{spin_lock::spin_lock_t, errcode};
use std::alloc::{self, Layout};
use std::ptr;

const INVALID_INDEX: usize = usize::MAX;

pub struct AtomicDequeue<T> {
    inner: *mut T,              // backing ring buffer
    length: usize,              // number of elements currently stored
    limit: usize,               // buffer capacity, always a power of two
    head: usize,                // index of the front element, INVALID_INDEX when unset
    tail: usize,                // index of the back element, INVALID_INDEX when unset
    locked: spin_lock_t,        // guards all index/length updates
    cond: Condvar,
    has_data: Arc<Mutex<bool>>, // flag used by wait()/notify()
}
//const MAX_BURST_SIZE:usize = 16;
impl<T> AtomicDequeue<T> {
    pub fn new(capacity: usize) -> Self {
        use crate::alg;
        // Round the capacity up to the next power of two so that index
        // wrap-around can be done with a simple mask.
        let new_cap = 1 << (alg::log2(capacity as u64) + 1);
        // Allocate with the alignment of T; an under-aligned buffer would be
        // undefined behavior for any T with an alignment above 1.
        let layout = Layout::array::<T>(new_cap).unwrap();
        let pdata = unsafe { alloc::alloc(layout) as *mut T };
        if pdata.is_null() {
            alloc::handle_alloc_error(layout);
        }
        return Self {
            inner: pdata,
            length: 0,
            limit: new_cap,
            head: INVALID_INDEX,
            tail: INVALID_INDEX,
            locked: spin_lock_t::new(),
            cond: Condvar::new(),
            has_data: Arc::new(Mutex::new(false)),
        };
    }
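
    /// Appends `v` at the back; returns `ERROR_OUTOF_MEM` when the queue is full.
    ///
    /// A minimal usage sketch (not compiled here; it assumes this module's
    /// `errcode` constants and a `u32` payload purely for illustration):
    ///
    /// ```ignore
    /// let mut q: AtomicDequeue<u32> = AtomicDequeue::new(8);
    /// assert_eq!(q.push_back(1), errcode::RESULT_SUCCESS);
    /// assert_eq!(q.push_back(2), errcode::RESULT_SUCCESS);
    /// assert_eq!(q.pop_front(), Some(1)); // FIFO: push_back + pop_front
    /// assert_eq!(q.pop_back(), Some(2));
    /// ```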
    pub fn push_back(&mut self, v: T) -> errcode::RESULT {
        self.locked.lock();
        let res = if self.length < self.limit {
            self.get_next_tail_index();
            self.buffer_write(self.tail, v);
            self.length += 1;
            errcode::RESULT_SUCCESS
        } else {
            errcode::ERROR_OUTOF_MEM
        };
        self.locked.unlock();
        return res;
    }
    pub fn push_front(&mut self, v: T) -> errcode::RESULT {
        self.locked.lock();
        let res = if self.length < self.limit {
            self.get_prev_head_index();
            self.buffer_write(self.head, v);
            self.length += 1;
            errcode::RESULT_SUCCESS
        } else {
            errcode::ERROR_OUTOF_MEM
        };
        self.locked.unlock();
        return res;
    }

    pub fn pop_back(&mut self) -> Option<T> {
        self.locked.lock();
        if self.length > 0 {
            let data = self.buffer_read(self.tail);
            self.length -= 1;
            self.get_prev_tail_index();
            self.locked.unlock();
            return Some(data);
        } else {
            self.locked.unlock();
            return None;
        }
    }

    pub fn pop_front(&mut self) -> Option<T> {
        self.locked.lock();
        if self.length > 0 {
            let data = self.buffer_read(self.head);
            self.length -= 1;
            self.get_next_head_index();
            self.locked.unlock();
            return Some(data);
        } else {
            self.locked.unlock();
            return None;
        }
    }

    #[inline(always)]
    fn buffer_read(&mut self, off: usize) -> T {
        // Moves the element out of the slot; the slot is treated as vacant afterwards.
        unsafe { ptr::read(self.inner.add(off)) }
    }

    /// Writes an element into the buffer, moving it.
    #[inline(always)]
    fn buffer_write(&mut self, off: usize, value: T) {
        unsafe {
            ptr::write(self.inner.add(off), value);
        }
    }

    #[inline(always)]
    fn get_next_tail_index(&mut self) -> usize {
        if self.tail == INVALID_INDEX {
            self.tail = 0;
            self.head = 0;
            return 0;
        }
        // limit is a power of two, so masking with limit-1 wraps the index.
        self.tail = (self.tail + 1) & (self.limit - 1);
        self.tail
    }
    #[inline(always)]
    fn get_next_head_index(&mut self) -> usize {
        if self.head == INVALID_INDEX {
            self.tail = 0;
            self.head = 0;
            return 0;
        }
        self.head = (self.head + 1) & (self.limit - 1);
        self.head
    }
    #[inline(always)]
    fn get_prev_tail_index(&mut self) -> usize {
        if self.tail == INVALID_INDEX {
            self.tail = 0;
            self.head = 0;
            return 0;
        }
        if self.tail == 0 {
            self.tail = self.limit - 1;
        } else {
            self.tail -= 1;
        }
        self.tail
    }
    #[inline(always)]
    fn get_prev_head_index(&mut self) -> usize {
        // Check head here, not tail: this is the head-side counterpart of
        // get_prev_tail_index.
        if self.head == INVALID_INDEX {
            self.tail = 0;
            self.head = 0;
            return 0;
        }
        if self.head == 0 {
            self.head = self.limit - 1;
        } else {
            self.head -= 1;
        }
        self.head
    }

    /// iter() takes the internal lock automatically; the caller must release it
    /// with a manual end_iter() once iteration is finished.
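    ///
    /// A usage sketch (not compiled here; it assumes a queue `q` built with
    /// `AtomicDequeue::new`):
    ///
    /// ```ignore
    /// for v in q.iter() {
    ///     println!("{}", v);
    /// }
    /// q.end_iter(); // mandatory: releases the spin lock taken by iter()
    /// ```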
    pub fn iter(&self) -> Iter<T> {
        self.locked.lock();
        // INVALID_INDEX tells the iterator to start from the current head.
        return Iter { cur_idx: INVALID_INDEX, inner: self };
    }

    pub fn end_iter(&self) {
        self.locked.unlock();
    }
    /// iter_mut() takes the internal lock automatically; the caller must release
    /// it with a manual end_iter() once iteration is finished.
    pub fn iter_mut(&mut self) -> Iter<T> {
        self.locked.lock();
        return Iter { cur_idx: INVALID_INDEX, inner: self };
    }

    fn is_index_valid(&self, index: usize) -> bool {
        if self.head == INVALID_INDEX {
            return false;
        }
        if self.head <= self.tail {
            // Contiguous: occupied slots are head..=tail.
            return index >= self.head && index <= self.tail;
        }
        // Wrapped: occupied slots are head..limit-1 plus 0..=tail.
        index >= self.head || index <= self.tail
    }

    pub fn len(&self) -> usize {
        self.length
    }

    pub fn capacity(&self) -> usize {
        self.limit
    }

    /// Signals any thread blocked in wait() that data is available.
    pub fn notify(&self) {
        let mut has_data = self.has_data.lock().unwrap();
        *has_data = true;
        self.cond.notify_all();
    }
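    /// Blocks until notify() is called; the loop guards against spurious
    /// wakeups, and the flag is reset so the next wait() blocks again.
    ///
    /// A sketch of the intended handshake (hypothetical: cross-thread sharing
    /// also has to work around the `&mut self` pop methods, e.g. with the
    /// Send/Sync markers discussed after the Drop impl):
    ///
    /// ```ignore
    /// // consumer
    /// loop {
    ///     q.wait();                      // sleep until the producer signals
    ///     while let Some(v) = q.pop_front() {
    ///         handle(v);                 // `handle` is a placeholder
    ///     }
    /// }
    /// // producer
    /// q.push_back(item);
    /// q.notify();
    /// ```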
    pub fn wait(&self) {
        let mut l = self.has_data.lock().unwrap();
        while !(*l) {
            l = self.cond.wait(l).unwrap();
        }
        *l = false;
    }
}

impl<T> Drop for AtomicDequeue<T> {
    fn drop(&mut self) {
        // Pop any remaining elements so their destructors run; deallocating
        // alone would leak whatever T owns.
        while self.pop_front().is_some() {}
        let pdata = self.inner as *mut u8;
        if !pdata.is_null() {
            unsafe {
                // The layout must match the one used in new().
                alloc::dealloc(pdata, Layout::array::<T>(self.limit).unwrap());
            }
        }
    }
}
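
// The struct stores a raw pointer, so the compiler will not auto-derive
// Send/Sync. A "thread safe" queue like this one presumably opts in somewhere
// in the crate; a sketch of what that opt-in looks like (an assumption, kept
// commented out here):
//
// unsafe impl<T: Send> Send for AtomicDequeue<T> {}
// unsafe impl<T: Send> Sync for AtomicDequeue<T> {}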

pub struct Iter<'a, T> {
    cur_idx: usize,
    inner: &'a AtomicDequeue<T>,
}

impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;
    fn next(&mut self) -> Option<Self::Item> {
        if self.inner.length == 0 {
            return None;
        }
        if self.cur_idx == INVALID_INDEX {
            // First call: start at the front of the queue, which need not be
            // slot 0 once head has wrapped around the ring.
            self.cur_idx = self.inner.head;
        } else {
            if self.cur_idx == self.inner.tail {
                // The back element was already yielded; iteration is done.
                return None;
            }
            self.cur_idx = (self.cur_idx + 1) & (self.inner.limit - 1);
        }
        if !self.inner.is_index_valid(self.cur_idx) {
            return None;
        }
        let p = unsafe { &*self.inner.inner.add(self.cur_idx) };
        return Some(p);
    }
}
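
#[cfg(test)]
mod tests {
    // Smoke tests added as a sketch of the intended behavior; they assume the
    // `errcode` constants used in the signatures above compare with `==`.
    use super::*;

    #[test]
    fn push_pop_both_ends() {
        let mut q: AtomicDequeue<u32> = AtomicDequeue::new(4);
        assert_eq!(q.len(), 0);
        assert_eq!(q.push_back(1), errcode::RESULT_SUCCESS);
        assert_eq!(q.push_back(2), errcode::RESULT_SUCCESS);
        assert_eq!(q.push_front(0), errcode::RESULT_SUCCESS);
        assert_eq!(q.len(), 3);
        assert_eq!(q.pop_front(), Some(0)); // front sees push_front first
        assert_eq!(q.pop_back(), Some(2));  // back sees push_back last
        assert_eq!(q.pop_front(), Some(1));
        assert_eq!(q.pop_front(), None);    // empty again
    }

    #[test]
    fn wraps_around_the_ring() {
        let mut q: AtomicDequeue<u32> = AtomicDequeue::new(4);
        // Push and pop repeatedly so head and tail travel past the end of the
        // backing buffer and wrap via the power-of-two mask.
        for i in 0..32 {
            assert_eq!(q.push_back(i), errcode::RESULT_SUCCESS);
            assert_eq!(q.pop_front(), Some(i));
        }
    }
}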