#![allow(unused_variables)]
use crate::eval::value::Value;
use super::comparator::Comparator;
use super::{Container, capacities};
use std::sync::{Arc, RwLock};
use std::cmp::Ordering;
/// One slot of the heap: a payload together with the priority that orders it.
#[derive(Clone, Debug)]
struct HeapEntry {
/// Payload handed back to callers when the entry is extracted or removed.
value: Value,
/// Key passed to the queue's comparator to position this entry in the heap.
priority: Value,
}
impl HeapEntry {
fn new(value: Value, priority: Value) -> Self {
Self { value, priority }
}
}
/// Binary-heap priority queue whose payloads and priorities are both [`Value`]s.
///
/// Priorities are compared through a [`Comparator`]; `is_max_heap` selects
/// whether the greatest or the least priority sits at the root.
#[derive(Clone, Debug)]
pub struct PriorityQueue {
/// Entries in implicit binary-heap layout: the children of index `i` live at
/// `2 * i + 1` and `2 * i + 2`, and the root is at index 0.
heap: Vec<HeapEntry>,
/// Ordering applied to priorities.
comparator: Comparator,
/// `true` keeps the greatest priority at the root; `false` reverses the
/// comparator result so the least priority comes first.
is_max_heap: bool,
/// Optional label assigned by `with_name`; nothing in this file reads it back.
name: Option<String>,
}
impl PriorityQueue {
    /// Creates an empty max-heap with the default capacity and comparator.
    pub fn new() -> Self {
        Self::with_capacity_and_comparator(
            capacities::DEFAULT_PRIORITY_QUEUE_CAPACITY,
            Comparator::with_default(),
            true,
        )
    }

    /// Creates an empty min-heap with the default capacity and comparator.
    pub fn new_min_heap() -> Self {
        Self::with_capacity_and_comparator(
            capacities::DEFAULT_PRIORITY_QUEUE_CAPACITY,
            Comparator::with_default(),
            false,
        )
    }

