use std::collections::{BinaryHeap, HashMap};
use std::cmp::{Ordering, Reverse};
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use parking_lot::{Mutex, RwLock};
/// Serializes `value` as 8 big-endian bytes so that byte-wise lexicographic
/// order equals numeric order — the property the encoded queue keys rely on.
#[inline]
pub fn encode_u64_be(value: u64) -> [u8; 8] {
    value.to_be_bytes()
}

/// Inverse of [`encode_u64_be`]; reads the first 8 bytes of `bytes`.
///
/// # Panics
/// Panics if `bytes` holds fewer than 8 bytes (the `[..8]` slice).
#[inline]
pub fn decode_u64_be(bytes: &[u8]) -> u64 {
    let mut word = [0u8; 8];
    word.copy_from_slice(&bytes[..8]);
    u64::from_be_bytes(word)
}

/// Maps an `i64` priority onto a `u64` whose big-endian encoding sorts in the
/// requested direction.
///
/// Flipping the sign bit translates the signed range onto the unsigned range
/// order-preservingly (i64::MIN -> 0, i64::MAX -> u64::MAX); the bitwise
/// complement of that mapping reverses the order for descending queues.
#[inline]
pub fn encode_priority(priority: i64, ascending: bool) -> [u8; 8] {
    let shifted = (priority as u64) ^ (1u64 << 63);
    let mapped = if ascending { shifted } else { !shifted };
    encode_u64_be(mapped)
}

/// Inverse of [`encode_priority`]; must be called with the same `ascending`
/// flag that was used to encode.
#[inline]
pub fn decode_priority(bytes: &[u8], ascending: bool) -> i64 {
    let mapped = decode_u64_be(bytes);
    let shifted = if ascending { mapped } else { !mapped };
    (shifted ^ (1u64 << 63)) as i64
}
/// Composite ordering key for a task within a queue.
///
/// The `Ord` impl below compares fields in declaration order, and `encode`
/// lays the same fields out as a sortable byte key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct QueueKey {
/// Identifier of the owning queue.
pub queue_id: String,
/// Task priority; smaller sorts first under the in-memory `Ord`.
pub priority: i64,
/// Epoch-millis timestamp at which the task becomes visible.
pub ready_ts: u64,
/// Monotonic tie-breaker assigned at enqueue time.
pub sequence: u64,
/// Unique task identifier; also the tail of the encoded key.
pub task_id: String,
}
impl QueueKey {
    /// Builds a key, converting the string-ish arguments into owned `String`s.
    pub fn new(
        queue_id: impl Into<String>,
        priority: i64,
        ready_ts: u64,
        sequence: u64,
        task_id: impl Into<String>,
    ) -> Self {
        Self {
            queue_id: queue_id.into(),
            priority,
            ready_ts,
            sequence,
            task_id: task_id.into(),
        }
    }

    /// Serializes the key as
    /// `queue/<queue_id>/<priority:8>/<ready_ts:8>/<sequence:8>/<task_id>`,
    /// where the 8-byte fields are big-endian so byte order tracks field order.
    /// `ascending_priority` selects the direction of the priority mapping.
    pub fn encode(&self, ascending_priority: bool) -> Vec<u8> {
        // Start from the shared queue prefix ("queue/<id>/") and append the
        // remaining fixed-width fields and the task id.
        let mut key = Self::queue_prefix(&self.queue_id);
        key.extend_from_slice(&encode_priority(self.priority, ascending_priority));
        key.push(b'/');
        key.extend_from_slice(&encode_u64_be(self.ready_ts));
        key.push(b'/');
        key.extend_from_slice(&encode_u64_be(self.sequence));
        key.push(b'/');
        key.extend_from_slice(self.task_id.as_bytes());
        key
    }

