#![allow(unused)]
use std::sync::{
Arc,
atomic::{
AtomicBool,
AtomicU64,
AtomicUsize,
Ordering,
},
};
use crossbeam_queue::SegQueue;
use crate::compaction::job::CompactionJob;
/// Scheduling class of a compaction job.
///
/// Variants are declared from least to most urgent with ascending
/// discriminants, so the derived `Ord` ranks `Critical` above every other
/// variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum JobPriority {
    /// Score below 10.
    Low = 0,
    /// Score in `[10, 50)`.
    Normal = 1,
    /// Score in `[50, 100)`.
    High = 2,
    /// Score of 100 or more.
    Critical = 3,
}

impl JobPriority {
    /// Maps a compaction score onto a priority class.
    ///
    /// Thresholds are inclusive lower bounds: `>= 100` is `Critical`,
    /// `>= 50` is `High`, `>= 10` is `Normal`, anything else is `Low`.
    /// A NaN score fails every comparison and therefore lands in `Low`.
    pub fn from_score(score: f64) -> Self {
        match score {
            s if s >= 100.0 => Self::Critical,
            s if s >= 50.0 => Self::High,
            s if s >= 10.0 => Self::Normal,
            _ => Self::Low,
        }
    }
}
/// A compaction job bundled with a priority and a timestamp.
///
/// NOTE(review): nothing in the visible source constructs or reads this
/// type; the lanes of `CompactionQueue` store `Arc<CompactionJob>` directly.
/// Confirm whether it is still needed.
struct QueuedJob {
/// The compaction job itself.
job: CompactionJob,
/// Priority class associated with the job.
priority: JobPriority,
/// Presumably the instant the job entered the queue — TODO confirm, no
/// visible code assigns it.
enqueued_at: std::time::Instant,
}
/// Priority-laned queue of compaction jobs with progress counters.
///
/// Jobs are stored in one `SegQueue` per `JobPriority`; `dequeue` consults
/// the lanes from `critical` down to `low`. All state is held in `SegQueue`s
/// and atomics, and every method takes `&self`.
pub struct CompactionQueue {
/// Lane for `JobPriority::Critical` jobs.
critical: SegQueue<Arc<CompactionJob>>,
/// Lane for `JobPriority::High` jobs.
high: SegQueue<Arc<CompactionJob>>,
/// Lane for `JobPriority::Normal` jobs.
normal: SegQueue<Arc<CompactionJob>>,
/// Lane for `JobPriority::Low` jobs.
low: SegQueue<Arc<CompactionJob>>,
/// Number of jobs counted as waiting in the lanes.
total_queued: AtomicUsize,
/// Number of jobs handed out by `dequeue` and not yet passed to `mark_completed`.
in_progress: AtomicUsize,
/// Running total of jobs passed to `mark_completed`.
completed: AtomicU64,
/// Once set, `enqueue` drops new jobs and `dequeue` returns `None`.
shutdown: AtomicBool,
/// Finished jobs waiting to be collected by `drain_completed`.
completed_jobs: SegQueue<Arc<CompactionJob>>,
}
impl CompactionQueue {
pub fn new() -> Self {
Self {
critical: SegQueue::new(),
high: SegQueue::new(),
normal: SegQueue::new(),
low: SegQueue::new(),
total_queued: AtomicUsize::new(0),
in_progress: AtomicUsize::new(0),
completed: AtomicU64::new(0),
shutdown: AtomicBool::new(false),
completed_jobs: SegQueue::new(),
}
}
pub fn enqueue(&self, job: CompactionJob) {
if self.shutdown.load(Ordering::Acquire) {
return; }
let priority = JobPriority::from_score(job.score);
let job = Arc::new(job);
match priority {
| JobPriority::Critical => self.critical.push(job),
| JobPriority::High => self.high.push(job),
| JobPriority::Normal => self.normal.push(job),
| JobPriority::Low => self.low.push(job),
}
self.total_queued.fetch_add(1, Ordering::Release);
}
pub fn dequeue(&self) -> Option<Arc<CompactionJob>> {
if self.shutdown.load(Ordering::Acquire) {
return None;
}
let job = self
.critical
.pop()
.or_else(|| self.high.pop())
.or_else(|| self.normal.pop())
.or_else(|| self.low.pop());
let job = match job {
| Some(j) => j,
| None => return None,
};
self.total_queued.fetch_sub(1, Ordering::Release);
self.in_progress.fetch_add(1, Ordering::Release);
Some(job)
}
pub fn mark_completed(&self, job: Arc<CompactionJob>) {
self.in_progress.fetch_sub(1, Ordering::Release);
self.completed.fetch_add(1, Ordering::Release);
self.completed_jobs.push(job);
}
pub fn drain_completed(&self) -> Vec<Arc<CompactionJob>> {
let mut jobs = Vec::new();
while let Some(job) = self.completed_jobs.pop() {
jobs.push(job);
}
jobs
}
pub fn queued_count(&self) -> usize {
self.total_queued.load(Ordering::Acquire)
}
pub fn in_progress_count(&self) -> usize {
self.in_progress.load(Ordering::Acquire)
}
pub fn completed_count(&self) -> u64 {
self.completed.load(Ordering::Acquire)
}
pub fn is_empty(&self) -> bool {
self.queued_count() == 0 && self.in_progress_count() == 0
}
pub fn shutdown(&self) {
self.shutdown.store(true, Ordering::Release);
}
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::Acquire)
}
pub fn drain(&self) -> Vec<Arc<CompactionJob>> {
let mut jobs = Vec::new();
while let Some(job) = self.critical.pop() {
jobs.push(job);
}
while let Some(job) = self.high.pop() {
jobs.push(job);
}
while let Some(job) = self.normal.pop() {
jobs.push(job);
}
while let Some(job) = self.low.pop() {
jobs.push(job);
}
self.total_queued.store(0, Ordering::Release);
jobs
}
pub fn stats(&self) -> QueueStats {
QueueStats {
queued: self.queued_count(),
in_progress: self.in_progress_count(),
completed: self.completed_count(),
is_shutdown: self.is_shutdown(),
}
}
}
impl Default for CompactionQueue {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time snapshot of a compaction queue's counters.
#[derive(Debug, Clone, Copy)]
pub struct QueueStats {
    /// Jobs waiting to be dequeued.
    pub queued: usize,
    /// Jobs dequeued but not yet marked completed.
    pub in_progress: usize,
    /// Total jobs marked completed.
    pub completed: u64,
    /// Whether the queue had been shut down when the snapshot was taken.
    pub is_shutdown: bool,
}

impl std::fmt::Display for QueueStats {
    /// Renders the snapshot on one line, e.g.
    /// `Queue: 3 queued, 1 running, 7 completed`, with ` [SHUTDOWN]`
    /// appended when the shutdown flag is set.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            queued,
            in_progress,
            completed,
            is_shutdown,
        } = *self;
        let shutdown_tag = if is_shutdown { " [SHUTDOWN]" } else { "" };
        write!(
            f,
            "Queue: {} queued, {} running, {} completed{}",
            queued, in_progress, completed, shutdown_tag
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        compaction::job::{
            CompactionInput,
            CompactionJobType,
            CompactionOutput,
        },
        levels::KeyRange,
    };

