use std::{
    alloc::{Allocator, Global, Layout},
    fmt::Debug,
    ops::Deref,
    ptr::{self, NonNull},
    sync::{atomic::AtomicUsize, Arc},
    thread::sleep,
    time::Duration,
};

use crate::{bbq_trait::BlockingQueue, error::ErrorContext, Result};

const VSN_BIT_LEN: usize = 32;
const OFFSET_BIT_LEN: usize = usize::BITS as usize - VSN_BIT_LEN;

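/// Outcome of a single enqueue attempt. `Full` and `Busy` hand the item back
/// to the caller so it can be retried without cloning.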
enum EnqueueState<T> {
    Full(T),
    Busy(T),
    Available,
}

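/// Outcome of a single dequeue attempt.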
enum DequeueState<T> {
    Empty,
    Busy,
    Ok(T),
}

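/// State of the next block as seen by a producer that is trying to advance
/// the producer head.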
enum PBlockState {
    Available,
    NoEntry,
    NotAvailable,
}

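/// State of the current block as seen by a consumer: an entry was consumed,
/// the block is fully drained (`BlockDone`), nothing is committed beyond what
/// is already reserved, or a pending producer makes it temporarily
/// unavailable.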
enum CBlockState<T> {
    BlockDone,
    Consumed(T),
    NoEntry,
    NotAvailable,
}

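/// One fixed-size block of entries. Producers move `allocated_cursor` (slot
/// claimed) and `committed_cursor` (slot written); consumers move
/// `reserved_cursor` (slot claimed) and `consumed_cursor` (slot read).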
struct Block<T> {
    entries: NonNull<T>,
    allocated_cursor: Arc<Cursor>,
    committed_cursor: Arc<Cursor>,
    reserved_cursor: Arc<Cursor>,
    consumed_cursor: Arc<Cursor>,
    size: usize,
}

impl<T> Debug for Block<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Block")
            .field("allocated_cursor", &self.allocated_cursor)
            .field("committed_cursor", &self.committed_cursor)
            .field("reserved_cursor", &self.reserved_cursor)
            .field("consumed_cursor", &self.consumed_cursor)
            .field("size", &self.size)
            .finish()
    }
}

impl<T> Block<T> {
    fn init(size: usize, cursors_offset: usize) -> Result<Self> {
        let entries_layout = Layout::array::<T>(size)?;
        let entries = Global.allocate_zeroed(entries_layout)?.cast::<T>();

        Ok(Self {
            entries,
            allocated_cursor: Cursor::init_arc(cursors_offset, 0),
            committed_cursor: Cursor::init_arc(cursors_offset, 0),
            reserved_cursor: Cursor::init_arc(cursors_offset, 0),
            consumed_cursor: Cursor::init_arc(cursors_offset, 0),
            size,
        })
    }

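    /// Claims the next free slot for a producer, returning a mutable
    /// reference to it, or `None` if the block is already fully allocated.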
    fn allocate_entry(&self) -> Result<Option<&mut T>> {
        if Cursor::offset(self.allocated_cursor.as_raw()) >= self.size {
            return Ok(None);
        }

        let old_allocated_cursor_raw = self
            .allocated_cursor
            .inner
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let old_allocated_cursor_offset = Cursor::offset(old_allocated_cursor_raw);
        if old_allocated_cursor_offset >= self.size {
            return Ok(None);
        }

        let entry_ref = unsafe {
            self.entries
                .as_ptr()
                .add(old_allocated_cursor_offset)
                .as_mut()
                .with_context(|| "entry ptr is null")?
        };
        Ok(Some(entry_ref))
    }

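    /// Tries to reserve and move out the next committed entry for a consumer.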
    fn try_consume_entry(&self) -> Result<CBlockState<T>> {
        loop {
            // Every entry in this block has already been reserved.
            let reserved_raw = self.reserved_cursor.as_raw();
            if Cursor::offset(reserved_raw) >= self.size {
                return Ok(CBlockState::BlockDone);
            }

            // Nothing is committed beyond what is already reserved.
            let committed_raw = self.committed_cursor.as_raw();
            if Cursor::offset(reserved_raw) == Cursor::offset(committed_raw) {
                return Ok(CBlockState::NoEntry);
            }

            // A producer has allocated a slot but not committed it yet.
            if Cursor::offset(committed_raw) != self.size {
                let allocated_raw = self.allocated_cursor.as_raw();
                if Cursor::offset(allocated_raw) != Cursor::offset(committed_raw) {
                    return Ok(CBlockState::NotAvailable);
                }
            }

            // Race other consumers to reserve the slot; the thread whose
            // fetch_max still observes the old value wins and moves the entry
            // out, everyone else retries with a fresh snapshot.
            if self.reserved_cursor.fetch_max(reserved_raw + 1) == reserved_raw {
                let entry = unsafe { self.consume_entry_unchecked(Cursor::offset(reserved_raw))? };
                self.consumed_cursor.fetch_add_offset(1);
                return Ok(CBlockState::Consumed(entry));
            }
        }
    }

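    /// Moves the entry at `entry_offset` out of the block.
    ///
    /// # Safety
    /// The caller must hold exclusive logical ownership of that slot (it won
    /// the reservation race) and the slot must contain an initialized value.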
    unsafe fn consume_entry_unchecked(&self, entry_offset: usize) -> Result<T> {
        // Move the entry out with a bitwise copy; the slot is treated as
        // logically uninitialized afterwards and is never dropped in place.
        Ok(self.entries.as_ptr().add(entry_offset).read())
    }
}

impl<T> Drop for Block<T> {
    fn drop(&mut self) {
        // Only entries in [consumed, committed) are still live: consumed ones
        // were already moved out, slots past `committed` were never written.
        let committed = Cursor::offset(self.committed_cursor.as_raw()).min(self.size);
        let consumed = Cursor::offset(self.consumed_cursor.as_raw()).min(committed);
        unsafe {
            ptr::drop_in_place(ptr::slice_from_raw_parts_mut(
                self.entries.as_ptr().add(consumed),
                committed - consumed,
            ));
            // Release the entries allocation itself.
            Global.deallocate(
                self.entries.cast(),
                Layout::array::<T>(self.size).expect("layout was valid at init"),
            );
        }
    }
}

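/// A cursor packed into a single `AtomicUsize`: the high `VSN_BIT_LEN` bits
/// hold a version number (bumped each time a block is reused) and the low
/// `OFFSET_BIT_LEN` bits hold the offset into the block. For example,
/// `Cursor::new_raw(4, 11)` has offset 4 and version 11.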
struct Cursor {
    inner: AtomicUsize,
}

impl Cursor {
    fn init_arc(offset: usize, vsn: usize) -> Arc<Self> {
        Arc::new(Self {
            inner: AtomicUsize::new(Self::new_raw(offset, vsn)),
        })
    }

    fn new_raw(offset: usize, vsn: usize) -> usize {
        (vsn << OFFSET_BIT_LEN) | offset
    }

    fn fetch_max(&self, raw_val: usize) -> usize {
        self.inner
            .fetch_max(raw_val, std::sync::atomic::Ordering::SeqCst)
    }

    fn fetch_add_offset(&self, count: usize) -> usize {
        let raw = self
            .inner
            .fetch_add(count, std::sync::atomic::Ordering::SeqCst);
        Cursor::offset(raw)
    }

    fn as_raw(&self) -> usize {
        self.inner.load(std::sync::atomic::Ordering::SeqCst)
    }

    #[inline]
    fn vsn(raw: usize) -> usize {
        raw >> OFFSET_BIT_LEN
    }

    #[inline]
    fn offset(raw: usize) -> usize {
        raw << VSN_BIT_LEN >> VSN_BIT_LEN
    }
}

impl Debug for Cursor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let raw = self.as_raw();
        f.write_str("{ \n")?;
        f.write_fmt(format_args!(" offset: {}\n", Cursor::offset(raw)))?;
        f.write_fmt(format_args!(" vsn: {}\n", Cursor::vsn(raw)))?;
        f.write_str("}")
    }
}

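/// A block-based bounded MPMC queue (BBQ): a ring of fixed-size blocks with
/// per-block producer/consumer cursors. The handle is cheap to clone and can
/// be shared across threads.
///
/// A minimal usage sketch (mirroring the tests below):
///
/// ```ignore
/// let bbq = Bbq::<u64>::new(2, 3).unwrap(); // 3 blocks of 2 entries each
/// bbq.push(11).unwrap();
/// assert_eq!(11, bbq.pop().unwrap());
/// ```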
#[derive(Debug, Clone)]
pub struct Bbq<T> {
    inner: Arc<BbqInner<T>>,
}

// The queue hands `T` values between threads through a shared reference, so
// both auto-trait impls only require `T: Send`.
unsafe impl<T> Send for Bbq<T> where T: Send {}
unsafe impl<T> Sync for Bbq<T> where T: Send {}

impl<T> Deref for Bbq<T> {
    type Target = BbqInner<T>;

    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}

impl<T> Debug for BbqInner<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let blocks = (0..self.blocks_num)
            .map(|i| unsafe { self.blocks.as_ptr().add(i).as_ref() })
            .collect::<Vec<Option<&Block<T>>>>();
        f.debug_struct("BBQInner")
            .field("blocks", &blocks)
            .field("phead_idx", &self.phead_idx)
            .field("chead_idx", &self.chead_idx)
            .field("blocks_num", &self.blocks_num)
            .finish()
    }
}

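/// Shared state behind a `Bbq` handle: a heap-allocated array of blocks plus
/// the producer (`phead_idx`) and consumer (`chead_idx`) head indices, which
/// grow monotonically and are taken modulo `blocks_num` to find the current
/// block.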
pub struct BbqInner<T> {
    blocks: NonNull<Block<T>>,
    phead_idx: AtomicUsize,
    chead_idx: AtomicUsize,
    blocks_num: usize,
}

impl<T> Drop for BbqInner<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop every block (each frees its own entries), then release the
            // array of blocks itself.
            ptr::drop_in_place(ptr::slice_from_raw_parts_mut(
                self.blocks.as_ptr(),
                self.blocks_num,
            ));
            Global.deallocate(
                self.blocks.cast(),
                Layout::array::<Block<T>>(self.blocks_num).expect("layout was valid at init"),
            );
        }
    }
}

impl<T> Bbq<T> {
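    /// Creates a queue with `blocks_num` blocks of `block_size` entries each.
    /// Every block except the first starts with its cursors at `block_size`,
    /// i.e. it looks fully consumed, so a producer can take it over by
    /// bumping the version when it advances the producer head.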
    pub fn new(block_size: usize, blocks_num: usize) -> Result<Self> {
        let blocks_layout = Layout::array::<Block<T>>(blocks_num)?;
        let blocks = Global.allocate_zeroed(blocks_layout)?.cast::<Block<T>>();

        unsafe {
            blocks.as_ptr().add(0).write(Block::init(block_size, 0)?);
            (1..blocks_num).try_for_each(|offset| {
                blocks
                    .as_ptr()
                    .add(offset)
                    .write(Block::init(block_size, block_size)?);
                Result::Ok(())
            })?;
        }

        Ok(Self {
            inner: Arc::new(BbqInner {
                blocks,
                phead_idx: AtomicUsize::new(0),
                chead_idx: AtomicUsize::new(0),
                blocks_num,
            }),
        })
    }

    fn get_phead_and_block(&self) -> Result<(usize, &Block<T>)> {
        let phead_block_idx = self.phead_idx.load(std::sync::atomic::Ordering::SeqCst);

        let phead_block_ref = unsafe { self.get_block_by_idx(phead_block_idx % self.blocks_num)? };
        Ok((phead_block_idx, phead_block_ref))
    }

    fn get_chead_and_block(&self) -> Result<(usize, &Block<T>)> {
        let chead_block_idx = self.chead_idx.load(std::sync::atomic::Ordering::SeqCst);
        let chead_block_ref = unsafe { self.get_block_by_idx(chead_block_idx % self.blocks_num)? };
        Ok((chead_block_idx, chead_block_ref))
    }

    unsafe fn get_block_by_idx(&self, block_idx: usize) -> Result<&Block<T>> {
        self.blocks
            .as_ptr()
            .add(block_idx)
            .as_ref()
            .context("block ptr is null.")
    }

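    /// Non-blocking enqueue: writes `item` into the current producer block,
    /// advancing to the next block when the current one is full.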
    fn enqueue(&self, item: T) -> Result<EnqueueState<T>> {
        loop {
            let (phead_block_idx, phead_block) = self.get_phead_and_block()?;
            if let Some(entry_mut_ref) = phead_block.allocate_entry()? {
                // Write without reading or dropping the slot's previous
                // (possibly uninitialized) contents.
                unsafe { ptr::write(entry_mut_ref, item) };
                phead_block.committed_cursor.fetch_add_offset(1);
                return Ok(EnqueueState::Available);
            } else {
                match self.advance_phead(phead_block_idx)? {
                    PBlockState::NoEntry => return Ok(EnqueueState::Busy(item)),
                    PBlockState::NotAvailable => return Ok(EnqueueState::Full(item)),
                    PBlockState::Available => continue,
                }
            }
        }
    }

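    /// Non-blocking dequeue: consumes from the current consumer block,
    /// advancing to the next block once the current one is fully drained.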
    fn dequeue(&self) -> Result<DequeueState<T>> {
        loop {
            let (chead_idx, chead_block) = self.get_chead_and_block()?;
            match chead_block.try_consume_entry()? {
                CBlockState::Consumed(entry) => return Ok(DequeueState::Ok(entry)),
                CBlockState::NoEntry => return Ok(DequeueState::Empty),
                CBlockState::NotAvailable => return Ok(DequeueState::Busy),
                CBlockState::BlockDone => {
                    if !self.advance_chead(chead_idx)? {
                        return Ok(DequeueState::Empty);
                    }
                }
            }
        }
    }

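    /// Tries to move the producer head to the next block. The next block can
    /// only be taken over once it has been fully consumed; its cursors are
    /// then reset to offset 0 with a bumped version.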
    fn advance_phead(&self, phead_idx: usize) -> Result<PBlockState> {
        let phead_block = unsafe { self.get_block_by_idx(phead_idx % self.blocks_num) }?;
        let phead_next_block = unsafe { self.get_block_by_idx((phead_idx + 1) % self.blocks_num) }?;

        let phead_vsn = Cursor::vsn(phead_block.committed_cursor.as_raw());
        let nblk_consumed_raw = phead_next_block.consumed_cursor.as_raw();
        let nblk_consumed_offset = Cursor::offset(nblk_consumed_raw);

        if nblk_consumed_offset == phead_next_block.size {
            phead_next_block
                .committed_cursor
                .fetch_max(Cursor::new_raw(0, phead_vsn + 1));
            phead_next_block
                .allocated_cursor
                .fetch_max(Cursor::new_raw(0, phead_vsn + 1));
            self.phead_idx
                .fetch_max(phead_idx + 1, std::sync::atomic::Ordering::SeqCst);
            return Ok(PBlockState::Available);
        } else {
            let nblk_reserved_raw = phead_next_block.reserved_cursor.as_raw();
            let nblk_reserved_offset = Cursor::offset(nblk_reserved_raw);
            if nblk_reserved_offset == nblk_consumed_offset {
                return Ok(PBlockState::NoEntry);
            } else {
                return Ok(PBlockState::NotAvailable);
            }
        }
    }

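    /// Tries to move the consumer head to the next block, which is only
    /// possible once a producer has taken that block over (its committed
    /// version has caught up). Returns `false` when it cannot advance yet,
    /// which `dequeue` reports as empty.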
    fn advance_chead(&self, chead_idx: usize) -> Result<bool> {
        let chead_block = unsafe { self.get_block_by_idx(chead_idx % self.blocks_num) }?;
        let chead_next_block = unsafe { self.get_block_by_idx((chead_idx + 1) % self.blocks_num) }?;

        let chead_vsn = Cursor::vsn(chead_block.consumed_cursor.as_raw());
        let nblk_committed_vsn = Cursor::vsn(chead_next_block.committed_cursor.as_raw());

        if nblk_committed_vsn < chead_vsn + 1 {
            return Ok(false);
        }

        chead_next_block
            .consumed_cursor
            .fetch_max(Cursor::new_raw(0, chead_vsn + 1));
        chead_next_block
            .reserved_cursor
            .fetch_max(Cursor::new_raw(0, chead_vsn + 1));
        self.chead_idx
            .fetch_max(chead_idx + 1, std::sync::atomic::Ordering::SeqCst);

        Ok(true)
    }
}

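/// How long `push` and `pop` sleep between retries when the queue is full or
/// empty.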
const SLEEP_MILLIS: u64 = 10;

impl<T> BlockingQueue for Bbq<T> {
    type Item = T;

    fn push(&self, mut item: Self::Item) -> Result<()> {
        loop {
            match self.enqueue(item)? {
                EnqueueState::Full(it) => item = it,
                EnqueueState::Busy(it) => item = it,
                EnqueueState::Available => return Ok(()),
            }
            sleep(Duration::from_millis(SLEEP_MILLIS));
        }
    }

    fn pop(&self) -> Result<Self::Item> {
        loop {
            if let DequeueState::Ok(item) = self.dequeue()? {
                return Ok(item);
            }
            sleep(Duration::from_millis(SLEEP_MILLIS));
        }
    }
}

#[cfg(test)]
mod tests {
    use std::thread;

    use crate::{bbq_impl::Cursor, Bbq, BlockingQueue};

    #[test]
    fn test_cursor() {
        let cursor = Cursor::new_raw(4, 11);
        assert_eq!(4, Cursor::offset(cursor));
        assert_eq!(11, Cursor::vsn(cursor));

        let cursor = Cursor::init_arc(0, 0);

        let old = cursor.fetch_max(Cursor::new_raw(1, 0));
        assert_eq!(0, Cursor::offset(old));
        assert_eq!(0, Cursor::vsn(old));

        let old = cursor.fetch_max(Cursor::new_raw(0, 1));
        assert_eq!(1, Cursor::offset(old));
        assert_eq!(0, Cursor::vsn(old));
    }

    #[test]
    fn test_push_and_pop() {
        let bbq = Bbq::<u64>::new(2, 3).unwrap();
        bbq.push(11).unwrap();
        bbq.push(12).unwrap();
        bbq.push(13).unwrap();
        bbq.push(14).unwrap();

        bbq.pop().unwrap();
        bbq.pop().unwrap();
        bbq.pop().unwrap();

        bbq.push(15).unwrap();

        bbq.push(16).unwrap();
        bbq.pop().unwrap();

        bbq.push(17).unwrap();
        bbq.push(18).unwrap();
    }

    #[test]
    fn test_push_pop_concurrent() {
        let bbq_1 = Bbq::<u64>::new(1000, 1000).unwrap();
        let bbq_2 = bbq_1.clone();
        let bbq_3 = bbq_1.clone();
        let bbq_4 = bbq_1.clone();
        let bbq_5 = bbq_1.clone();
        let bbq_6 = bbq_1.clone();

        let handle_1 = thread::spawn(move || {
            for i in 0..200_000 {
                bbq_1.push(i).unwrap();
            }
        });

        let handle_2 = thread::spawn(move || {
            for i in 0..200_000 {
                bbq_2.push(i).unwrap();
            }
        });

        let handle_3 = thread::spawn(move || {
            for i in 0..200_000 {
                bbq_3.push(i).unwrap();
            }
        });

        let handle_4 = thread::spawn(move || {
            for _ in 0..200_000 {
                bbq_4.pop().unwrap();
            }
        });

        let handle_5 = thread::spawn(move || {
            for _ in 0..200_000 {
                bbq_5.pop().unwrap();
            }
        });

        let handle_6 = thread::spawn(move || {
            for _ in 0..200_000 {
                bbq_6.pop().unwrap();
            }
        });

        handle_1.join().unwrap();
        handle_2.join().unwrap();
        handle_3.join().unwrap();
        handle_4.join().unwrap();
        handle_5.join().unwrap();
        handle_6.join().unwrap();
    }
}