    /// Byte prefix shared by every encoded key of `queue_id`
    /// (`queue/<queue_id>/`); useful for range scans.
    pub fn queue_prefix(queue_id: &str) -> Vec<u8> {
        let mut prefix = Vec::with_capacity(32);
        prefix.extend_from_slice(b"queue/");
        prefix.extend_from_slice(queue_id.as_bytes());
        prefix.push(b'/');
        prefix
    }
}
impl Ord for QueueKey {
    /// Lexicographic comparison over
    /// (queue_id, priority, ready_ts, sequence, task_id).
    fn cmp(&self, other: &Self) -> Ordering {
        (
            &self.queue_id,
            self.priority,
            self.ready_ts,
            self.sequence,
            &self.task_id,
        )
            .cmp(&(
                &other.queue_id,
                other.priority,
                other.ready_ts,
                other.sequence,
                &other.task_id,
            ))
    }
}
impl PartialOrd for QueueKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to the total order above.
        Some(Ord::cmp(self, other))
    }
}
/// Lifecycle state of a [`Task`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
/// Waiting to be claimed (possibly delayed until its `ready_ts`).
Pending,
/// Held by a worker under a visibility lease.
Claimed,
/// Finished successfully; terminal.
Completed,
/// Attempt budget exhausted; terminal.
DeadLettered,
}
/// A unit of work stored in a queue, together with its delivery state.
#[derive(Debug, Clone)]
pub struct Task {
/// Ordering key; also carries the queue id, ready time, and task id.
pub key: QueueKey,
/// Opaque task body.
pub payload: Vec<u8>,
/// Current lifecycle state.
pub state: TaskState,
/// Number of times the task has been claimed so far.
pub attempts: u32,
/// Attempt budget before the task is dead-lettered on nack.
pub max_attempts: u32,
/// Epoch-millis creation timestamp.
pub created_at: u64,
/// Epoch-millis of the most recent claim, if any.
pub claimed_at: Option<u64>,
/// Worker id holding the current claim, if any.
pub claimed_by: Option<String>,
/// Epoch-millis when the current lease lapses, if claimed.
pub lease_expires_at: Option<u64>,
}
impl Task {
    /// Wraps `payload` in a fresh `Pending` task stamped with the current
    /// wall clock (epoch millis) and the default attempt budget of 3.
    pub fn new(key: QueueKey, payload: Vec<u8>) -> Self {
        let created_at = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        Self {
            key,
            payload,
            state: TaskState::Pending,
            attempts: 0,
            max_attempts: 3,
            created_at,
            claimed_at: None,
            claimed_by: None,
            lease_expires_at: None,
        }
    }

    /// Builder-style override of the attempt budget.
    pub fn with_max_attempts(mut self, max: u32) -> Self {
        self.max_attempts = max;
        self
    }

    /// Whether a worker may claim this task at `now_millis`: pending tasks
    /// become visible once their ready time passes, claimed tasks become
    /// visible again when their lease has lapsed (redelivery), and terminal
    /// states never do.
    pub fn is_visible(&self, now_millis: u64) -> bool {
        match self.state {
            TaskState::Pending => self.key.ready_ts <= now_millis,
            TaskState::Claimed => {
                // A claimed task with no recorded lease stays invisible.
                matches!(self.lease_expires_at, Some(exp) if now_millis >= exp)
            }
            TaskState::Completed | TaskState::DeadLettered => false,
        }
    }

