#![allow(clippy::expect_used)]
use crate::types::Batch;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::{Condvar, Mutex};
pub(crate) struct PriorityQueue {
inner: Mutex<Inner>,
cv: Condvar,
}
struct Inner {
heap: BinaryHeap<Item>,
closed: bool,
}
struct Item {
size: u64,
batch: Batch,
}
impl PartialEq for Item {
fn eq(&self, other: &Self) -> bool {
self.size == other.size
}
}
impl Eq for Item {}
impl PartialOrd for Item {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for Item {
fn cmp(&self, other: &Self) -> Ordering {
self.size.cmp(&other.size)
}
}
impl PriorityQueue {
pub(crate) fn new() -> Self {
Self {
inner: Mutex::new(Inner {
heap: BinaryHeap::new(),
closed: false,
}),
cv: Condvar::new(),
}
}
pub(crate) fn push(&self, batch: Batch) {
let size = batch.size_bytes();
let mut inner = self.inner.lock().expect("queue mutex poisoned");
inner.heap.push(Item { size, batch });
self.cv.notify_one();
}
pub(crate) fn close(&self) {
let mut inner = self.inner.lock().expect("queue mutex poisoned");
inner.closed = true;
self.cv.notify_all();
}
pub(crate) fn pop(&self) -> Option<Batch> {
let mut inner = self.inner.lock().expect("queue mutex poisoned");
loop {
if let Some(item) = inner.heap.pop() {
return Some(item.batch);
}
if inner.closed {
return None;
}
inner = self.cv.wait(inner).expect("queue mutex poisoned");
}
}
}