    /// Creates an empty max-heap with room for at least `capacity` entries.
    pub fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity_and_comparator(capacity, Comparator::with_default(), true)
    }

    /// Creates an empty max-heap with the default capacity, ordered by `comparator`.
    pub fn with_comparator(comparator: Comparator) -> Self {
        Self::with_capacity_and_comparator(
            capacities::DEFAULT_PRIORITY_QUEUE_CAPACITY,
            comparator,
            true,
        )
    }

    /// Fully parameterised constructor every other constructor funnels into.
    ///
    /// `is_max_heap == true` keeps the greatest priority at the root;
    /// `false` keeps the least priority there.
    pub fn with_capacity_and_comparator(
        capacity: usize,
        comparator: Comparator,
        is_max_heap: bool,
    ) -> Self {
        Self {
            heap: Vec::with_capacity(capacity),
            comparator,
            is_max_heap,
            name: None,
        }
    }

    /// Creates a default max-heap that carries the given name.
    pub fn with_name(name: impl Into<String>) -> Self {
        let mut queue = Self::new();
        queue.name = Some(name.into());
        queue
    }

    /// Adds `value` with the given `priority` and restores the heap order.
    pub fn insert(&mut self, value: Value, priority: Value) {
        self.heap.push(HeapEntry::new(value, priority));
        let inserted_at = self.heap.len() - 1;
        self.heapify_up(inserted_at);
    }

    /// Removes and returns the root `(value, priority)` pair, or `None` when
    /// the queue is empty.
    pub fn extract(&mut self) -> Option<(Value, Value)> {
        let last = self.heap.pop()?;
        let root = if self.heap.is_empty() {
            // The popped entry was the only one, so it is the root itself.
            last
        } else {
            // Move the last leaf to the root and sift it back into place.
            let old_root = std::mem::replace(&mut self.heap[0], last);
            self.heapify_down(0);
            old_root
        };
        Some((root.value, root.priority))
    }

    /// Borrows the root `(value, priority)` pair without removing it.
    pub fn peek(&self) -> Option<(&Value, &Value)> {
        self.heap.first().map(|entry| (&entry.value, &entry.priority))
    }

    /// Replaces the priority of the first entry whose value equals `target`.
    ///
    /// Returns `true` when such an entry exists, `false` otherwise. The entry
    /// is re-positioned so the heap order holds for both max- and min-heaps.
    pub fn change_priority(&mut self, target: &Value, new_priority: Value) -> bool {
        let index = match self.find_value_index(target) {
            Some(index) => index,
            None => return false,
        };
        // `compare_priorities` already accounts for the heap direction, so
        // `Greater` always means "belongs closer to the root". Matching on
        // `is_max_heap` again here would apply the reversal twice.
        let direction = self.compare_priorities(&new_priority, &self.heap[index].priority);
        self.heap[index].priority = new_priority;
        match direction {
            Ordering::Greater => self.heapify_up(index),
            Ordering::Less => self.heapify_down(index),
            Ordering::Equal => {}
        }
        true
    }

    /// Removes the first entry whose value equals `target` and returns that
    /// value, or `None` when no entry matches.
    pub fn remove(&mut self, target: &Value) -> Option<Value> {
        let index = self.find_value_index(target)?;
        // `swap_remove` moves the last entry into the vacated slot; when the
        // removed entry was itself last, nothing is left at `index`.
        let removed = self.heap.swap_remove(index);
        if index < self.heap.len() {
            // `Greater` (in heap order) means the replacement outranks the
            // entry that used to sit here, so only its ancestors can be
            // violated; otherwise only its descendants can be.
            let direction =
                self.compare_priorities(&self.heap[index].priority, &removed.priority);
            if direction == Ordering::Greater {
                self.heapify_up(index);
            } else {
                self.heapify_down(index);
            }
        }
        Some(removed.value)
    }

    /// Returns `true` when some entry's value equals `target`.
    pub fn contains(&self, target: &Value) -> bool {
        self.find_value_index(target).is_some()
    }

    /// Clones every value, in internal heap (not sorted) order.
    pub fn values(&self) -> Vec<Value> {
        self.heap.iter().map(|entry| entry.value.clone()).collect()
    }

    /// Borrows every value, in internal heap (not sorted) order.
    pub fn value_refs(&self) -> Vec<&Value> {
        self.heap.iter().map(|entry| &entry.value).collect()
    }

    /// Clones every priority, in internal heap (not sorted) order.
    pub fn priorities(&self) -> Vec<Value> {
        self.heap.iter().map(|entry| entry.priority.clone()).collect()
    }

    /// Borrows every priority, in internal heap (not sorted) order.
    pub fn priority_refs(&self) -> Vec<&Value> {
        self.heap.iter().map(|entry| &entry.priority).collect()
    }

    /// Clones every `(value, priority)` pair, in internal heap order.
    pub fn entries(&self) -> Vec<(Value, Value)> {
        self.heap
            .iter()
            .map(|entry| (entry.value.clone(), entry.priority.clone()))
            .collect()
    }

    /// Borrows every `(value, priority)` pair, in internal heap order.
    pub fn entry_refs(&self) -> Vec<(&Value, &Value)> {
        self.iter().collect()
    }

    /// Iterates over borrowed `(value, priority)` pairs in internal heap order.
    pub fn iter(&self) -> impl Iterator<Item = (&Value, &Value)> {
        self.heap.iter().map(|entry| (&entry.value, &entry.priority))
    }

    /// Empties the queue, returning its entries in extraction order.
    pub fn drain_sorted(&mut self) -> Vec<(Value, Value)> {
        let mut drained = Vec::with_capacity(self.heap.len());
        while let Some(entry) = self.extract() {
            drained.push(entry);
        }
        drained
    }

    /// Returns the entries in extraction order without modifying the queue
    /// (works on a clone).
    pub fn to_sorted_vec(&self) -> Vec<(Value, Value)> {
        self.clone().drain_sorted()
    }

    /// Number of entries the backing storage can hold without reallocating.
    pub fn capacity(&self) -> usize {
        self.heap.capacity()
    }

    /// Reserves room for at least `additional` more entries.
    pub fn reserve(&mut self, additional: usize) {
        self.heap.reserve(additional);
    }

    /// Releases unused backing storage.
    pub fn shrink_to_fit(&mut self) {
        self.heap.shrink_to_fit();
    }

    /// Linear scan for the first entry whose value equals `target`.
    fn find_value_index(&self, target: &Value) -> Option<usize> {
        self.heap.iter().position(|entry| entry.value == *target)
    }

    /// Compares two priorities in *heap order*: `Greater` means `a` belongs
    /// closer to the root than `b`. For a min-heap the comparator result is
    /// reversed.
    fn compare_priorities(&self, a: &Value, b: &Value) -> Ordering {
        let ordering = self.comparator.compare(a, b);
        if self.is_max_heap {
            ordering
        } else {
            ordering.reverse()
        }
    }

    /// Sifts the entry at `index` towards the root while it outranks its parent.
    fn heapify_up(&mut self, mut index: usize) {
        while index > 0 {
            let parent = (index - 1) / 2;
            let outranks_parent = self.compare_priorities(
                &self.heap[index].priority,
                &self.heap[parent].priority,
            ) == Ordering::Greater;
            if !outranks_parent {
                break;
            }
            self.heap.swap(index, parent);
            index = parent;
        }
    }

    /// Sifts the entry at `index` towards the leaves while a child outranks it.
    fn heapify_down(&mut self, mut index: usize) {
        let len = self.heap.len();
        loop {
            let left = 2 * index + 1;
            let right = 2 * index + 2;
            // Index of whichever of {node, left child, right child} belongs on top.
            let mut preferred = index;
            if left < len
                && self.compare_priorities(
                    &self.heap[left].priority,
                    &self.heap[preferred].priority,
                ) == Ordering::Greater
            {
                preferred = left;
            }
            if right < len
                && self.compare_priorities(
                    &self.heap[right].priority,
                    &self.heap[preferred].priority,
                ) == Ordering::Greater
            {
                preferred = right;
            }
            if preferred == index {
                break;
            }
            self.heap.swap(index, preferred);
            index = preferred;
        }
    }

    /// Builds a max-heap with the default comparator from `(value, priority)`
    /// pairs by sifting down every internal node, last one first.
    pub fn from_vec(entries: Vec<(Value, Value)>) -> Self {
        let heap: Vec<HeapEntry> = entries
            .into_iter()
            .map(|(value, priority)| HeapEntry::new(value, priority))
            .collect();
        let mut queue = Self {
            heap,
            comparator: Comparator::with_default(),
            is_max_heap: true,
            name: None,
        };
        // Indices `len / 2 ..` are leaves; only the nodes before them need sifting.
        for index in (0..queue.heap.len() / 2).rev() {
            queue.heapify_down(index);
        }
        queue
    }

    /// Inserts a clone of every entry of `other` into `self`, using `self`'s
    /// ordering.
    pub fn merge(&mut self, other: &Self) {
        self.heap.reserve(other.heap.len());
        for entry in &other.heap {
            self.insert(entry.value.clone(), entry.priority.clone());
        }
    }

    /// Returns a clone of `self` with every entry of `other` merged in.
    pub fn union(&self, other: &Self) -> Self {
        let mut combined = self.clone();
        combined.merge(other);
        combined
    }
}
impl Container for PriorityQueue {
    /// Number of entries currently stored in the queue.
    fn len(&self) -> usize {
        let entries = &self.heap;
        entries.len()
    }