    /// True once the attempt counter has reached the budget; checked by
    /// `PriorityQueue::nack` before requeueing.
    pub fn should_dead_letter(&self) -> bool {
        self.attempts >= self.max_attempts
    }
}
/// A worker's time-limited ownership of a task.
#[derive(Debug, Clone)]
pub struct Claim {
    /// Task being held.
    pub task_id: String,
    /// Worker that holds the claim.
    pub owner: String,
    /// Epoch-millis when the claim was taken.
    pub claimed_at: u64,
    /// Epoch-millis when the claim lapses and the task becomes reclaimable.
    pub expires_at: u64,
}
impl Claim {
    /// Creates a claim starting now and lasting `lease_ms` milliseconds.
    pub fn new(task_id: impl Into<String>, owner: impl Into<String>, lease_ms: u64) -> Self {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        Self {
            task_id: task_id.into(),
            owner: owner.into(),
            claimed_at: now,
            // Saturate instead of `now + lease_ms`: a huge lease (e.g.
            // u64::MAX used as "practically forever") would otherwise wrap
            // and produce an already-expired claim in release builds, or
            // panic on overflow in debug builds.
            expires_at: now.saturating_add(lease_ms),
        }
    }
    /// True once the lease deadline has been reached (inclusive).
    pub fn is_expired(&self, now_millis: u64) -> bool {
        now_millis >= self.expires_at
    }
    /// Storage key for a claim record: `queue_claim/<queue_id>/<task_id>`.
    pub fn encode_key(queue_id: &str, task_id: &str) -> Vec<u8> {
        let mut key = Vec::with_capacity(64);
        key.extend_from_slice(b"queue_claim/");
        key.extend_from_slice(queue_id.as_bytes());
        key.push(b'/');
        key.extend_from_slice(task_id.as_bytes());
        key
    }
}
/// Outcome of a single dequeue attempt.
#[derive(Debug)]
pub enum DequeueResult {
/// A task was claimed and is returned to the caller.
Success(Task),
/// No task is currently visible.
Empty,
/// Visible tasks exist but all are held by other workers; carries the count.
Contention(usize),
/// Unexpected failure with a human-readable message.
Error(String),
}
/// Tunable settings for a `PriorityQueue`.
#[derive(Debug, Clone)]
pub struct QueueConfig {
    pub queue_id: String,
    pub default_visibility_timeout_ms: u64,
    pub max_attempts: u32,
    pub ascending_priority: bool,
    pub dead_letter_queue_id: Option<String>,
}
impl Default for QueueConfig {
    /// Queue "default": 30 s visibility timeout, 3 attempts, ascending
    /// priority, no dead-letter queue.
    fn default() -> Self {
        Self {
            queue_id: String::from("default"),
            default_visibility_timeout_ms: 30_000,
            max_attempts: 3,
            ascending_priority: true,
            dead_letter_queue_id: None,
        }
    }
}
impl QueueConfig {
    /// Defaults plus the given queue id.
    pub fn new(queue_id: impl Into<String>) -> Self {
        Self {
            queue_id: queue_id.into(),
            ..Self::default()
        }
    }

    /// Sets the default lease length used by `dequeue`, in milliseconds.
    pub fn with_visibility_timeout(mut self, timeout_ms: u64) -> Self {
        self.default_visibility_timeout_ms = timeout_ms;
        self
    }

    /// Sets the attempt budget applied to newly enqueued tasks.
    pub fn with_max_attempts(mut self, max: u32) -> Self {
        self.max_attempts = max;
        self
    }

    /// Chooses the direction of the priority ordering for encoded keys.
    pub fn with_ascending_priority(mut self, ascending: bool) -> Self {
        self.ascending_priority = ascending;
        self
    }

