bbq_rs/
bbq_impl.rs

use std::{
    alloc::{Allocator, Global, Layout},
    fmt::Debug,
    ops::Deref,
    ptr::{self, NonNull},
    sync::{atomic::AtomicUsize, Arc},
    thread::sleep,
    time::Duration,
};

use crate::{bbq_trait::BlockingQueue, error::ErrorContext, Result};

const VSN_BIT_LEN: usize = 32;
const OFFSET_BIT_LEN: usize = usize::BITS as usize - VSN_BIT_LEN;
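
// Cursor raw-value layout (assuming a 64-bit target, where OFFSET_BIT_LEN is 32):
// the high VSN_BIT_LEN bits hold the block version and the low OFFSET_BIT_LEN bits
// hold the offset. For example, version 2 with offset 3 packs to
// (2 << 32) | 3 == 0x0000_0002_0000_0003.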

/// Result of a single enqueue attempt. `Full` and `Busy` hand the item back to the
/// caller so it can retry.
enum EnqueueState<T> {
    Full(T),
    Busy(T),
    Available,
}

/// Result of a single dequeue attempt.
enum DequeueState<T> {
    Empty,
    Busy,
    Ok(T),
}

/// Whether the producer head can advance to the next block.
enum PBlockState {
    Available,
    NoEntry,
    NotAvailable,
}

/// Result of trying to consume an entry from a block.
enum CBlockState<T> {
    BlockDone,
    Consumed(T),
    NoEntry,
    NotAvailable,
}

struct Block<T> {
    entries: NonNull<T>,
    /// Indicates which locations have been allocated by the producers.
    allocated_cursor: Arc<Cursor>,
    /// Indicates which locations have been committed by the producers.
    committed_cursor: Arc<Cursor>,
    /// Indicates which locations have been reserved by the consumers.
    reserved_cursor: Arc<Cursor>,
    /// Indicates which locations have been consumed by the consumers.
    consumed_cursor: Arc<Cursor>,
    size: usize,
}

impl<T> Debug for Block<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Block")
            .field("allocated_cursor", &self.allocated_cursor)
            .field("committed_cursor", &self.committed_cursor)
            .field("reserved_cursor", &self.reserved_cursor)
            .field("consumed_cursor", &self.consumed_cursor)
            .field("size", &self.size)
            .finish()
    }
}

impl<T> Block<T> {
    fn init(size: usize, cursors_offset: usize) -> Result<Self> {
        let entries_layout = Layout::array::<T>(size)?;
        let entries = Global.allocate_zeroed(entries_layout)?.cast::<T>();

        Ok(Self {
            entries,
            allocated_cursor: Cursor::init_arc(cursors_offset, 0),
            committed_cursor: Cursor::init_arc(cursors_offset, 0),
            reserved_cursor: Cursor::init_arc(cursors_offset, 0),
            consumed_cursor: Cursor::init_arc(cursors_offset, 0),
            size,
        })
    }

    /// Atomically claims the next free slot in this block and returns a mutable
    /// reference to it, or `None` if the block is already full.
    fn allocate_entry(&self) -> Result<Option<&mut T>> {
        if Cursor::offset(self.allocated_cursor.as_raw()) >= self.size {
            return Ok(None);
        }

        let old_allocated_cursor_raw = self
            .allocated_cursor
            .inner
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let old_allocated_cursor_offset = Cursor::offset(old_allocated_cursor_raw);
        if old_allocated_cursor_offset >= self.size {
            return Ok(None);
        }

        let entry_ref = unsafe {
            self.entries
                .as_ptr()
                .add(old_allocated_cursor_offset)
                .as_mut()
                .with_context(|| "entry ptr is null")?
        };
        Ok(Some(entry_ref))
    }

    /// Try to reserve and consume one entry from this block.
    ///
    /// Returns the consumed value on success, or the state of the block otherwise.
    fn try_consume_entry(&self) -> Result<CBlockState<T>> {
        loop {
            let reserved_raw = self.reserved_cursor.as_raw();
            // All entries in this block have already been reserved by other consumers.
            if Cursor::offset(reserved_raw) >= self.size {
                return Ok(CBlockState::BlockDone);
            }

            let committed_raw = self.committed_cursor.as_raw();
            // No committed entries are left to reserve.
            if Cursor::offset(reserved_raw) == Cursor::offset(committed_raw) {
                return Ok(CBlockState::NoEntry);
            }

            // Producers are still allocating in this block. Consumers must not act on an
            // entry until the producer actions on it have finished.
            if Cursor::offset(committed_raw) != self.size {
                let allocated_raw = self.allocated_cursor.as_raw();
                if Cursor::offset(allocated_raw) != Cursor::offset(committed_raw) {
                    return Ok(CBlockState::NotAvailable);
                }
            }

            // If reserving the entry succeeds, read it out and advance the consumed cursor.
            if self.reserved_cursor.fetch_max(reserved_raw + 1) == reserved_raw {
                let entry = unsafe { self.consume_entry_unchecked(Cursor::offset(reserved_raw))? };
                self.consumed_cursor.fetch_add_offset(1);
                return Ok(CBlockState::Consumed(entry));
            }
        }
    }

    /// Reads the entry at `entry_offset` out of the block by value.
    ///
    /// # Safety
    /// The caller must have exclusively reserved `entry_offset`, and a producer must
    /// have committed the entry there.
    unsafe fn consume_entry_unchecked(&self, entry_offset: usize) -> Result<T> {
        let entry = self.entries.as_ptr().add(entry_offset).read();

        Ok(entry)
    }
}

impl<T> Drop for Block<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop the entries in place, then release the backing allocation.
            ptr::drop_in_place(ptr::slice_from_raw_parts_mut(
                self.entries.as_ptr(),
                self.size,
            ));
            if let Ok(layout) = Layout::array::<T>(self.size) {
                Global.deallocate(self.entries.cast(), layout);
            }
        }
    }
}

/// A cursor packed into a single `usize`: the high bits hold the version and the
/// low bits hold the offset.
struct Cursor {
    inner: AtomicUsize,
}

impl Cursor {
    fn init_arc(offset: usize, vsn: usize) -> Arc<Self> {
        Arc::new(Self {
            inner: AtomicUsize::new(Self::new_raw(offset, vsn)),
        })
    }

    /// Packs a version and an offset into a single raw value.
    fn new_raw(offset: usize, vsn: usize) -> usize {
        (vsn << OFFSET_BIT_LEN) | offset
    }

    fn fetch_max(&self, raw_val: usize) -> usize {
        self.inner
            .fetch_max(raw_val, std::sync::atomic::Ordering::SeqCst)
    }

    /// Adds `count` to the raw value and returns the offset it held before the add.
    fn fetch_add_offset(&self, count: usize) -> usize {
        let raw = self
            .inner
            .fetch_add(count, std::sync::atomic::Ordering::SeqCst);
        Cursor::offset(raw)
    }

    fn as_raw(&self) -> usize {
        self.inner.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Extracts the version from a raw cursor value.
    #[inline]
    fn vsn(raw: usize) -> usize {
        raw >> OFFSET_BIT_LEN
    }

    /// Extracts the offset (the low `OFFSET_BIT_LEN` bits) from a raw cursor value.
    #[inline]
    fn offset(raw: usize) -> usize {
        raw << VSN_BIT_LEN >> VSN_BIT_LEN
    }
}

impl Debug for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let raw = self.as_raw();
        f.write_str("{ \n")?;
        f.write_fmt(format_args!("    offset: {}\n", Cursor::offset(raw)))?;
        f.write_fmt(format_args!("    vsn: {}\n", Cursor::vsn(raw)))?;
        f.write_str("}")
    }
}

/// A concurrent blocking queue that supports multiple producers and multiple consumers.
///
/// ## Example
/// ```
/// use bbq_rs::Bbq;
/// use bbq_rs::BlockingQueue;
///
/// fn main() {
///     let queue = Bbq::new(100, 100).unwrap();
///
///     // Create four producer threads
///     for i in 0..4 {
///         let q = queue.clone();
///         std::thread::spawn(move || {
///             q.push(i).unwrap();
///         });
///     }
///
///     // Create four consumer threads
///     for _ in 0..4 {
///         let q = queue.clone();
///         std::thread::spawn(move || {
///             println!("{}", q.pop().unwrap());
///         });
///     }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct Bbq<T> {
    inner: Arc<BbqInner<T>>,
}

// Safety: the queue moves `T` values between threads, so both impls require `T: Send`.
unsafe impl<T> Send for Bbq<T> where T: Send {}
unsafe impl<T> Sync for Bbq<T> where T: Send {}

impl<T> Deref for Bbq<T> {
    type Target = BbqInner<T>;

    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}

impl<T> Debug for BbqInner<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let blocks = (0..self.blocks_num)
            .map(|i| unsafe { self.blocks.as_ptr().add(i).as_ref() })
            .collect::<Vec<Option<&Block<T>>>>();
        f.debug_struct("BBQInner")
            .field("blocks", &blocks)
            .field("phead_idx", &self.phead_idx)
            .field("chead_idx", &self.chead_idx)
            .field("blocks_num", &self.blocks_num)
            .finish()
    }
}

pub struct BbqInner<T> {
    blocks: NonNull<Block<T>>,
    phead_idx: AtomicUsize,
    chead_idx: AtomicUsize,
    blocks_num: usize,
}

impl<T> Drop for BbqInner<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop the blocks in place, then release the backing allocation.
            ptr::drop_in_place(ptr::slice_from_raw_parts_mut(
                self.blocks.as_ptr(),
                self.blocks_num,
            ));
            if let Ok(layout) = Layout::array::<Block<T>>(self.blocks_num) {
                Global.deallocate(self.blocks.cast(), layout);
            }
        }
    }
}

impl<T> Bbq<T> {
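    /// Creates a queue made of `blocks_num` blocks with `block_size` entries each.
    ///
    /// ```
    /// use bbq_rs::Bbq;
    ///
    /// // Four blocks of 16 entries each.
    /// let queue: Bbq<u32> = Bbq::new(16, 4).unwrap();
    /// ```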
    pub fn new(block_size: usize, blocks_num: usize) -> Result<Self> {
        let blocks_layout = Layout::array::<Block<T>>(blocks_num)?;
        let blocks = Global.allocate_zeroed(blocks_layout)?.cast::<Block<T>>();

        unsafe {
            // The first block starts empty; the remaining blocks start as fully consumed,
            // so producers can claim them later by bumping the version.
            blocks.as_ptr().add(0).write(Block::init(block_size, 0)?);
            (1..blocks_num).try_for_each(|offset| {
                blocks
                    .as_ptr()
                    .add(offset)
                    .write(Block::init(block_size, block_size)?);
                Result::Ok(())
            })?;
        }

        Ok(Self {
            inner: Arc::new(BbqInner {
                blocks,
                phead_idx: AtomicUsize::new(0),
                chead_idx: AtomicUsize::new(0),
                blocks_num,
            }),
        })
    }

    fn get_phead_and_block(&self) -> Result<(usize, &Block<T>)> {
        let phead_block_idx = self.phead_idx.load(std::sync::atomic::Ordering::SeqCst);

        let phead_block_ref = unsafe { self.get_block_by_idx(phead_block_idx % self.blocks_num)? };
        Ok((phead_block_idx, phead_block_ref))
    }

    fn get_chead_and_block(&self) -> Result<(usize, &Block<T>)> {
        let chead_block_idx = self.chead_idx.load(std::sync::atomic::Ordering::SeqCst);
        let chead_block_ref = unsafe { self.get_block_by_idx(chead_block_idx % self.blocks_num)? };
        Ok((chead_block_idx, chead_block_ref))
    }

    unsafe fn get_block_by_idx(&self, block_idx: usize) -> Result<&Block<T>> {
        self.blocks
            .as_ptr()
            .add(block_idx)
            .as_ref()
            .context("block ptr is null.")
    }

    fn enqueue(&self, item: T) -> Result<EnqueueState<T>> {
        loop {
            let (phead_block_idx, phead_block) = self.get_phead_and_block()?;
            if let Some(entry_mut_ref) = phead_block.allocate_entry()? {
                *entry_mut_ref = item;
                phead_block.committed_cursor.fetch_add_offset(1);
                return Ok(EnqueueState::Available);
            } else {
                match self.advance_phead(phead_block_idx)? {
                    PBlockState::NoEntry => return Ok(EnqueueState::Busy(item)),
                    PBlockState::NotAvailable => return Ok(EnqueueState::Full(item)),
                    PBlockState::Available => continue,
                }
            }
        }
    }

    fn dequeue(&self) -> Result<DequeueState<T>> {
        loop {
            let (chead_idx, chead_block) = self.get_chead_and_block()?;
            match chead_block.try_consume_entry()? {
                CBlockState::Consumed(entry) => return Ok(DequeueState::Ok(entry)),
                CBlockState::NoEntry => return Ok(DequeueState::Empty),
                CBlockState::NotAvailable => return Ok(DequeueState::Busy),
                CBlockState::BlockDone => {
                    if !self.advance_chead(chead_idx)? {
                        return Ok(DequeueState::Empty);
                    }
                }
            }
        }
    }

    fn advance_phead(&self, phead_idx: usize) -> Result<PBlockState> {
        let phead_block = unsafe { self.get_block_by_idx(phead_idx % self.blocks_num) }?;
        let phead_next_block = unsafe { self.get_block_by_idx((phead_idx + 1) % self.blocks_num) }?;

        let phead_vsn = Cursor::vsn(phead_block.committed_cursor.as_raw());
        let nblk_consumed_raw = phead_next_block.consumed_cursor.as_raw();
        let nblk_consumed_offset = Cursor::offset(nblk_consumed_raw);

        if nblk_consumed_offset == phead_next_block.size {
            // The next block has been fully consumed, so the producers can reuse it
            // under a bumped version.
            phead_next_block
                .committed_cursor
                .fetch_max(Cursor::new_raw(0, phead_vsn + 1));
            phead_next_block
                .allocated_cursor
                .fetch_max(Cursor::new_raw(0, phead_vsn + 1));
            self.phead_idx
                .fetch_max(phead_idx + 1, std::sync::atomic::Ordering::SeqCst);
            return Ok(PBlockState::Available);
        } else {
            let nblk_reserved_raw = phead_next_block.reserved_cursor.as_raw();
            let nblk_reserved_offset = Cursor::offset(nblk_reserved_raw);
            if nblk_reserved_offset == nblk_consumed_offset {
                return Ok(PBlockState::NoEntry);
            } else {
                return Ok(PBlockState::NotAvailable);
            }
        }
    }

    fn advance_chead(&self, chead_idx: usize) -> Result<bool> {
        let chead_block = unsafe { self.get_block_by_idx(chead_idx % self.blocks_num) }?;
        let chead_next_block = unsafe { self.get_block_by_idx((chead_idx + 1) % self.blocks_num) }?;

        let chead_vsn = Cursor::vsn(chead_block.consumed_cursor.as_raw());
        let nblk_committed_vsn = Cursor::vsn(chead_next_block.committed_cursor.as_raw());

        // The producers have not produced into the next block yet. The logic stays correct
        // without this check, but skipping it may cause more contention with the producers.
        if nblk_committed_vsn < chead_vsn + 1 {
            return Ok(false);
        }

        chead_next_block
            .consumed_cursor
            .fetch_max(Cursor::new_raw(0, chead_vsn + 1));
        chead_next_block
            .reserved_cursor
            .fetch_max(Cursor::new_raw(0, chead_vsn + 1));
        self.chead_idx
            .fetch_max(chead_idx + 1, std::sync::atomic::Ordering::SeqCst);

        Ok(true)
    }
}

/// How long `push`/`pop` sleep between retries, in milliseconds.
const SLEEP_MILLIS: u64 = 10;

impl<T> BlockingQueue for Bbq<T> {
    type Item = T;

    /// Blocks until the item has been successfully enqueued.
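    ///
    /// A minimal single-threaded round trip through `push` and `pop`:
    /// ```
    /// use bbq_rs::{Bbq, BlockingQueue};
    ///
    /// let queue = Bbq::new(8, 2).unwrap();
    /// queue.push(1u32).unwrap();
    /// assert_eq!(1, queue.pop().unwrap());
    /// ```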
    fn push(&self, item: Self::Item) -> Result<()> {
        let mut item = item;
        loop {
            match self.enqueue(item)? {
                EnqueueState::Full(it) => item = it,
                EnqueueState::Busy(it) => item = it,
                EnqueueState::Available => return Ok(()),
            }
            // Yield the thread instead of spinning and wasting CPU.
            sleep(Duration::from_millis(SLEEP_MILLIS));
        }
    }

    /// Blocks until an item has been dequeued from the queue.
    fn pop(&self) -> Result<Self::Item> {
        loop {
            if let DequeueState::Ok(item) = self.dequeue()? {
                return Ok(item);
            }
            // Yield the thread instead of spinning and wasting CPU.
            sleep(Duration::from_millis(SLEEP_MILLIS));
        }
    }
}

#[cfg(test)]
mod tests {
    use std::thread;

    use crate::{bbq_impl::Cursor, Bbq, BlockingQueue};

    #[test]
    fn test_cursor() {
        let cursor = Cursor::new_raw(4, 11);
        assert_eq!(4, Cursor::offset(cursor));
        assert_eq!(11, Cursor::vsn(cursor));

        let cursor = Cursor::init_arc(0, 0);

        let old = cursor.fetch_max(Cursor::new_raw(1, 0));
        assert_eq!(0, Cursor::offset(old));
        assert_eq!(0, Cursor::vsn(old));

        let old = cursor.fetch_max(Cursor::new_raw(0, 1));
        assert_eq!(1, Cursor::offset(old));
        assert_eq!(0, Cursor::vsn(old));
    }
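
    // Supplementary check (a sketch assuming a 64-bit target, so the offset occupies the
    // low 32 bits): `fetch_add_offset` adds to the raw value and returns the offset it
    // held before the add, leaving the version bits untouched as long as the offset
    // does not overflow.
    #[test]
    fn test_cursor_fetch_add_offset() {
        let cursor = Cursor::init_arc(3, 2);

        // Returns the offset of the cursor *before* the add.
        assert_eq!(3, cursor.fetch_add_offset(1));

        let raw = cursor.as_raw();
        assert_eq!(4, Cursor::offset(raw));
        assert_eq!(2, Cursor::vsn(raw));
    }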

    #[test]
    fn test_push_and_pop() {
        let bbq = Bbq::<u64>::new(2, 3).unwrap();
        bbq.push(11).unwrap();
        bbq.push(12).unwrap();
        bbq.push(13).unwrap();
        bbq.push(14).unwrap();

        bbq.pop().unwrap();
        bbq.pop().unwrap();
        bbq.pop().unwrap();

        bbq.push(15).unwrap();

        bbq.push(16).unwrap();
        bbq.pop().unwrap();

        bbq.push(17).unwrap();
        bbq.push(18).unwrap();
    }
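
    // Supplementary check: with a single thread, values come back out in the order they
    // were pushed, including across block boundaries (block_size = 2 here).
    #[test]
    fn test_push_pop_order_single_thread() {
        let bbq = Bbq::<u64>::new(2, 3).unwrap();
        for i in 1..=4 {
            bbq.push(i).unwrap();
        }
        for i in 1..=4 {
            assert_eq!(i, bbq.pop().unwrap());
        }
    }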

    #[test]
    fn test_push_pop_concurrent() {
        let bbq_1 = Bbq::<u64>::new(1000, 1000).unwrap();
        let bbq_2 = bbq_1.clone();
        let bbq_3 = bbq_1.clone();
        let bbq_4 = bbq_1.clone();
        let bbq_5 = bbq_1.clone();
        let bbq_6 = bbq_1.clone();

        let handle_1 = thread::spawn(move || {
            for i in 0..200_000 {
                bbq_1.push(i).unwrap();
            }
        });

        let handle_2 = thread::spawn(move || {
            for i in 0..200_000 {
                bbq_2.push(i).unwrap();
            }
        });

        let handle_3 = thread::spawn(move || {
            for i in 0..200_000 {
                bbq_3.push(i).unwrap();
            }
        });

        let handle_4 = thread::spawn(move || {
            for _ in 0..200_000 {
                bbq_4.pop().unwrap();
            }
        });

        let handle_5 = thread::spawn(move || {
            for _ in 0..200_000 {
                bbq_5.pop().unwrap();
            }
        });

        let handle_6 = thread::spawn(move || {
            for _ in 0..200_000 {
                bbq_6.pop().unwrap();
            }
        });

        handle_1.join().unwrap();
        handle_2.join().unwrap();
        handle_3.join().unwrap();
        handle_4.join().unwrap();
        handle_5.join().unwrap();
        handle_6.join().unwrap();
    }
}