use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::storage::schema::Value;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueSide {
Left, Right, }
#[derive(Debug, Clone)]
pub struct QueueMessage {
pub seq: u64,
pub payload: Value,
pub priority: Option<i32>,
pub enqueued_at_ns: u64,
pub attempts: u32,
}
pub struct QueueStore {
messages: BTreeMap<u64, QueueMessage>,
next_seq: AtomicU64,
max_size: usize,
priority_mode: bool,
priority_index: Option<BTreeMap<(std::cmp::Reverse<i32>, u64), u64>>,
}
impl QueueStore {
pub fn new(max_size: usize) -> Self {
Self {
messages: BTreeMap::new(),
next_seq: AtomicU64::new(1),
max_size,
priority_mode: false,
priority_index: None,
}
}
pub fn new_priority(max_size: usize) -> Self {
Self {
messages: BTreeMap::new(),
next_seq: AtomicU64::new(1),
max_size,
priority_mode: true,
priority_index: Some(BTreeMap::new()),
}
}
pub fn push_back(&mut self, payload: Value, priority: Option<i32>) -> Result<u64, QueueError> {
if self.max_size > 0 && self.messages.len() >= self.max_size {
return Err(QueueError::Full);
}
let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
let now_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
let msg = QueueMessage {
seq,
payload,
priority,
enqueued_at_ns: now_ns,
attempts: 0,
};
if let Some(ref mut idx) = self.priority_index {
idx.insert((std::cmp::Reverse(priority.unwrap_or(0)), seq), seq);
}
self.messages.insert(seq, msg);
Ok(seq)
}
pub fn push_front(&mut self, payload: Value, priority: Option<i32>) -> Result<u64, QueueError> {
if self.max_size > 0 && self.messages.len() >= self.max_size {
return Err(QueueError::Full);
}
let seq = self
.messages
.keys()
.next()
.copied()
.unwrap_or(1)
.saturating_sub(1);
let now_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
let msg = QueueMessage {
seq,
payload,
priority,
enqueued_at_ns: now_ns,
attempts: 0,
};
if let Some(ref mut idx) = self.priority_index {
idx.insert((std::cmp::Reverse(priority.unwrap_or(0)), seq), seq);
}
self.messages.insert(seq, msg);
Ok(seq)
}
pub fn pop_front(&mut self) -> Option<QueueMessage> {
if self.priority_mode {
if let Some(ref mut idx) = self.priority_index {
let key = idx.keys().next().copied()?;
let seq = idx.remove(&key)?;
return self.messages.remove(&seq);
}
}
let seq = *self.messages.keys().next()?;
let msg = self.messages.remove(&seq)?;
if let Some(ref mut idx) = self.priority_index {
idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
}
Some(msg)
}
pub fn pop_back(&mut self) -> Option<QueueMessage> {
let seq = *self.messages.keys().next_back()?;
let msg = self.messages.remove(&seq)?;
if let Some(ref mut idx) = self.priority_index {
idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
}
Some(msg)
}
pub fn peek_front(&self, count: usize) -> Vec<&QueueMessage> {
if self.priority_mode {
if let Some(ref idx) = self.priority_index {
return idx
.values()
.take(count)
.filter_map(|seq| self.messages.get(seq))
.collect();
}
}
self.messages.values().take(count).collect()
}
pub fn len(&self) -> usize {
self.messages.len()
}
pub fn is_empty(&self) -> bool {
self.messages.is_empty()
}
pub fn is_full(&self) -> bool {
self.max_size > 0 && self.messages.len() >= self.max_size
}
pub fn purge(&mut self) -> usize {
let count = self.messages.len();
self.messages.clear();
if let Some(ref mut idx) = self.priority_index {
idx.clear();
}
count
}
pub fn get(&self, seq: u64) -> Option<&QueueMessage> {
self.messages.get(&seq)
}
pub fn remove(&mut self, seq: u64) -> Option<QueueMessage> {
let msg = self.messages.remove(&seq)?;
if let Some(ref mut idx) = self.priority_index {
idx.remove(&(std::cmp::Reverse(msg.priority.unwrap_or(0)), seq));
}
Some(msg)
}
pub fn increment_attempts(&mut self, seq: u64) -> Option<u32> {
if let Some(msg) = self.messages.get_mut(&seq) {
msg.attempts += 1;
Some(msg.attempts)
} else {
None
}
}
pub fn is_priority(&self) -> bool {
self.priority_mode
}
pub fn memory_bytes(&self) -> usize {
let mut size = std::mem::size_of::<Self>();
size += self.messages.len() * (std::mem::size_of::<QueueMessage>() + 48);
if let Some(ref idx) = self.priority_index {
size += idx.len() * 32;
}
size
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueueError {
Full,
NotFound(u64),
}
impl std::fmt::Display for QueueError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Full => write!(f, "queue is full"),
Self::NotFound(seq) => write!(f, "message {} not found", seq),
}
}
}
impl std::error::Error for QueueError {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_queue_fifo() {
let mut q = QueueStore::new(0);
q.push_back(Value::text("first"), None).unwrap();
q.push_back(Value::text("second"), None).unwrap();
q.push_back(Value::text("third"), None).unwrap();
assert_eq!(q.len(), 3);
let msg = q.pop_front().unwrap();
assert_eq!(msg.payload, Value::text("first"));
let msg = q.pop_front().unwrap();
assert_eq!(msg.payload, Value::text("second"));
}
#[test]
fn test_queue_lifo() {
let mut q = QueueStore::new(0);
q.push_back(Value::text("first"), None).unwrap();
q.push_back(Value::text("second"), None).unwrap();
let msg = q.pop_back().unwrap();
assert_eq!(msg.payload, Value::text("second"));
}
#[test]
fn test_queue_lpush() {
let mut q = QueueStore::new(0);
q.push_back(Value::text("middle"), None).unwrap();
q.push_front(Value::text("front"), None).unwrap();
let msg = q.pop_front().unwrap();
assert_eq!(msg.payload, Value::text("front"));
}
#[test]
fn test_queue_max_size() {
let mut q = QueueStore::new(2);
assert!(q.push_back(Value::Integer(1), None).is_ok());
assert!(q.push_back(Value::Integer(2), None).is_ok());
assert_eq!(q.push_back(Value::Integer(3), None), Err(QueueError::Full));
assert!(q.is_full());
}
#[test]
fn test_queue_priority() {
let mut q = QueueStore::new_priority(0);
q.push_back(Value::text("low"), Some(1)).unwrap();
q.push_back(Value::text("high"), Some(10)).unwrap();
q.push_back(Value::text("medium"), Some(5)).unwrap();
let msg = q.pop_front().unwrap();
assert_eq!(msg.payload, Value::text("high"));
let msg = q.pop_front().unwrap();
assert_eq!(msg.payload, Value::text("medium"));
let msg = q.pop_front().unwrap();
assert_eq!(msg.payload, Value::text("low"));
}
#[test]
fn test_queue_peek() {
let mut q = QueueStore::new(0);
q.push_back(Value::text("a"), None).unwrap();
q.push_back(Value::text("b"), None).unwrap();
q.push_back(Value::text("c"), None).unwrap();
let peeked = q.peek_front(2);
assert_eq!(peeked.len(), 2);
assert_eq!(q.len(), 3); }
#[test]
fn test_queue_purge() {
let mut q = QueueStore::new(0);
q.push_back(Value::Integer(1), None).unwrap();
q.push_back(Value::Integer(2), None).unwrap();
let purged = q.purge();
assert_eq!(purged, 2);
assert!(q.is_empty());
}
#[test]
fn test_queue_remove_by_seq() {
let mut q = QueueStore::new(0);
let seq1 = q.push_back(Value::Integer(1), None).unwrap();
let seq2 = q.push_back(Value::Integer(2), None).unwrap();
let removed = q.remove(seq1).unwrap();
assert_eq!(removed.payload, Value::Integer(1));
assert_eq!(q.len(), 1);
assert!(q.get(seq2).is_some());
}
#[test]
fn test_queue_attempts() {
let mut q = QueueStore::new(0);
let seq = q.push_back(Value::text("msg"), None).unwrap();
assert_eq!(q.get(seq).unwrap().attempts, 0);
q.increment_attempts(seq);
assert_eq!(q.get(seq).unwrap().attempts, 1);
q.increment_attempts(seq);
assert_eq!(q.get(seq).unwrap().attempts, 2);
}
}