    /// Names a dead-letter queue for exhausted tasks.
    pub fn with_dead_letter_queue(mut self, dlq_id: impl Into<String>) -> Self {
        self.dead_letter_queue_id = Some(dlq_id.into());
        self
    }
}
/// In-memory priority queue with claim/lease semantics.
///
/// Tasks live in a `BTreeMap` ordered by `QueueKey`, so iteration yields them
/// by (priority, ready_ts, sequence). Claims are tracked separately by task
/// id. Both maps are guarded by `parking_lot` `RwLock`s; `sequence` supplies
/// the monotonic tie-breaker used at enqueue time.
pub struct PriorityQueue {
// Immutable after construction.
config: QueueConfig,
tasks: RwLock<std::collections::BTreeMap<QueueKey, Task>>,
claims: RwLock<HashMap<String, Claim>>,
sequence: AtomicU64,
}
impl PriorityQueue {
pub fn new(config: QueueConfig) -> Self {
Self {
config,
tasks: RwLock::new(std::collections::BTreeMap::new()),
claims: RwLock::new(HashMap::new()),
sequence: AtomicU64::new(0),
}
}
pub fn queue_id(&self) -> &str {
&self.config.queue_id
}
pub fn enqueue(&self, priority: i64, payload: Vec<u8>) -> Task {
self.enqueue_delayed(priority, payload, 0)
}
pub fn enqueue_delayed(&self, priority: i64, payload: Vec<u8>, delay_ms: u64) -> Task {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let sequence = self.sequence.fetch_add(1, AtomicOrdering::SeqCst);
let task_id = format!("{:016x}{:016x}", now, sequence);
let key = QueueKey::new(
&self.config.queue_id,
priority,
now + delay_ms,
sequence,
task_id,
);
let task = Task::new(key.clone(), payload)
.with_max_attempts(self.config.max_attempts);
self.tasks.write().insert(key, task.clone());
task
}
/// Claims the next visible task for `worker_id` using the queue's default
/// visibility timeout.
pub fn dequeue(&self, worker_id: impl Into<String>) -> DequeueResult {
self.dequeue_with_timeout(worker_id, self.config.default_visibility_timeout_ms)
}
/// Claims the next visible task for `worker_id`, leasing it for
/// `visibility_timeout_ms` milliseconds.
///
/// Scans tasks in `QueueKey` order (priority ascending, then ready_ts, then
/// sequence) and takes the first task that is visible and not actively
/// claimed by another worker. On success the task is marked `Claimed`, its
/// attempt counter is bumped, and a claim record is stored. Returns
/// `Contention(n)` when `n` visible tasks were skipped because other workers
/// hold live claims, `Empty` otherwise.
///
/// NOTE(review): the scan always walks keys in ascending-priority order;
/// `config.ascending_priority` only affects on-disk key encoding, so a
/// descending-priority queue still dequeues the numerically smallest
/// priority first — confirm whether that is intended.
///
/// Lock order: `tasks` before `claims`; keep this consistent with `ack`,
/// `nack`, and `cleanup_expired_claims` to avoid deadlock.
pub fn dequeue_with_timeout(
&self,
worker_id: impl Into<String>,
visibility_timeout_ms: u64,
) -> DequeueResult {
let worker = worker_id.into();
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
// Release lapsed claims first so their tasks are re-claimable below.
self.cleanup_expired_claims(now);
let mut tasks = self.tasks.write();
let mut claims = self.claims.write();
let mut contention_count = 0;
for (key, task) in tasks.iter_mut() {
if !task.is_visible(now) {
continue;
}
// Skip tasks another worker still holds; count them as contention.
if let Some(claim) = claims.get(&key.task_id) {
if !claim.is_expired(now) && claim.owner != worker {
contention_count += 1;
continue;
}
}
// Claim the task: record the lease on both the claim map and the task.
let claim = Claim::new(&key.task_id, &worker, visibility_timeout_ms);
task.state = TaskState::Claimed;
task.attempts += 1;
task.claimed_at = Some(now);
task.claimed_by = Some(worker.clone());
task.lease_expires_at = Some(claim.expires_at);
claims.insert(key.task_id.clone(), claim);
return DequeueResult::Success(task.clone());
}
if contention_count > 0 {
DequeueResult::Contention(contention_count)
} else {
DequeueResult::Empty
}
}
pub fn ack(&self, task_id: &str) -> Result<(), String> {
let mut tasks = self.tasks.write();
let mut claims = self.claims.write();
let key = tasks.iter()
.find(|(_, t)| t.key.task_id == task_id)
.map(|(k, _)| k.clone());
if let Some(key) = key {
tasks.remove(&key);
claims.remove(task_id);
Ok(())
} else {
Err(format!("Task not found: {}", task_id))
}
}
/// Negative-acknowledges a claimed task: either requeues it with an optional
/// new priority and/or delay, or removes it once its attempt budget is
/// exhausted (returning the dead-letter message as an `Err`).
///
/// NOTE(review): `config.dead_letter_queue_id` is never consulted here — a
/// dead-lettered task is simply removed from this queue rather than moved to
/// the configured DLQ; confirm whether that routing happens at a higher layer.
///
/// Lock order: `tasks` before `claims`, matching `dequeue`/`ack`/cleanup.
pub fn nack(&self, task_id: &str, new_priority: Option<i64>, delay_ms: Option<u64>) -> Result<(), String> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let mut tasks = self.tasks.write();
let mut claims = self.claims.write();
// Linear scan: the map is keyed by QueueKey, not by task id. The task is
// cloned so it can be re-keyed and reinserted below.
let entry = tasks.iter()
.find(|(_, t)| t.key.task_id == task_id)
.map(|(k, t)| (k.clone(), t.clone()));
if let Some((old_key, mut task)) = entry {
if task.should_dead_letter() {
// Terminal: drop the task and its claim; the DeadLettered state is
// only set on the local clone, which is discarded with the remove.
task.state = TaskState::DeadLettered;
tasks.remove(&old_key);
claims.remove(task_id);
return Err(format!("Task dead-lettered after {} attempts", task.attempts));
}
// Re-key the task: keep its id, but take a fresh sequence number so it
// sorts after existing ready tasks of the same priority.
let new_priority = new_priority.unwrap_or(task.key.priority);
let new_ready_ts = delay_ms.map(|d| now + d).unwrap_or(now);
let new_sequence = self.sequence.fetch_add(1, AtomicOrdering::SeqCst);
let new_key = QueueKey::new(
&self.config.queue_id,
new_priority,
new_ready_ts,
new_sequence,
&task.key.task_id,
);
task.key = new_key.clone();
task.state = TaskState::Pending;
task.claimed_at = None;
task.claimed_by = None;
task.lease_expires_at = None;
// Remove under the old key before inserting the new one so the task is
// never present twice.
tasks.remove(&old_key);
tasks.insert(new_key, task);
claims.remove(task_id);
Ok(())
} else {
Err(format!("Task not found: {}", task_id))
}
}
pub fn extend_visibility(&self, task_id: &str, additional_ms: u64) -> Result<(), String> {
let mut claims = self.claims.write();
let mut tasks = self.tasks.write();
if let Some(claim) = claims.get_mut(task_id) {
claim.expires_at += additional_ms;
let entry = tasks.iter_mut()
.find(|(_, t)| t.key.task_id == task_id);
if let Some((_, task)) = entry {
task.lease_expires_at = Some(claim.expires_at);
}
Ok(())
} else {
Err(format!("No active claim for task: {}", task_id))
}
}
pub fn stats(&self) -> QueueStats {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let tasks = self.tasks.read();
let claims = self.claims.read();
let mut pending = 0;
let mut delayed = 0;
let mut inflight = 0;
for task in tasks.values() {
match task.state {
TaskState::Pending => {
if task.key.ready_ts > now {
delayed += 1;
} else {
pending += 1;
}
}
TaskState::Claimed => {
if task.lease_expires_at.map(|exp| now < exp).unwrap_or(false) {
inflight += 1;
} else {
pending += 1; }
}
_ => {}
}
}
QueueStats {
queue_id: self.config.queue_id.clone(),
pending,
delayed,
inflight,
total: tasks.len(),
active_claims: claims.len(),
}
}
fn cleanup_expired_claims(&self, now_millis: u64) {
let expired: Vec<_> = {
let claims = self.claims.read();
claims.iter()
.filter(|(_, c)| c.is_expired(now_millis))
.map(|(id, _)| id.clone())
.collect()
};
let mut tasks = self.tasks.write();
let mut claims = self.claims.write();
for task_id in expired {
claims.remove(&task_id);
for (_, task) in tasks.iter_mut() {
if task.key.task_id == task_id && task.state == TaskState::Claimed {
task.state = TaskState::Pending;
task.claimed_at = None;
task.claimed_by = None;
task.lease_expires_at = None;
break;
}
}
}
}
}
/// Snapshot of queue counters produced by `PriorityQueue::stats`.
#[derive(Debug, Clone)]
pub struct QueueStats {
/// Queue the snapshot describes.
pub queue_id: String,
/// Tasks claimable right now (includes claimed tasks with lapsed leases).
pub pending: usize,
/// Pending tasks whose ready time is still in the future.
pub delayed: usize,
/// Tasks held under a live lease.
pub inflight: usize,
/// Total tasks currently stored, in any state.
pub total: usize,
/// Claim records on file, including expired ones not yet cleaned up.
pub active_claims: usize,
}
/// Bounded selector that retains the best `k` items seen in a stream.
///
/// With `ascending == true` it keeps the `k` smallest items, otherwise the
/// `k` largest, using O(k) memory and O(log k) work per pushed item.
pub struct StreamingTopK<T> {
    heap: BinaryHeap<HeapEntry<T>>,
    k: usize,
    ascending: bool,
}
/// Heap node whose comparison direction is chosen at run time: with
/// `natural_order` set the wrapped value compares normally (max-heap at the
/// root), otherwise the comparison is reversed (min at the root).
struct HeapEntry<T> {
    value: T,
    natural_order: bool,
}
impl<T: Ord> PartialEq for HeapEntry<T> {
    fn eq(&self, other: &Self) -> bool {
        self.value.eq(&other.value)
    }
}
impl<T: Ord> Eq for HeapEntry<T> {}
impl<T: Ord> PartialOrd for HeapEntry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl<T: Ord> Ord for HeapEntry<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let natural = self.value.cmp(&other.value);
        if self.natural_order {
            natural
        } else {
            natural.reverse()
        }
    }
}
impl<T: Ord + Clone> StreamingTopK<T> {
    /// Creates a selector retaining at most `k` items in the given direction.
    pub fn new(k: usize, ascending: bool) -> Self {
        Self {
            heap: BinaryHeap::with_capacity(k + 1),
            k,
            ascending,
        }
    }