    /// Builds a minimal flush job with id 1 whose `score` field is set to
    /// `score`, so the queue routes it to the matching priority lane.
    fn create_test_job(score: f64) -> CompactionJob {
        let input = CompactionInput {
            level: 0,
            segments: vec![],
            key_range: KeyRange::new(vec![], vec![], 0),
            total_size: 0,
        };
        let output = CompactionOutput::new(1, 64 * 1024 * 1024);
        let mut job =
            CompactionJob::new(1, CompactionJobType::Flush, input, None, output, vec![1]);
        // The constructor does not take a score; apply the requested one so
        // callers do not have to patch the field by hand.
        job.score = score;
        job
    }

    #[test]
    fn test_create_test_job_applies_score() {
        assert_eq!(create_test_job(42.0).score, 42.0);
    }

    #[test]
    fn test_queue_creation() {
        let queue = CompactionQueue::new();
        assert_eq!(queue.queued_count(), 0);
        assert_eq!(queue.in_progress_count(), 0);
        assert_eq!(queue.completed_count(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_priority_from_score() {
        assert_eq!(JobPriority::from_score(150.0), JobPriority::Critical);
        assert_eq!(JobPriority::from_score(75.0), JobPriority::High);
        assert_eq!(JobPriority::from_score(25.0), JobPriority::Normal);
        assert_eq!(JobPriority::from_score(5.0), JobPriority::Low);
    }

    #[test]
    fn test_enqueue_dequeue() {
        let queue = CompactionQueue::new();
        queue.enqueue(create_test_job(50.0));
        assert_eq!(queue.queued_count(), 1);

        let job_arc = queue.dequeue().unwrap();
        assert_eq!(queue.queued_count(), 0);
        assert_eq!(queue.in_progress_count(), 1);

        queue.mark_completed(job_arc);
        assert_eq!(queue.in_progress_count(), 0);
        assert_eq!(queue.completed_count(), 1);
    }

    #[test]
    fn test_priority_ordering() {
        let queue = CompactionQueue::new();
        // Enqueue from least to most urgent; dequeue must reverse that order.
        queue.enqueue(create_test_job(1.0));
        queue.enqueue(create_test_job(15.0));
        queue.enqueue(create_test_job(60.0));
        queue.enqueue(create_test_job(150.0));
        assert_eq!(queue.queued_count(), 4);

        assert_eq!(queue.dequeue().unwrap().score, 150.0);
        assert_eq!(queue.dequeue().unwrap().score, 60.0);
        assert_eq!(queue.dequeue().unwrap().score, 15.0);
        assert_eq!(queue.dequeue().unwrap().score, 1.0);
        assert!(queue.dequeue().is_none());
    }

    #[test]
    fn test_shutdown() {
        let queue = CompactionQueue::new();
        assert!(!queue.is_shutdown());
        queue.shutdown();
        assert!(queue.is_shutdown());

        // Jobs offered after shutdown are dropped.
        queue.enqueue(create_test_job(50.0));
        assert_eq!(queue.queued_count(), 0);
    }

    #[test]
    fn test_drain() {
        let queue = CompactionQueue::new();
        for i in 0..10 {
            queue.enqueue(create_test_job(i as f64));
        }
        assert_eq!(queue.queued_count(), 10);

        let drained = queue.drain();
        assert_eq!(drained.len(), 10);
        assert_eq!(queue.queued_count(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_drain_leaves_in_progress_untouched() {
        let queue = CompactionQueue::new();
        for _ in 0..3 {
            queue.enqueue(create_test_job(20.0));
        }
        let running = queue.dequeue().unwrap();

        let drained = queue.drain();
        assert_eq!(drained.len(), 2);
        assert_eq!(queue.queued_count(), 0);
        assert_eq!(queue.in_progress_count(), 1);
        assert!(!queue.is_empty());

        queue.mark_completed(running);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_concurrent_enqueue_dequeue() {
        use std::thread;

        let queue = Arc::new(CompactionQueue::new());

        let mut producers = vec![];
        for _ in 0..4 {
            let q = queue.clone();
            producers.push(thread::spawn(move || {
                for i in 0..100 {
                    q.enqueue(create_test_job(i as f64));
                }
            }));
        }
        for p in producers {
            p.join().unwrap();
        }

        let mut consumers = vec![];
        for _ in 0..4 {
            let q = queue.clone();
            consumers.push(thread::spawn(move || {
                let mut count = 0;
                while let Some(job) = q.dequeue() {
                    q.mark_completed(job);
                    count += 1;
                }
                count
            }));
        }

        let mut total_consumed = 0;
        for c in consumers {
            total_consumed += c.join().unwrap();
        }
        assert_eq!(total_consumed, 400);
        assert_eq!(queue.completed_count(), 400);
    }

    #[test]
    fn test_stats() {
        let queue = CompactionQueue::new();
        queue.enqueue(create_test_job(50.0));

        let stats = queue.stats();
        assert_eq!(stats.queued, 1);
        assert_eq!(stats.in_progress, 0);
        assert_eq!(stats.completed, 0);
        assert!(!stats.is_shutdown);

        queue.dequeue();
        let stats = queue.stats();
        assert_eq!(stats.queued, 0);
        assert_eq!(stats.in_progress, 1);
    }

    #[test]
    fn test_drain_completed() {
        let queue = CompactionQueue::new();
        queue.enqueue(create_test_job(50.0));
        let job_arc = queue.dequeue().unwrap();
        assert_eq!(queue.drain_completed().len(), 0);

        queue.mark_completed(job_arc);
        let drained = queue.drain_completed();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].id, 1);
    }

    #[test]
    fn test_drain_completed_empty() {
        let queue = CompactionQueue::new();
        assert!(queue.drain_completed().is_empty());
    }

    #[test]
    fn test_multiple_completed_jobs() {
        let queue = CompactionQueue::new();
        for i in 0..3 {
            let mut job = create_test_job(i as f64);
            job.id = i + 1;
            queue.enqueue(job);
        }

        let mut completed = Vec::new();
        while let Some(job) = queue.dequeue() {
            completed.push(job);
        }
        for job in completed {
            queue.mark_completed(job);
        }

        let drained = queue.drain_completed();
        assert_eq!(drained.len(), 3);
    }
}