use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
use crate::key_buffer::ArenaKeyHandle;
/// Configuration for a queue-style table index.
///
/// Wraps a [`TableIndexConfig`] and adds the optional column names that make up
/// the queue ordering key, plus two feature switches read by `QueueIndex`.
#[derive(Debug, Clone)]
pub struct QueueIndexConfig {
/// Underlying table index configuration; `QueueIndexConfig::new` builds it with
/// `IndexPolicy::ScanOptimized`.
pub base: TableIndexConfig,
/// Name of the priority column, if any. Reported first by `key_columns`.
pub priority_column: Option<String>,
/// Name of the timestamp column, if any. Reported second by `key_columns`.
pub timestamp_column: Option<String>,
/// Name of the FIFO column, if any. Reported last by `key_columns`.
pub fifo_column: Option<String>,
/// When `true`, `QueueIndex` maintains its cached minimum key. Set to `true` by `new`.
pub enable_min_key_cache: bool,
/// When `true`, `QueueIndex::len` reads the atomic size counter instead of
/// asking the map for its length. Set to `true` by `new`.
pub enable_size_tracking: bool,
}
impl QueueIndexConfig {
pub fn new(queue_name: impl Into<String>) -> Self {
Self {
base: TableIndexConfig::new(queue_name, IndexPolicy::ScanOptimized),
priority_column: None,
timestamp_column: None,
fifo_column: None,
enable_min_key_cache: true,
enable_size_tracking: true,
}
}
pub fn with_priority_column(mut self, column: impl Into<String>) -> Self {
self.priority_column = Some(column.into());
self
}
pub fn with_timestamp_column(mut self, column: impl Into<String>) -> Self {
self.timestamp_column = Some(column.into());
self
}
pub fn with_fifo_column(mut self, column: impl Into<String>) -> Self {
self.fifo_column = Some(column.into());
self
}
pub fn with_min_key_cache(mut self, enable: bool) -> Self {
self.enable_min_key_cache = enable;
self
}
pub fn with_size_tracking(mut self, enable: bool) -> Self {
self.enable_size_tracking = enable;
self
}
pub fn key_columns(&self) -> Vec<&str> {
let mut columns = Vec::new();
if let Some(ref col) = self.priority_column {
columns.push(col.as_str());
}
if let Some(ref col) = self.timestamp_column {
columns.push(col.as_str());
}
if let Some(ref col) = self.fifo_column {
columns.push(col.as_str());
}
columns
}
}
/// Ordered index of queue entries keyed by [`CompositeQueueKey`].
///
/// Entries live in a `BTreeMap`, so the first entry of the map is always the
/// smallest key. All methods take `&self`; mutation goes through the locks and
/// atomics declared below.
pub struct QueueIndex<V: Clone + Send + Sync> {
/// Entries in ascending key order, behind a read-write lock.
entries: RwLock<BTreeMap<CompositeQueueKey, V>>,
/// Most recently recorded minimum key; cleared by `pop_min` and by `remove`
/// of that same key. Only maintained when `config.enable_min_key_cache` is set.
min_key_cache: RwLock<Option<CompositeQueueKey>>,
/// Entry counter adjusted by `insert`, `pop_min` and `remove`; read by `len`
/// when `config.enable_size_tracking` is set.
size: AtomicUsize,
/// Counter incremented by every `insert` and by every successful `pop_min`/`remove`.
version: AtomicU64,
/// Configuration supplied to `new`.
config: QueueIndexConfig,
}
/// Sort key for a queued task.
///
/// The derived ordering compares fields in declaration order: `priority`, then
/// `timestamp`, then `sequence`, then `task_id`. A numerically smaller priority
/// therefore sorts first.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CompositeQueueKey {
    pub priority: i64,
    pub timestamp: u64,
    pub sequence: u64,
    pub task_id: String,
}

impl CompositeQueueKey {
    /// Bytes occupied by the three fixed-width fields at the start of the encoding.
    const FIXED_LEN: usize = 24;
    /// Flipping the sign bit maps `i64` onto `u64` while preserving numeric order
    /// (`i64::MIN` becomes 0 and `i64::MAX` becomes `u64::MAX`).
    const SIGN_BIT: u64 = 1 << 63;

    /// Builds a key from its four components.
    pub fn new(priority: i64, timestamp: u64, sequence: u64, task_id: impl Into<String>) -> Self {
        let task_id = task_id.into();
        Self {
            priority,
            timestamp,
            sequence,
            task_id,
        }
    }