    /// Offers `value` to the selector; it is kept only if it beats the
    /// current worst retained item (or the selector is not yet full).
    pub fn push(&mut self, value: T) {
        if self.k == 0 {
            return;
        }
        let candidate = HeapEntry {
            value,
            natural_order: self.ascending,
        };
        if self.heap.len() < self.k {
            self.heap.push(candidate);
            return;
        }
        // The root of the heap is the worst retained item; only a strict
        // improvement evicts it (ties keep the incumbent).
        let beats_threshold = match self.heap.peek() {
            Some(worst) if self.ascending => candidate.value < worst.value,
            Some(worst) => candidate.value > worst.value,
            None => false,
        };
        if beats_threshold {
            self.heap.pop();
            self.heap.push(candidate);
        }
    }

    /// Current cutoff value (the worst retained item), if any.
    pub fn threshold(&self) -> Option<&T> {
        self.heap.peek().map(|entry| &entry.value)
    }

    /// True once `k` items are retained.
    pub fn is_full(&self) -> bool {
        self.heap.len() >= self.k
    }

    /// Consumes the selector and returns the retained items sorted in the
    /// selector's direction (ascending or descending).
    pub fn into_sorted_vec(self) -> Vec<T> {
        let ascending = self.ascending;
        let mut items: Vec<T> = self.heap.into_iter().map(|entry| entry.value).collect();
        if ascending {
            items.sort();
        } else {
            items.sort_by(|a, b| b.cmp(a));
        }
        items
    }