    /// Drops every entry; the backing `Vec` is cleared in place.
    fn clear(&mut self) {
        let entries = &mut self.heap;
        entries.clear();
    }
}
impl Default for PriorityQueue {
fn default() -> Self {
Self::new()
}
}
/// A [`PriorityQueue`] behind an `Arc<RwLock<..>>`.
///
/// The derived `Clone` clones the `Arc`, so clones refer to the same
/// underlying queue rather than copying its contents.
#[derive(Clone, Debug)]
pub struct ThreadSafePriorityQueue {
/// Shared, lock-guarded queue that every method of this type operates on.
inner: Arc<RwLock<PriorityQueue>>,
}
impl ThreadSafePriorityQueue {
    /// Puts an already-built queue behind the shared lock.
    fn from_queue(queue: PriorityQueue) -> Self {
        Self {
            inner: Arc::new(RwLock::new(queue)),
        }
    }

    /// Creates an empty shared max-heap (see [`PriorityQueue::new`]).
    pub fn new() -> Self {
        Self::from_queue(PriorityQueue::new())
    }

    /// Creates an empty shared min-heap (see [`PriorityQueue::new_min_heap`]).
    pub fn new_min_heap() -> Self {
        Self::from_queue(PriorityQueue::new_min_heap())
    }

    /// Creates an empty shared max-heap with room for `capacity` entries.
    pub fn with_capacity(capacity: usize) -> Self {
        Self::from_queue(PriorityQueue::with_capacity(capacity))
    }

