# PriorityQueue
`PriorityQueue<P, K, V>` is a replicated, eventually consistent double-ended
priority queue (DEPQ). Each entry has a **priority** (`P`), a unique **key**
(`K`), and a **value** (`V`). It supports efficient access to both the minimum
and maximum priority elements, key-based lookups, priority updates, and range
removals.
Internally it maintains two indexes:
- `by_key: im::HashMap<K, (P, V)>` — O(log n) key lookups
- `by_priority: im::OrdMap<P, im::HashMap<K, V>>` — O(log n) min/max and range
operations
Both indexes use deterministic hashers for consistent iteration order across
nodes.
## Construction
```rust,ignore
use mosaik::collections::{PriorityQueue, StoreId, SyncConfig};
// Writer — can read and write
let pq = PriorityQueue::<u64, String, Order>::writer(
&network,
StoreId::from("orderbook"),
);
// Writer with custom sync config
let pq = PriorityQueue::<u64, String, Order>::writer_with_config(
&network, store_id, config,
);
// Reader — read-only, deprioritized for leadership
let pq = PriorityQueue::<u64, String, Order>::reader(&network, store_id);
// Reader with custom sync config
let pq = PriorityQueue::<u64, String, Order>::reader_with_config(
&network, store_id, config,
);
// Aliases
let pq = PriorityQueue::<u64, String, Order>::new(&network, store_id);
let pq = PriorityQueue::<u64, String, Order>::new_with_config(
&network, store_id, config,
);
```
## Read operations
Available on both writers and readers.
| `len() -> usize` | O(1) | Number of entries |
| `is_empty() -> bool` | O(1) | Whether the queue is empty |
| `contains_key(&K) -> bool` | O(log n) | Test if a key exists |
| `get(&K) -> Option<V>` | O(log n) | Get the value for a key |
| `get_priority(&K) -> Option<P>` | O(log n) | Get the priority for a key |
| `get_min() -> Option<(P, K, V)>` | O(log n) | Entry with the lowest priority |
| `get_max() -> Option<(P, K, V)>` | O(log n) | Entry with the highest priority |
| `min_priority() -> Option<P>` | O(log n) | Lowest priority value |
| `max_priority() -> Option<P>` | O(log n) | Highest priority value |
| `iter() -> impl Iterator<Item = (P, K, V)>` | — | Ascending priority order (alias for `iter_asc`) |
| `iter_asc() -> impl Iterator<Item = (P, K, V)>` | — | Ascending priority order |
| `iter_desc() -> impl Iterator<Item = (P, K, V)>` | — | Descending priority order |
| `version() -> Version` | O(1) | Current committed state version |
| `when() -> &When` | O(1) | Access the state observer |
When multiple entries share the same priority, `get_min()` and `get_max()`
return an arbitrary entry from that priority bucket.
```rust,ignore
// Peek at extremes
if let Some((priority, key, value)) = pq.get_min() {
println!("Best bid: {key} at priority {priority}");
}
// Look up by key
let price = pq.get_priority(&"order-42".into());
// Iterate in order
for (priority, key, value) in pq.iter_desc() {
println!("{priority}: {key} = {value:?}");
}
```
## Write operations
Only available on `PriorityQueueWriter<P, K, V>`.
| `insert(P, K, V) -> Result<Version, Error<(P, K, V)>>` | Insert or update an entry |
| `extend(impl IntoIterator<Item = (P, K, V)>) -> Result<Version, Error<Vec<(P, K, V)>>>` | Batch insert |
| `update_priority(&K, P) -> Result<Version, Error<K>>` | Change priority of an existing key |
| `update_value(&K, V) -> Result<Version, Error<K>>` | Change value of an existing key |
| `compare_exchange_value(&K, V, Option<V>) -> Result<Version, Error<K>>` | Atomic compare-and-swap on value |
| `remove(&K) -> Result<Version, Error<K>>` | Remove by key |
| `remove_range(impl RangeBounds<P>) -> Result<Version, Error<()>>` | Remove all entries in a priority range |
| `clear() -> Result<Version, Error<()>>` | Remove all entries |
If `insert` is called with a key that already exists, both its priority and
value are updated. `update_priority` and `update_value` are no-ops if the key
doesn't exist (they still commit to the log).
```rust,ignore
// Insert
let v = pq.insert(100, "order-1".into(), order).await?;
// Batch insert
let v = pq.extend([
(100, "order-1".into(), order1),
(200, "order-2".into(), order2),
]).await?;
// Update just the priority
pq.update_priority(&"order-1".into(), 150).await?;
// Update just the value
pq.update_value(&"order-1".into(), new_order).await?;
// Atomic compare-and-swap on value (priority is preserved)
let v = pq.compare_exchange_value(
&"order-1".into(),
order1, // expected current value
Some(updated), // new value
).await?;
// Compare-and-swap to remove: expected matches, new is None
let v = pq.compare_exchange_value(
&"order-1".into(),
updated, // expected current value
None, // removes the entry
).await?;
// Remove a single entry
pq.remove(&"order-2".into()).await?;
// Remove all entries with priority below 50
pq.remove_range(..50u64).await?;
// Remove entries in a range
pq.remove_range(10..=20).await?;
// Clear everything
pq.clear().await?;
```
### Range syntax
`remove_range` accepts any `RangeBounds<P>`, so all standard Rust range
syntaxes work:
| `..cutoff` | Priorities below `cutoff` |
| `..=cutoff` | Priorities at or below `cutoff` |
| `cutoff..` | Priorities at or above `cutoff` |
| `lo..hi` | Priorities in `[lo, hi)` |
| `lo..=hi` | Priorities in `[lo, hi]` |
| `..` | All (equivalent to `clear()`) |
### Compare-and-swap semantics
`compare_exchange_value` atomically checks the value of an existing entry and
only applies the mutation if it matches the `expected` parameter. Unlike
`compare_exchange` on `Map` and `Cell`, this method operates **only on the
value** — the entry's priority is always preserved.
- **`key`**: The key of the entry to operate on (must already exist).
- **`expected`**: The expected current value (type `V`, not `Option<V>` — the
key must exist for the exchange to succeed).
- **`new`**: The replacement value. `Some(v)` updates the value in-place;
`None` removes the entire entry.
If the key does not exist or its current value does not match `expected`, the
operation is a **no-op** — it commits to the Raft log but does not change the
queue.
> **Note:** The entry's priority is never changed by `compare_exchange_value`.
> To atomically update priorities, use `update_priority` instead.
## Error handling
Same pattern as other collections — failed values are returned for retry:
```rust,ignore
match pq.insert(priority, key, value).await {
Ok(version) => { /* committed */ }
Err(Error::Offline((priority, key, value))) => {
// Retry later
}
Err(Error::NetworkDown) => {
// Permanent failure
}
}
```
## Status & observation
```rust,ignore
pq.when().online().await;
let v = pq.insert(100, "k".into(), val).await?;
pq.when().reaches(v).await;
pq.when().updated().await;
pq.when().offline().await;
```
## Dual-index architecture
The DEPQ maintains two synchronized indexes:
```text
by_key: HashMap<K, (P, V)> ← key lookups, membership tests
by_priority: OrdMap<P, HashMap<K, V>> ← min/max, range ops, ordered iteration
```
When a key is inserted or updated, both indexes are updated atomically within
the state machine's `apply_batch`. During snapshot sync, only the `by_key`
index is serialized; the `by_priority` index is reconstructed on the receiving
side during `append`.
## Group identity
```text
UniqueId::from("mosaik_collections_depq")
.derive(store_id)
.derive(type_name::<P>())
.derive(type_name::<K>())
.derive(type_name::<V>())
```