    /// Number of items currently retained.
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// True when nothing has been retained yet.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}
/// Top-K selector driven by an arbitrary comparator (e.g. a multi-column
/// ORDER BY). Keeps the `k` smallest items under `comparator` using O(k)
/// memory; each insertion beyond `k` pays a linear scan of the buffer.
pub struct MultiColumnTopK<T, F>
where
    F: Fn(&T, &T) -> Ordering,
{
    heap: Vec<T>,
    k: usize,
    comparator: F,
}
impl<T: Clone, F: Fn(&T, &T) -> Ordering> MultiColumnTopK<T, F> {
    /// Creates a selector retaining at most `k` items.
    pub fn new(k: usize, comparator: F) -> Self {
        Self {
            heap: Vec::with_capacity(k + 1),
            k,
            comparator,
        }
    }

    /// Offers `value`; once more than `k` items are held, the single worst
    /// item (the comparator-maximum) is evicted.
    pub fn push(&mut self, value: T) {
        if self.k == 0 {
            return;
        }
        self.heap.push(value);
        if self.heap.len() <= self.k {
            return;
        }
        // Linear scan for the comparator-maximum; swap_remove keeps eviction O(1).
        let mut worst = 0;
        for idx in 1..self.heap.len() {
            if (self.comparator)(&self.heap[idx], &self.heap[worst]) == Ordering::Greater {
                worst = idx;
            }
        }
        self.heap.swap_remove(worst);
    }

    /// Consumes the selector and returns the retained items sorted ascending
    /// under the comparator.
    pub fn into_sorted_vec(mut self) -> Vec<T> {
        self.heap.sort_by(&self.comparator);
        self.heap
    }