    /// Creates an empty shared max-heap ordered by `comparator`.
    pub fn with_comparator(comparator: Comparator) -> Self {
        Self::from_queue(PriorityQueue::with_comparator(comparator))
    }

    /// Inserts under the write lock. Panics if the lock is poisoned.
    pub fn insert(&self, value: Value, priority: Value) {
        self.with_write(|queue| queue.insert(value, priority));
    }

    /// Extracts the root entry under the write lock. Panics if the lock is poisoned.
    pub fn extract(&self) -> Option<(Value, Value)> {
        self.with_write(|queue| queue.extract())
    }

    /// Returns a clone of the root entry; cloning is needed because the read
    /// guard is released before this method returns.
    pub fn peek(&self) -> Option<(Value, Value)> {
        self.with_read(|queue| {
            queue
                .peek()
                .map(|(value, priority)| (value.clone(), priority.clone()))
        })
    }

    /// Changes the priority of the entry matching `target` under the write lock.
    pub fn change_priority(&self, target: &Value, new_priority: Value) -> bool {
        self.with_write(|queue| queue.change_priority(target, new_priority))
    }

    /// Removes the entry matching `target` under the write lock.
    pub fn remove(&self, target: &Value) -> Option<Value> {
        self.with_write(|queue| queue.remove(target))
    }

    /// Checks for an entry matching `target` under the read lock.
    pub fn contains(&self, target: &Value) -> bool {
        self.with_read(|queue| queue.contains(target))
    }

    /// Number of stored entries, read under the read lock.
    pub fn len(&self) -> usize {
        self.with_read(|queue| queue.len())
    }

    /// Whether the queue holds no entries, read under the read lock.
    pub fn is_empty(&self) -> bool {
        self.with_read(|queue| queue.is_empty())
    }

    /// Removes every entry under the write lock.
    pub fn clear(&self) {
        self.with_write(|queue| queue.clear());
    }

    /// Clones all values under the read lock.
    pub fn values(&self) -> Vec<Value> {
        self.with_read(|queue| queue.values())
    }

    /// Clones all priorities under the read lock.
    pub fn priorities(&self) -> Vec<Value> {
        self.with_read(|queue| queue.priorities())
    }

    /// Clones all `(value, priority)` pairs under the read lock.
    pub fn entries(&self) -> Vec<(Value, Value)> {
        self.with_read(|queue| queue.entries())
    }

    /// Empties the queue under the write lock, returning entries in extraction order.
    pub fn drain_sorted(&self) -> Vec<(Value, Value)> {
        self.with_write(|queue| queue.drain_sorted())
    }

    /// Returns entries in extraction order without modifying the shared queue.
    pub fn to_sorted_vec(&self) -> Vec<(Value, Value)> {
        self.with_read(|queue| queue.to_sorted_vec())
    }

    /// Capacity of the backing storage, read under the read lock.
    pub fn capacity(&self) -> usize {
        self.with_read(|queue| queue.capacity())
    }