    /// Serializes the key to bytes.
    ///
    /// Layout: biased priority, timestamp and sequence as three big-endian
    /// 8-byte words, followed by the raw UTF-8 bytes of `task_id`.
    pub fn encode(&self) -> Vec<u8> {
        let biased_priority = (self.priority as u64) ^ Self::SIGN_BIT;
        let mut out = Vec::with_capacity(32 + self.task_id.len());
        for word in &[biased_priority, self.timestamp, self.sequence] {
            out.extend_from_slice(&word.to_be_bytes());
        }
        out.extend_from_slice(self.task_id.as_bytes());
        out
    }

    /// Parses bytes produced by [`encode`](Self::encode).
    ///
    /// Returns `None` when fewer than 24 bytes are supplied or when the bytes
    /// after the fixed header are not valid UTF-8.
    pub fn decode(bytes: &[u8]) -> Option<Self> {
        let header = bytes.get(..Self::FIXED_LEN)?;
        let tail = &bytes[Self::FIXED_LEN..];

        // The header is exactly three big-endian u64 words.
        let mut words = header.chunks_exact(8).map(|chunk| {
            let mut raw = [0u8; 8];
            raw.copy_from_slice(chunk);
            u64::from_be_bytes(raw)
        });

        let priority = (words.next()? ^ Self::SIGN_BIT) as i64;
        let timestamp = words.next()?;
        let sequence = words.next()?;
        let task_id = String::from_utf8(tail.to_vec()).ok()?;

        Some(Self {
            priority,
            timestamp,
            sequence,
            task_id,
        })
    }
}
impl<V: Clone + Send + Sync> QueueIndex<V> {
pub fn new(config: QueueIndexConfig) -> Self {
Self {
entries: RwLock::new(BTreeMap::new()),
min_key_cache: RwLock::new(None),
size: AtomicUsize::new(0),
version: AtomicU64::new(0),
config,
}
}
pub fn insert(&self, key: CompositeQueueKey, value: V) {
let is_new_min = {
let entries = self.entries.read();
entries.first_key_value()
.map(|(min, _)| &key < min)
.unwrap_or(true)
};
{
let mut entries = self.entries.write();
let was_absent = entries.insert(key.clone(), value).is_none();
if was_absent {
self.size.fetch_add(1, Ordering::Relaxed);
}
}
if is_new_min && self.config.enable_min_key_cache {
*self.min_key_cache.write() = Some(key);
}
self.version.fetch_add(1, Ordering::Release);
}
pub fn peek_min(&self) -> Option<(CompositeQueueKey, V)> {
if self.config.enable_min_key_cache {
let cache = self.min_key_cache.read();
if let Some(ref cached_key) = *cache {
let entries = self.entries.read();
if let Some(value) = entries.get(cached_key) {
return Some((cached_key.clone(), value.clone()));
}
}
}
let entries = self.entries.read();
let result = entries.first_key_value()
.map(|(k, v)| (k.clone(), v.clone()));
if self.config.enable_min_key_cache {
if let Some((ref key, _)) = result {
*self.min_key_cache.write() = Some(key.clone());
}
}
result
}
pub fn pop_min(&self) -> Option<(CompositeQueueKey, V)> {
let result = {
let mut entries = self.entries.write();
entries.pop_first()
};
if result.is_some() {
self.size.fetch_sub(1, Ordering::Relaxed);
if self.config.enable_min_key_cache {
*self.min_key_cache.write() = None;
}
self.version.fetch_add(1, Ordering::Release);
}
result
}
pub fn remove(&self, key: &CompositeQueueKey) -> Option<V> {
let result = {
let mut entries = self.entries.write();
entries.remove(key)
};
if result.is_some() {
self.size.fetch_sub(1, Ordering::Relaxed);
if self.config.enable_min_key_cache {
let should_invalidate = {
let cache = self.min_key_cache.read();
cache.as_ref().map(|c| c == key).unwrap_or(false)
};
if should_invalidate {
*self.min_key_cache.write() = None;
}
}
self.version.fetch_add(1, Ordering::Release);
}
result
}
pub fn get(&self, key: &CompositeQueueKey) -> Option<V> {
self.entries.read().get(key).cloned()
}
pub fn contains(&self, key: &CompositeQueueKey) -> bool {
self.entries.read().contains_key(key)
}
pub fn len(&self) -> usize {
if self.config.enable_size_tracking {
self.size.load(Ordering::Relaxed)
} else {
self.entries.read().len()
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn version(&self) -> u64 {
self.version.load(Ordering::Acquire)
}
pub fn scan_by_priority(&self, max_priority: i64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
let entries = self.entries.read();
entries.iter()
.take_while(|(k, _)| k.priority <= max_priority)
.take(limit)
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
pub fn scan_ready(&self, now: u64, limit: usize) -> Vec<(CompositeQueueKey, V)> {
let entries = self.entries.read();
entries.iter()
.filter(|(k, _)| k.timestamp <= now)
.take(limit)
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
}
pub fn config(&self) -> &QueueIndexConfig {
&self.config
}
}
/// Registry of queue tables layered on top of a [`TableIndexRegistry`].
pub struct QueueTableRegistry {
/// Underlying table registry; `new` builds it with `IndexPolicy::Balanced`
/// as the default policy.
base: TableIndexRegistry,
/// Queue configurations keyed by table name (`config.base.table_name`).
queue_configs: RwLock<std::collections::HashMap<String, QueueIndexConfig>>,
}
impl QueueTableRegistry {
    /// Creates a registry with no queues whose base registry defaults to
    /// [`IndexPolicy::Balanced`].
    pub fn new() -> Self {
        let base = TableIndexRegistry::with_default_policy(IndexPolicy::Balanced);
        let queue_configs = RwLock::new(std::collections::HashMap::new());
        Self {
            base,
            queue_configs,
        }
    }

    /// Registers `config` as a queue table.
    ///
    /// The base table configuration is forwarded to the underlying registry
    /// first; the queue configuration is then stored under its table name,
    /// replacing any earlier registration for that name.
    pub fn register_queue(&self, config: QueueIndexConfig) {
        self.base.configure_table(config.base.clone());

        let mut queues = self.queue_configs.write();
        let table_name = config.base.table_name.clone();
        queues.insert(table_name, config);
    }

    /// Reports whether `table_name` has been registered as a queue.
    pub fn is_queue(&self, table_name: &str) -> bool {
        let queues = self.queue_configs.read();
        queues.contains_key(table_name)
    }

    /// Returns a clone of the queue configuration registered for `table_name`, if any.
    pub fn get_queue_config(&self, table_name: &str) -> Option<QueueIndexConfig> {
        let queues = self.queue_configs.read();
        queues.get(table_name).cloned()
    }

    /// Borrows the underlying table registry.
    pub fn base(&self) -> &TableIndexRegistry {
        &self.base
    }
}
impl Default for QueueTableRegistry {
fn default() -> Self {
Self::new()
}
}
/// Plain container for queue index statistics.
///
/// NOTE(review): nothing in the visible source fills in or reads this struct,
/// so the field meanings below are inferred from their names — confirm against
/// the code that produces it.
#[derive(Debug, Clone, Default)]
pub struct QueueIndexStats {
/// Presumably the number of entries in the index.
pub size: usize,
/// Presumably the number of insert operations performed.
pub inserts: u64,
/// Presumably the number of pop operations performed.
pub pops: u64,
/// Presumably the number of peek operations performed.
pub peeks: u64,
/// Presumably the min-key cache hit rate; scale (0–1 vs. percent) unknown — TODO confirm.
pub cache_hit_rate: f64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty index using the default configuration for `"test_queue"`.
    fn empty_index<V: Clone + Send + Sync>() -> QueueIndex<V> {
        QueueIndex::new(QueueIndexConfig::new("test_queue"))
    }

    /// Keys compare by priority, then timestamp, then sequence.
    #[test]
    fn test_composite_key_ordering() {
        let smallest = CompositeQueueKey::new(1, 100, 1, "task1");
        let larger_keys = [
            CompositeQueueKey::new(2, 100, 1, "task2"),
            CompositeQueueKey::new(1, 200, 1, "task3"),
            CompositeQueueKey::new(1, 100, 2, "task4"),
        ];
        for other in &larger_keys {
            assert!(smallest < *other);
        }
    }

    /// Encoding followed by decoding reproduces every field.
    #[test]
    fn test_composite_key_encode_decode() {
        let original = CompositeQueueKey::new(-100, 12345, 999, "my-task-id");
        let roundtripped = CompositeQueueKey::decode(&original.encode()).unwrap();
        assert_eq!(roundtripped.priority, original.priority);
        assert_eq!(roundtripped.timestamp, original.timestamp);
        assert_eq!(roundtripped.sequence, original.sequence);
        assert_eq!(roundtripped.task_id, original.task_id);
    }

    /// Entries pop in ascending priority regardless of insertion order.
    #[test]
    fn test_queue_index_insert_pop() {
        let index: QueueIndex<String> = empty_index();
        let fixtures = [
            (3, "low", "low priority"),
            (1, "high", "high priority"),
            (2, "medium", "medium priority"),
        ];
        for (priority, task_id, payload) in fixtures {
            index.insert(
                CompositeQueueKey::new(priority, 100, 1, task_id),
                payload.to_string(),
            );
        }
        assert_eq!(index.len(), 3);

        let (first_key, first_value) = index.pop_min().unwrap();
        assert_eq!(first_key.priority, 1);
        assert_eq!(first_value, "high priority");

        for expected_priority in [2, 3] {
            let (key, _) = index.pop_min().unwrap();
            assert_eq!(key.priority, expected_priority);
        }
        assert!(index.is_empty());
    }

    /// Peeking returns the minimum without removing it, and is repeatable.
    #[test]
    fn test_queue_index_peek() {
        let index: QueueIndex<i32> = empty_index();
        index.insert(CompositeQueueKey::new(2, 100, 1, "task1"), 1);
        index.insert(CompositeQueueKey::new(1, 100, 1, "task2"), 2);

        let (first_peek, value) = index.peek_min().unwrap();
        assert_eq!(first_peek.priority, 1);
        assert_eq!(value, 2);
        assert_eq!(index.len(), 2);

        let (second_peek, _) = index.peek_min().unwrap();
        assert_eq!(second_peek.priority, 1);
    }

    /// Removing the minimum leaves the next entry at the front.
    #[test]
    fn test_queue_index_remove() {
        let index: QueueIndex<i32> = empty_index();
        let front = CompositeQueueKey::new(1, 100, 1, "task1");
        let back = CompositeQueueKey::new(2, 100, 1, "task2");
        index.insert(front.clone(), 1);
        index.insert(back.clone(), 2);

        assert_eq!(index.remove(&front), Some(1));
        assert_eq!(index.len(), 1);

        let (remaining, _) = index.pop_min().unwrap();
        assert_eq!(remaining.task_id, "task2");
    }

    /// A priority scan returns only entries at or below the bound, in order.
    #[test]
    fn test_scan_by_priority() {
        let index: QueueIndex<i32> = empty_index();
        for i in 1..=10 {
            index.insert(CompositeQueueKey::new(i, 100, 1, format!("task{}", i)), i as i32);
        }

        let results = index.scan_by_priority(3, 100);
        let priorities: Vec<i64> = results.iter().map(|(key, _)| key.priority).collect();
        assert_eq!(priorities, vec![1, 2, 3]);
    }

    /// A readiness scan keeps entries whose timestamp is not after `now`.
    #[test]
    fn test_scan_ready() {
        let index: QueueIndex<i32> = empty_index();
        let fixtures = [(100, "ready1", 1), (200, "ready2", 2), (300, "future", 3)];
        for (timestamp, task_id, value) in fixtures {
            index.insert(CompositeQueueKey::new(1, timestamp, 1, task_id), value);
        }

        let ready = index.scan_ready(200, 100);
        assert_eq!(ready.len(), 2);
    }

    /// Registered queues are discoverable and return their stored configuration.
    #[test]
    fn test_queue_registry() {
        let registry = QueueTableRegistry::new();
        registry.register_queue(
            QueueIndexConfig::new("task_queue")
                .with_priority_column("priority")
                .with_timestamp_column("ready_at"),
        );

        assert!(registry.is_queue("task_queue"));
        assert!(!registry.is_queue("regular_table"));

        let stored = registry.get_queue_config("task_queue").unwrap();
        assert_eq!(stored.priority_column, Some("priority".to_string()));
    }

    /// With equal priority and timestamp, entries pop in sequence order.
    #[test]
    fn test_fifo_within_priority() {
        let index: QueueIndex<String> = empty_index();
        for (sequence, label) in [(3, "third"), (1, "first"), (2, "second")] {
            index.insert(
                CompositeQueueKey::new(1, 100, sequence, label),
                label.to_string(),
            );
        }

        let popped: Vec<String> = (0..3).map(|_| index.pop_min().unwrap().1).collect();
        assert_eq!(popped, ["first", "second", "third"]);
    }
}