    /// Number of items currently retained.
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// True when nothing has been retained yet.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}
/// Execution strategy for an `ORDER BY ... LIMIT` query.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderByLimitStrategy {
    /// Walk an index that already matches the requested order.
    IndexPushdown,
    /// Maintain a bounded top-K buffer while scanning.
    StreamingTopK,
    /// Materialize and sort the whole result set.
    FullSort,
}
impl OrderByLimitStrategy {
    /// Picks a strategy: a matching index always wins; otherwise a bounded
    /// top-K scan is used when the limit is small in absolute terms (< 1000)
    /// or small relative to the input (< sqrt of the estimated row count).
    pub fn choose(has_matching_index: bool, estimated_rows: usize, limit: usize) -> Self {
        if has_matching_index {
            return Self::IndexPushdown;
        }
        let small_limit = limit < 1000;
        let sublinear_limit = (limit as f64) < (estimated_rows as f64).sqrt();
        if small_limit || sublinear_limit {
            Self::StreamingTopK
        } else {
            Self::FullSort
        }
    }

    /// Human-readable summary including asymptotic costs.
    pub fn description(&self) -> &'static str {
        match self {
            Self::IndexPushdown => "Index Pushdown: O(log N + K) using ordered index",
            Self::StreamingTopK => "Streaming Top-K: O(N log K) time, O(K) space",
            Self::FullSort => "Full Sort: O(N log N) time, O(N) space",
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_key_encoding() {
        let low = QueueKey::new("tasks", 1, 1000, 1, "task-1");
        let high = QueueKey::new("tasks", 2, 1000, 1, "task-2");
        let late = QueueKey::new("tasks", 1, 2000, 1, "task-3");
        assert!(
            low.encode(true) < high.encode(true),
            "Priority 1 should come before priority 2"
        );
        assert!(
            low.encode(true) < late.encode(true),
            "Earlier ready_ts should come first"
        );
    }

    #[test]
    fn test_priority_encoding() {
        let asc: Vec<_> = [-100i64, 0, 100]
            .iter()
            .map(|&p| encode_priority(p, true))
            .collect();
        assert!(asc[0] < asc[1], "Negative priority should come first in ascending");
        assert!(asc[1] < asc[2], "Zero should come before positive in ascending");
        let desc: Vec<_> = [-100i64, 0, 100]
            .iter()
            .map(|&p| encode_priority(p, false))
            .collect();
        assert!(desc[2] < desc[1], "Higher priority should come first in descending");
        assert!(desc[1] < desc[0], "Zero should come before negative in descending");
    }

    #[test]
    fn test_enqueue_dequeue() {
        let queue = PriorityQueue::new(QueueConfig::new("test-queue"));
        queue.enqueue(3, b"low priority".to_vec());
        queue.enqueue(1, b"high priority".to_vec());
        queue.enqueue(2, b"medium priority".to_vec());
        // Lowest numeric priority comes out first.
        let first = match queue.dequeue("worker-1") {
            DequeueResult::Success(task) => task,
            _ => panic!("Expected success"),
        };
        assert_eq!(first.key.priority, 1);
        assert_eq!(first.payload, b"high priority");
        let second = match queue.dequeue("worker-1") {
            DequeueResult::Success(task) => task,
            _ => panic!("Expected success"),
        };
        assert_eq!(second.key.priority, 2);
    }

    #[test]
    fn test_ack_removes_task() {
        let queue = PriorityQueue::new(QueueConfig::new("test-queue"));
        queue.enqueue(1, b"task 1".to_vec());
        let task = match queue.dequeue("worker-1") {
            DequeueResult::Success(t) => t,
            _ => panic!("Expected success"),
        };
        assert!(queue.ack(&task.key.task_id).is_ok());
        match queue.dequeue("worker-1") {
            DequeueResult::Empty => {}
            _ => panic!("Expected empty"),
        }
    }

    #[test]
    fn test_nack_returns_task() {
        let config = QueueConfig::new("test-queue").with_visibility_timeout(100);
        let queue = PriorityQueue::new(config);
        queue.enqueue(1, b"task 1".to_vec());
        let task = match queue.dequeue("worker-1") {
            DequeueResult::Success(t) => t,
            _ => panic!("Expected success"),
        };
        assert!(queue.nack(&task.key.task_id, Some(0), None).is_ok());
        // The requeued task carries the new priority and a bumped attempt count.
        match queue.dequeue("worker-2") {
            DequeueResult::Success(t) => {
                assert_eq!(t.key.priority, 0);
                assert_eq!(t.attempts, 2);
            }
            _ => panic!("Expected success"),
        }
    }

    #[test]
    fn test_streaming_topk_ascending() {
        let mut topk = StreamingTopK::new(3, true);
        for &v in &[5, 2, 8, 1, 9, 3, 7, 4, 6] {
            topk.push(v);
        }
        assert_eq!(topk.into_sorted_vec(), vec![1, 2, 3]);
    }

    #[test]
    fn test_streaming_topk_descending() {
        let mut topk = StreamingTopK::new(3, false);
        for &v in &[5, 2, 8, 1, 9, 3, 7, 4, 6] {
            topk.push(v);
        }
        assert_eq!(topk.into_sorted_vec(), vec![9, 8, 7]);
    }

    #[test]
    fn test_streaming_topk_k1() {
        let mut topk = StreamingTopK::new(1, true);
        for &v in &[5, 2, 8, 1, 9, 3] {
            topk.push(v);
        }
        assert_eq!(topk.into_sorted_vec(), vec![1]);
    }

    #[test]
    fn test_multi_column_topk() {
        #[derive(Clone, Debug, PartialEq)]
        struct Task {
            priority: i64,
            created_at: u64,
            id: String,
        }
        // Order by priority ascending, then created_at descending.
        let comparator = |a: &Task, b: &Task| match a.priority.cmp(&b.priority) {
            Ordering::Equal => b.created_at.cmp(&a.created_at),
            other => other,
        };
        let mut topk = MultiColumnTopK::new(3, comparator);
        for &(priority, created_at, id) in &[
            (1, 100, "a"),
            (2, 200, "b"),
            (1, 200, "c"),
            (1, 150, "d"),
            (3, 100, "e"),
        ] {
            topk.push(Task {
                priority,
                created_at,
                id: id.into(),
            });
        }
        let result = topk.into_sorted_vec();
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].id, "c");
        assert_eq!(result[1].id, "d");
        assert_eq!(result[2].id, "a");
    }