    /// Runs `f` with shared access to the queue while holding the read lock.
    ///
    /// Panics if the lock is poisoned.
    pub fn with_read<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&PriorityQueue) -> R,
    {
        let guard = self.inner.read().unwrap();
        f(&*guard)
    }

    /// Runs `f` with exclusive access to the queue while holding the write lock.
    ///
    /// Panics if the lock is poisoned.
    pub fn with_write<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut PriorityQueue) -> R,
    {
        let mut guard = self.inner.write().unwrap();
        f(&mut *guard)
    }
}
impl Default for ThreadSafePriorityQueue {
fn default() -> Self {
Self::new()
}
}
impl PriorityQueue {
    /// Builds a new, unnamed queue from `entries` that orders them exactly
    /// like `self` does (same comparator, same min/max direction).
    fn rebuild_like<I>(&self, entries: I) -> Self
    where
        I: IntoIterator<Item = (Value, Value)>,
    {
        let entries = entries.into_iter();
        let (expected, _) = entries.size_hint();
        let mut queue = Self::with_capacity_and_comparator(
            expected,
            self.comparator.clone(),
            self.is_max_heap,
        );
        for (value, priority) in entries {
            queue.insert(value, priority);
        }
        queue
    }

    /// Returns up to `k` entries in extraction order without modifying the queue.
    pub fn top_k(&self, k: usize) -> Vec<(Value, Value)> {
        let wanted = k.min(self.heap.len());
        if wanted == 0 {
            // Nothing to extract, so skip cloning the queue.
            return Vec::new();
        }
        let mut scratch = self.clone();
        let mut best = Vec::with_capacity(wanted);
        while best.len() < wanted {
            match scratch.extract() {
                Some(entry) => best.push(entry),
                None => break,
            }
        }
        best
    }

    /// Extracts entries while the root outranks `threshold` in heap order.
    ///
    /// For a max-heap that means priorities strictly greater than `threshold`;
    /// for a min-heap the comparison is reversed, so strictly smaller ones.
    pub fn extract_above_threshold(&mut self, threshold: &Value) -> Vec<(Value, Value)> {
        let mut extracted = Vec::new();
        loop {
            let root_outranks = match self.heap.first() {
                Some(root) => {
                    self.compare_priorities(&root.priority, threshold) == Ordering::Greater
                }
                None => false,
            };
            if !root_outranks {
                break;
            }
            match self.extract() {
                Some(entry) => extracted.push(entry),
                None => break,
            }
        }
        extracted
    }

    /// Returns a new queue holding clones of the entries accepted by
    /// `predicate(value, priority)`.
    ///
    /// The result keeps this queue's comparator and min/max direction.
    pub fn filter<F>(&self, mut predicate: F) -> Self
    where
        F: FnMut(&Value, &Value) -> bool,
    {
        let kept = self
            .heap
            .iter()
            .filter(|entry| predicate(&entry.value, &entry.priority))
            .map(|entry| (entry.value.clone(), entry.priority.clone()));
        self.rebuild_like(kept)
    }

    /// Returns a new queue whose entries are `f(value, priority)` for every
    /// entry of this queue.
    ///
    /// The result keeps this queue's comparator and min/max direction.
    pub fn map<F>(&self, mut f: F) -> Self
    where
        F: FnMut(&Value, &Value) -> (Value, Value),
    {
        let mapped = self
            .heap
            .iter()
            .map(|entry| f(&entry.value, &entry.priority));
        self.rebuild_like(mapped)
    }

    /// Folds over the entries in extraction order (computed on a clone).
    pub fn fold<F, Acc>(&self, init: Acc, mut f: F) -> Acc
    where
        F: FnMut(Acc, &Value, &Value) -> Acc,
    {
        self.to_sorted_vec()
            .iter()
            .fold(init, |acc, (value, priority)| f(acc, value, priority))
    }

