use crate::{Chunk, ChunkStorage, Ident};
use crate::value::Value;
use std::rc::Rc;
/// Bookkeeping for a `Queue`, stored through a `Value` under the `"q_state"`
/// sub-identifier.
///
/// All `*_at` fields are positions in one running offset space that spans every
/// chunk the queue has ever used: a chunk's starting offset is the previous
/// chunk's starting offset plus that chunk's `len()`.
struct QueueState {
    /// Starting offset of the oldest chunk that is still being read from.
    first_chunk_at: usize,
    /// Starting offset of the newest chunk, i.e. the one being written to.
    last_chunk_at: usize,
    /// Position of the next entry header to be dequeued.
    read_at: usize,
    /// Position at which the next entry header will be written.
    write_at: usize,
    /// Number of items enqueued but not yet dequeued.
    len: usize,
}
/// A FIFO queue of variable-sized items stored back to back inside chunks
/// obtained from a `ChunkStorage`.
pub struct Queue {
    /// Base identifier; chunk identifiers and the state identifier are derived
    /// from it with `Ident::sub`.
    ident: Ident,
    /// Size requested for a new chunk unless a single item needs more space.
    typical_chunk_size: usize,
    /// Live chunks, oldest first. Reads happen in the first chunk, writes in
    /// the last one.
    chunks: Vec<Chunk>,
    /// Read/write positions and item count.
    state: Value<QueueState>,
    /// Fully consumed chunks, parked here until `drop_old_chunks` is called.
    chunks_to_drop: Vec<Chunk>,
    /// Backend used to create, load and forget chunks.
    storage: Rc<dyn ChunkStorage>,
}
/// Header written in front of every queue entry, telling the reader where the
/// following entry lives.
///
/// Values of this type are written directly into chunk memory, so the variant
/// order must stay as it is.
enum NextItemRef {
    /// The following entry is in the same chunk; the value is the total size
    /// of this entry (header plus payload), i.e. the distance to the next
    /// header.
    SameChunk(usize),
    /// Nothing more is stored in this chunk; reading continues at the start
    /// of the next chunk.
    NextChunk,
}
impl Queue {
pub fn new(ident: &Ident, typical_chunk_size: usize, storage: Rc<dyn ChunkStorage>) -> Self {
let mut queue = Queue {
state: Value::load_or_default(ident.sub("q_state"), QueueState {
first_chunk_at: 0,
last_chunk_at: 0,
read_at: 0,
write_at: 0,
len: 0,
}, Rc::clone(&storage)),
ident: ident.clone(),
typical_chunk_size,
chunks: Vec::new(),
chunks_to_drop: Vec::new(),
storage: storage
};
if queue.state.write_at > 0 {
let mut chunk_offset = queue.state.first_chunk_at;
while chunk_offset <= queue.state.last_chunk_at {
let chunk = queue.storage.load_chunk(ident.sub(chunk_offset));
chunk_offset += chunk.len();
queue.chunks.push(chunk);
}
}
queue
}
pub fn len(&self) -> usize {
self.state.len
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[allow(clippy::cast_ptr_alignment)]
pub unsafe fn enqueue(&mut self, size: usize) -> *mut u8 {
enum EnqueueResult {
Success(*mut u8),
RetryInNewChunkOfSize(usize),
};
let result = {
let ref_size = ::std::mem::size_of::<NextItemRef>();
let min_space = ref_size + size + ref_size;
if let Some(chunk) = self.chunks.last_mut() {
let offset = self.state.write_at - self.state.last_chunk_at;
let entry_ptr = chunk.as_mut_ptr().offset(offset as isize);
if offset + min_space <= chunk.len() {
*(entry_ptr as *mut NextItemRef) = NextItemRef::SameChunk(ref_size + size);
let payload_ptr = entry_ptr.offset(ref_size as isize);
self.state.write_at += ref_size + size;
self.state.len += 1;
EnqueueResult::Success(payload_ptr)
} else {
*(entry_ptr as *mut NextItemRef) = NextItemRef::NextChunk;
let new_chunk_size = ::std::cmp::max(self.typical_chunk_size, min_space);
self.state.last_chunk_at += chunk.len();
self.state.write_at = self.state.last_chunk_at;
EnqueueResult::RetryInNewChunkOfSize(new_chunk_size)
}
} else {
let new_chunk_size = ::std::cmp::max(self.typical_chunk_size, min_space);
EnqueueResult::RetryInNewChunkOfSize(new_chunk_size)
}
};
match result {
EnqueueResult::Success(payload_ptr) => payload_ptr,
EnqueueResult::RetryInNewChunkOfSize(new_chunk_size) => {
self.chunks.push(self.storage.create_chunk(
self.ident.sub(self.state.last_chunk_at),
new_chunk_size,
));
self.enqueue(size)
}
}
}
pub unsafe fn dequeue(&mut self) -> Option<*const u8> {
enum DequeueResult {
Empty,
Success(*const u8),
RetryInNextChunk,
};
let result = if self.state.read_at == self.state.write_at {
DequeueResult::Empty
} else {
let offset = self.state.read_at - self.state.first_chunk_at;
let chunk = &mut self.chunks[0];
let entry_ptr = chunk.as_mut_ptr().offset(offset as isize);
#[allow(clippy::cast_ptr_alignment)]
match *(entry_ptr as *mut NextItemRef) {
NextItemRef::NextChunk => {
self.state.first_chunk_at += chunk.len();
self.state.read_at = self.state.first_chunk_at;
DequeueResult::RetryInNextChunk
}
NextItemRef::SameChunk(total_size) => {
let payload_ptr = entry_ptr.offset(::std::mem::size_of::<NextItemRef>() as isize);
self.state.read_at += total_size;
self.state.len -= 1;
DequeueResult::Success(payload_ptr)
}
}
};
match result {
DequeueResult::Empty => None,
DequeueResult::Success(payload_ptr) => Some(payload_ptr),
DequeueResult::RetryInNextChunk => {
self.chunks_to_drop.push(self.chunks.remove(0));
self.dequeue()
}
}
}
pub unsafe fn drop_old_chunks(&mut self) {
for chunk in self.chunks_to_drop.drain(..) {
self.storage.forget_chunk(chunk);
}
}
}