    #[test]
    fn test_strategy_selection() {
        assert_eq!(
            OrderByLimitStrategy::choose(true, 1_000_000, 10),
            OrderByLimitStrategy::IndexPushdown
        );
        assert_eq!(
            OrderByLimitStrategy::choose(false, 1_000_000, 10),
            OrderByLimitStrategy::StreamingTopK
        );
        assert_eq!(
            OrderByLimitStrategy::choose(false, 10_000, 5_000),
            OrderByLimitStrategy::FullSort
        );
        assert_eq!(
            OrderByLimitStrategy::choose(false, 1_000, 900),
            OrderByLimitStrategy::StreamingTopK
        );
    }

    #[test]
    fn test_queue_stats() {
        let queue = PriorityQueue::new(QueueConfig::new("test-queue"));
        queue.enqueue(1, b"task 1".to_vec());
        queue.enqueue(2, b"task 2".to_vec());
        queue.enqueue(3, b"task 3".to_vec());
        let before = queue.stats();
        assert_eq!(before.total, 3);
        assert_eq!(before.pending, 3);
        assert_eq!(before.inflight, 0);
        let _ = queue.dequeue("worker-1");
        let after = queue.stats();
        assert_eq!(after.pending, 2);
        assert_eq!(after.inflight, 1);
    }

    #[test]
    fn test_delayed_task() {
        let queue = PriorityQueue::new(QueueConfig::new("test-queue"));
        queue.enqueue_delayed(1, b"delayed task".to_vec(), 3_600_000);
        match queue.dequeue("worker-1") {
            DequeueResult::Empty => {}
            _ => panic!("Delayed task should not be visible"),
        }
        let stats = queue.stats();
        assert_eq!(stats.delayed, 1);
        assert_eq!(stats.pending, 0);
    }
}