    /// Splits the entries into `(accepted, rejected)` queues according to
    /// `predicate(value, priority)`.
    ///
    /// Both results keep this queue's comparator and min/max direction.
    pub fn partition<F>(&self, mut predicate: F) -> (Self, Self)
    where
        F: FnMut(&Value, &Value) -> bool,
    {
        let mut accepted = Vec::new();
        let mut rejected = Vec::new();
        for entry in &self.heap {
            let bucket = if predicate(&entry.value, &entry.priority) {
                &mut accepted
            } else {
                &mut rejected
            };
            bucket.push((entry.value.clone(), entry.priority.clone()));
        }
        (self.rebuild_like(accepted), self.rebuild_like(rejected))
    }

    /// Returns clones of the entries with the least and greatest priority as
    /// `(min, max)`, or `None` when the queue is empty.
    ///
    /// The raw comparator is used, so the result does not depend on whether
    /// this is a min- or max-heap. Ties keep the entry met first in heap order.
    pub fn min_max(&self) -> Option<((Value, Value), (Value, Value))> {
        let (first, rest) = self.heap.split_first()?;
        let mut lowest = first;
        let mut highest = first;
        for entry in rest {
            match self.comparator.compare(&entry.priority, &lowest.priority) {
                Ordering::Less => lowest = entry,
                Ordering::Greater => {
                    // Only something above the minimum can also beat the maximum.
                    if self.comparator.compare(&entry.priority, &highest.priority)
                        == Ordering::Greater
                    {
                        highest = entry;
                    }
                }
                Ordering::Equal => {}
            }
        }
        Some((
            (lowest.value.clone(), lowest.priority.clone()),
            (highest.value.clone(), highest.priority.clone()),
        ))
    }

    /// Summarises the queue.
    ///
    /// * `depth` is the number of levels of the implicit binary tree
    ///   (`0` when empty, `1` for a single entry, `floor(log2(n)) + 1` in general).
    /// * `avg_priority` is the mean of the priorities for which `as_number()`
    ///   returns a number; it is `None` when there are no such priorities.
    pub fn stats(&self) -> PriorityQueueStats {
        let size = self.heap.len();

        // Counting the significant bits of `size` yields floor(log2(size)) + 1
        // without going through floating point.
        let mut depth = 0;
        let mut remaining = size;
        while remaining > 0 {
            depth += 1;
            remaining >>= 1;
        }

        // Divide by the number of numeric priorities actually summed, not by
        // the total entry count, so non-numeric priorities do not skew the mean.
        let (sum, numeric_count) = self
            .heap
            .iter()
            .filter_map(|entry| entry.priority.as_number())
            .fold((0.0_f64, 0_usize), |(sum, count), number| {
                (sum + number, count + 1)
            });
        let avg_priority = if numeric_count == 0 {
            None
        } else {
            Some(sum / numeric_count as f64)
        };

        PriorityQueueStats {
            size,
            capacity: self.heap.capacity(),
            depth,
            is_max_heap: self.is_max_heap,
            avg_priority,
        }
    }
}
/// Snapshot of a queue's size and shape, as produced by `PriorityQueue::stats`.
#[derive(Debug, Clone)]
pub struct PriorityQueueStats {
/// Number of entries in the queue when the snapshot was taken.
pub size: usize,
/// Capacity of the queue's backing storage.
pub capacity: usize,
/// Depth of the heap as computed by `PriorityQueue::stats` (0 for an empty queue).
pub depth: usize,
/// `true` for a max-heap, `false` for a min-heap.
pub is_max_heap: bool,
/// Average of the priorities' numeric values; `None` for an empty queue.
pub avg_priority: Option<f64>,
}
impl std::fmt::Display for PriorityQueueStats {
    /// Renders the snapshot on one line; `avg_priority` is shown in its
    /// `Debug` form (`Some(..)` / `None`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let heap_kind = match self.is_max_heap {
            true => "max-heap",
            false => "min-heap",
        };
        write!(
            f,
            "PriorityQueue Stats: size={}, capacity={}, depth={}, type={}, avg_priority={:?}",
            self.size, self.capacity, self.depth, heap_kind, self.avg_priority
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Max-heap basics: insertion, peek and extraction order.
    #[test]
    fn test_basic_operations() {
        let mut pq = PriorityQueue::new();
        assert!(pq.is_empty());
        assert_eq!(pq.len(), 0);
        pq.insert(Value::string("low"), Value::number(1.0));
        pq.insert(Value::string("high"), Value::number(10.0));
        pq.insert(Value::string("medium"), Value::number(5.0));
        assert_eq!(pq.len(), 3);
        assert_eq!(pq.peek(), Some((&Value::string("high"), &Value::number(10.0))));
        assert_eq!(pq.extract(), Some((Value::string("high"), Value::number(10.0))));
        assert_eq!(pq.extract(), Some((Value::string("medium"), Value::number(5.0))));
        assert_eq!(pq.extract(), Some((Value::string("low"), Value::number(1.0))));
        assert!(pq.is_empty());
    }

    /// A min-heap extracts the smallest priority first.
    #[test]
    fn test_min_heap() {
        let mut pq = PriorityQueue::new_min_heap();
        pq.insert(Value::string("high"), Value::number(10.0));
        pq.insert(Value::string("low"), Value::number(1.0));
        pq.insert(Value::string("medium"), Value::number(5.0));
        assert_eq!(pq.extract(), Some((Value::string("low"), Value::number(1.0))));
        assert_eq!(pq.extract(), Some((Value::string("medium"), Value::number(5.0))));
        assert_eq!(pq.extract(), Some((Value::string("high"), Value::number(10.0))));
    }

    /// Raising a priority in a max-heap moves the entry to the root;
    /// unknown targets are reported with `false`.
    #[test]
    fn test_change_priority() {
        let mut pq = PriorityQueue::new();
        pq.insert(Value::string("a"), Value::number(1.0));
        pq.insert(Value::string("b"), Value::number(2.0));
        pq.insert(Value::string("c"), Value::number(3.0));
        assert!(pq.change_priority(&Value::string("a"), Value::number(10.0)));
        assert_eq!(pq.peek(), Some((&Value::string("a"), &Value::number(10.0))));
        assert!(!pq.change_priority(&Value::string("d"), Value::number(5.0)));
    }

    /// Removing by value returns the value and shrinks the queue.
    #[test]
    fn test_remove() {
        let mut pq = PriorityQueue::new();
        pq.insert(Value::string("a"), Value::number(1.0));
        pq.insert(Value::string("b"), Value::number(2.0));
        pq.insert(Value::string("c"), Value::number(3.0));
        assert!(pq.contains(&Value::string("b")));
        assert_eq!(pq.remove(&Value::string("b")), Some(Value::string("b")));
        assert!(!pq.contains(&Value::string("b")));
        assert_eq!(pq.len(), 2);
        assert_eq!(pq.remove(&Value::string("d")), None);
    }

    /// `from_vec` heapifies arbitrary input into a max-heap.
    #[test]
    fn test_from_vec() {
        let entries = vec![
            (Value::string("a"), Value::number(3.0)),
            (Value::string("b"), Value::number(1.0)),
            (Value::string("c"), Value::number(4.0)),
            (Value::string("d"), Value::number(2.0)),
        ];
        let pq = PriorityQueue::from_vec(entries);
        assert_eq!(pq.len(), 4);
        let sorted = pq.to_sorted_vec();
        assert_eq!(sorted[0], (Value::string("c"), Value::number(4.0)));
        assert_eq!(sorted[1], (Value::string("a"), Value::number(3.0)));
        assert_eq!(sorted[2], (Value::string("d"), Value::number(2.0)));
        assert_eq!(sorted[3], (Value::string("b"), Value::number(1.0)));
    }

    /// Merging pulls every entry of the other queue into this one.
    #[test]
    fn test_merge() {
        let mut pq1 = PriorityQueue::new();
        pq1.insert(Value::string("a"), Value::number(1.0));
        pq1.insert(Value::string("b"), Value::number(3.0));
        let mut pq2 = PriorityQueue::new();
        pq2.insert(Value::string("c"), Value::number(2.0));
        pq2.insert(Value::string("d"), Value::number(4.0));
        pq1.merge(&pq2);
        assert_eq!(pq1.len(), 4);
        // The source queue is only read, never drained.
        assert_eq!(pq2.len(), 2);
        let sorted = pq1.to_sorted_vec();
        assert_eq!(sorted[0].1, Value::number(4.0));
        assert_eq!(sorted[3].1, Value::number(1.0));
    }

    /// Smoke test of the lock-guarded wrapper.
    #[test]
    fn test_thread_safe_priority_queue() {
        let pq = ThreadSafePriorityQueue::new();
        pq.insert(Value::string("test"), Value::number(5.0));
        assert_eq!(pq.len(), 1);
        assert!(pq.contains(&Value::string("test")));
        let (value, priority) = pq.extract().unwrap();
        assert_eq!(value, Value::string("test"));
        assert_eq!(priority, Value::number(5.0));
        assert!(pq.is_empty());
    }

    /// `top_k`, `extract_above_threshold`, `filter` and `partition` on a
    /// max-heap holding priorities 1 through 10.
    #[test]
    fn test_specialized_operations() {
        let mut pq = PriorityQueue::new();
        for i in 1..=10 {
            pq.insert(Value::number(i as f64), Value::number(i as f64));
        }

        let top3 = pq.top_k(3);
        assert_eq!(top3.len(), 3);
        assert_eq!(top3[0].1, Value::number(10.0));
        assert_eq!(top3[2].1, Value::number(8.0));
        // `top_k` works on a clone, so the queue itself is untouched.
        assert_eq!(pq.len(), 10);

        // Priorities 6 through 10 are strictly above the threshold.
        let above_5 = pq.extract_above_threshold(&Value::number(5.0));
        assert_eq!(above_5.len(), 5);
        assert_eq!(above_5[0].1, Value::number(10.0));
        assert_eq!(above_5[4].1, Value::number(6.0));
        assert_eq!(pq.len(), 5);

        let is_even = |priority: &Value| match priority.as_number() {
            Some(n) => n as i64 % 2 == 0,
            None => false,
        };
        let is_odd = |priority: &Value| match priority.as_number() {
            Some(n) => n as i64 % 2 == 1,
            None => false,
        };

        // Of the remaining 1..=5, only 2 and 4 are even.
        let even_only = pq.filter(|_, priority| is_even(priority));
        assert_eq!(even_only.len(), 2);
        assert_eq!(even_only.peek().map(|(_, p)| p.clone()), Some(Value::number(4.0)));

        let (odds, evens) = pq.partition(|_, priority| is_odd(priority));
        assert_eq!(odds.len(), 3);
        assert_eq!(evens.len(), 2);
    }

    /// Extraction from a large max-heap yields non-increasing priorities.
    #[test]
    fn test_large_priority_queue() {
        let mut pq = PriorityQueue::with_capacity(1000);
        for i in 0..1000 {
            pq.insert(Value::number(i as f64), Value::number((i * 17) as f64 % 1000.0));
        }
        assert_eq!(pq.len(), 1000);
        let mut last_priority = f64::INFINITY;
        for _ in 0..1000 {
            // Running dry early would mean entries were lost, so fail loudly.
            let (_, priority) = pq.extract().expect("queue ran out before 1000 extractions");
            let p = priority.as_number().unwrap();
            assert!(p <= last_priority);
            last_priority = p;
        }
        assert!(pq.is_empty());
    }

    /// `stats` reports size, heap kind and the average priority.
    #[test]
    fn test_stats() {
        let mut pq = PriorityQueue::new();
        for i in 1..=5 {
            pq.insert(Value::string(format!("item{}", i)), Value::number(i as f64));
        }
        let stats = pq.stats();
        assert_eq!(stats.size, 5);
        assert!(stats.depth > 0);
        assert!(stats.is_max_heap);
        assert_eq!(stats.avg_priority, Some(3.0));
    }
}