use {
super::{
CollectionConfig,
CollectionFromDef,
Error,
READER,
SyncConfig,
WRITER,
When,
primitives::{Key, OrderedKey, StoreId, Value, Version},
},
crate::{
Group,
GroupId,
Network,
PeerId,
UniqueId,
collections::sync::{
Snapshot,
SnapshotStateMachine,
SnapshotSync,
protocol::SnapshotRequest,
},
groups::{
ApplyContext,
CommandError,
Cursor,
LeadershipPreference,
StateMachine,
},
primitives::{EncodeError, Encoded},
},
core::{
any::type_name,
borrow::Borrow,
hash::Hash,
ops::{Range, RangeBounds},
},
futures::{FutureExt, TryFutureExt},
iroh::endpoint_info::EncodingError,
serde::{Deserialize, Serialize},
std::hash::BuildHasherDefault,
tokio::sync::watch,
};
/// `im::HashMap` hashed with std's unkeyed `DefaultHasher` (via
/// `BuildHasherDefault`) instead of a randomly seeded `RandomState`.
///
/// NOTE(review): presumably chosen so hashing — and therefore the iteration
/// order that `Snapshot::iter_range` chunks over — is the same on every peer;
/// confirm before swapping the hasher.
type HashMap<K, V> =
im::HashMap<K, V, BuildHasherDefault<std::hash::DefaultHasher>>;
/// Handle that can mutate the queue; capacity defaults to unbounded
/// (`u64::MAX`).
pub type PriorityQueueWriter<P, K, V, const CAP: u64 = { u64::MAX }> =
PriorityQueue<P, K, V, CAP, WRITER>;
/// Read-only handle; capacity defaults to unbounded (`u64::MAX`).
pub type PriorityQueueReader<P, K, V, const CAP: u64 = { u64::MAX }> =
PriorityQueue<P, K, V, CAP, READER>;
/// Queue whose capacity is fixed to `u64::MAX`; defaults to a writer handle.
pub type UnboundedPriorityQueue<P, K, V, const IS_WRITER: bool = WRITER> =
PriorityQueue<P, K, V, { u64::MAX }, IS_WRITER>;
/// Queue with an explicit capacity `CAP`; defaults to a writer handle.
pub type BoundedPriorityQueue<
P,
K,
V,
const CAP: u64,
const IS_WRITER: bool = WRITER,
> = PriorityQueue<P, K, V, CAP, IS_WRITER>;
/// Replicated, double-ended priority queue: entries are looked up by key `K`,
/// ordered by priority `P` (with both min and max access), and carry a value
/// `V`.
///
/// `CAP` bounds the number of entries (`u64::MAX`, the default, means
/// unbounded). When an insert pushes the size past `CAP`, entries from the
/// lowest-priority bucket are evicted. The mutating API is only available on
/// `PriorityQueueWriter`; `PriorityQueueReader` handles are read-only.
pub struct PriorityQueue<
P: OrderedKey,
K: Key,
V: Value,
const CAP: u64 = { u64::MAX },
const IS_WRITER: bool = WRITER,
> {
/// Status handle built from the replication group's `when()`.
when: When,
/// Replication group that drives the queue's state machine.
group: Group<DepqStateMachine<P, K, V, CAP>>,
/// Latest queue contents published by the state machine after applying
/// commands or installing a snapshot.
data: watch::Receiver<PriorityQueueSnapshot<P, K, V>>,
}
impl<P: OrderedKey, K: Key, V: Value, const CAP: u64, const IS_WRITER: bool>
PriorityQueue<P, K, V, CAP, IS_WRITER>
{
pub fn len(&self) -> usize {
self.data.borrow().by_key.len()
}
pub fn is_empty(&self) -> bool {
self.data.borrow().by_key.is_empty()
}
pub const fn capacity(&self) -> u64 {
CAP
}
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.data.borrow().clone().by_key.contains_key(key)
}
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self
.data
.borrow()
.clone()
.by_key
.get(key)
.map(|(_, v)| v.clone())
}
pub fn get_priority<Q>(&self, key: &Q) -> Option<P>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self
.data
.borrow()
.clone()
.by_key
.get(key)
.map(|(p, _)| p.clone())
}
pub fn get_min(&self) -> Option<(P, K, V)> {
let snap = self.data.borrow().clone();
snap.by_priority.get_min().and_then(|(p, bucket)| {
bucket
.iter()
.next()
.map(|(k, v)| (p.clone(), k.clone(), v.clone()))
})
}
pub fn get_max(&self) -> Option<(P, K, V)> {
let snap = self.data.borrow().clone();
snap.by_priority.get_max().and_then(|(p, bucket)| {
bucket
.iter()
.next()
.map(|(k, v)| (p.clone(), k.clone(), v.clone()))
})
}
pub fn max_priority(&self) -> Option<P> {
self
.data
.borrow()
.clone()
.by_priority
.get_max()
.map(|(p, _)| p.clone())
}
pub fn min_priority(&self) -> Option<P> {
self
.data
.borrow()
.clone()
.by_priority
.get_min()
.map(|(p, _)| p.clone())
}
pub fn iter(&self) -> impl Iterator<Item = (P, K, V)> {
self.iter_asc()
}
pub fn iter_asc(&self) -> impl Iterator<Item = (P, K, V)> {
let snap = self.data.borrow().clone();
snap.by_priority.into_iter().flat_map(|(p, bucket)| {
bucket.into_iter().map(move |(k, v)| (p.clone(), k, v))
})
}
pub fn iter_desc(&self) -> impl Iterator<Item = (P, K, V)> {
let snap = self.data.borrow().clone();
let entries: std::vec::Vec<_> = snap
.by_priority
.into_iter()
.rev()
.flat_map(|(p, bucket)| {
bucket.into_iter().map(move |(k, v)| (p.clone(), k, v))
})
.collect();
entries.into_iter()
}
pub const fn when(&self) -> &When {
&self.when
}
pub fn version(&self) -> Version {
Version(self.group.committed())
}
pub fn group_id(&self) -> &GroupId {
self.group.id()
}
}
impl<P: OrderedKey, K: Key, V: Value, const CAP: u64>
  PriorityQueueWriter<P, K, V, CAP>
{
  /// Opens a writer handle for the queue identified by `store_id`, using the
  /// default [`CollectionConfig`].
  pub fn writer(network: &Network, store_id: impl Into<StoreId>) -> Self {
    Self::writer_with_config(network, store_id, CollectionConfig::default())
  }

  /// Opens a writer handle for the queue identified by `store_id` with an
  /// explicit configuration.
  pub fn writer_with_config(
    network: &Network,
    store_id: impl Into<StoreId>,
    config: impl Into<CollectionConfig>,
  ) -> Self {
    Self::create::<WRITER>(network, store_id, config.into())
  }

  /// Same as [`Self::writer`].
  pub fn new(network: &Network, store_id: impl Into<StoreId>) -> Self {
    Self::writer(network, store_id)
  }

  /// Same as [`Self::writer_with_config`].
  pub fn new_with_config(
    network: &Network,
    store_id: impl Into<StoreId>,
    config: impl Into<CollectionConfig>,
  ) -> Self {
    Self::writer_with_config(network, store_id, config)
  }

  /// Inserts `key` with `priority` and `value`, replacing any existing entry
  /// for `key`. If the queue then holds more than `CAP` entries, entries from
  /// the lowest-priority bucket are evicted.
  ///
  /// # Errors
  /// `Error::Offline` / `Error::Encoding` hand `(priority, key, value)` back
  /// to the caller; `Error::NetworkDown` if the group has terminated.
  pub fn insert(
    &self,
    priority: P,
    key: K,
    value: V,
  ) -> impl Future<Output = Result<Version, Error<(P, K, V)>>> + Send + Sync + 'static
  {
    self.execute(
      DepqCommand::Insert {
        priority: Encoded(priority),
        key: Encoded(key),
        value: Encoded(value),
      },
      |cmd| match cmd {
        DepqCommand::Insert {
          priority,
          key,
          value,
        } => Error::Offline((priority.0, key.0, value.0)),
        _ => unreachable!(),
      },
      |cmd, e| match cmd {
        DepqCommand::Insert {
          priority,
          key,
          value,
        } => Error::Encoding((priority.0, key.0, value.0), e),
        _ => unreachable!(),
      },
    )
  }

  /// Inserts every `(priority, key, value)` in `items` as a single command,
  /// applied in iteration order (a later duplicate of a key wins).
  ///
  /// An empty `items` resolves immediately to the current committed version;
  /// no command is built or handed to the group.
  ///
  /// # Errors
  /// As [`Self::insert`], with the whole batch handed back on failure.
  // The nested `Error<Vec<(P, K, V)>>` in the public return type trips
  // clippy's type-complexity lint.
  #[allow(clippy::type_complexity)]
  pub fn extend<I>(
    &self,
    items: I,
  ) -> impl Future<Output = Result<Version, Error<Vec<(P, K, V)>>>>
  + Send
  + Sync
  + 'static
  where
    I: IntoIterator<Item = (P, K, V)>,
  {
    let entries: Vec<(Encoded<P>, Encoded<K>, Encoded<V>)> = items
      .into_iter()
      .map(|(p, k, v)| (Encoded(p), Encoded(k), Encoded(v)))
      .collect();
    let current_version = self.group.committed();
    // Only build (and thereby submit) a command when there is something to
    // insert, instead of creating one and discarding its result.
    let pending = (!entries.is_empty()).then(|| {
      self.execute(
        DepqCommand::Extend { entries },
        |cmd| match cmd {
          DepqCommand::Extend { entries } => Error::Offline(
            entries
              .into_iter()
              .map(|(p, k, v)| (p.0, k.0, v.0))
              .collect::<Vec<_>>(),
          ),
          _ => unreachable!(),
        },
        |cmd, e| match cmd {
          DepqCommand::Extend { entries } => Error::Encoding(
            entries
              .into_iter()
              .map(|(p, k, v)| (p.0, k.0, v.0))
              .collect::<Vec<_>>(),
            e,
          ),
          _ => unreachable!(),
        },
      )
    });
    async move {
      match pending {
        Some(fut) => fut.await,
        None => Ok(Version(current_version)),
      }
    }
  }

  /// Moves `key` to `new_priority`, keeping its value. The command has no
  /// effect if `key` is absent when it is applied.
  ///
  /// # Errors
  /// `Error::Offline` / `Error::Encoding` hand `key` back;
  /// `Error::NetworkDown` if the group has terminated.
  pub fn update_priority(
    &self,
    key: &K,
    new_priority: P,
  ) -> impl Future<Output = Result<Version, Error<K>>> + Send + Sync + 'static
  {
    let key = key.clone();
    self.execute(
      DepqCommand::UpdatePriority {
        key: Encoded(key),
        priority: Encoded(new_priority),
      },
      |cmd| match cmd {
        DepqCommand::UpdatePriority { key, .. } => Error::Offline(key.0),
        _ => unreachable!(),
      },
      |cmd, e| match cmd {
        DepqCommand::UpdatePriority { key, .. } => Error::Encoding(key.0, e),
        _ => unreachable!(),
      },
    )
  }

  /// Replaces the value stored under `key`, keeping its priority. The command
  /// has no effect if `key` is absent when it is applied.
  ///
  /// # Errors
  /// `Error::Offline` / `Error::Encoding` hand `key` back;
  /// `Error::NetworkDown` if the group has terminated.
  pub fn update_value(
    &self,
    key: &K,
    new_value: V,
  ) -> impl Future<Output = Result<Version, Error<K>>> + Send + Sync + 'static
  {
    let key = key.clone();
    self.execute(
      DepqCommand::UpdateValue {
        key: Encoded(key),
        value: Encoded(new_value),
      },
      |cmd| match cmd {
        DepqCommand::UpdateValue { key, .. } => Error::Offline(key.0),
        _ => unreachable!(),
      },
      |cmd, e| match cmd {
        DepqCommand::UpdateValue { key, .. } => Error::Encoding(key.0, e),
        _ => unreachable!(),
      },
    )
  }

  /// If the value under `key` (compared by encoded form) equals `expected`,
  /// replaces it with `new` (keeping the priority), or removes the entry when
  /// `new` is `None`. Otherwise the command has no effect.
  ///
  /// # Errors
  /// `Error::Offline` / `Error::Encoding` hand `key` back;
  /// `Error::NetworkDown` if the group has terminated.
  pub fn compare_exchange_value(
    &self,
    key: &K,
    expected: V,
    new: Option<V>,
  ) -> impl Future<Output = Result<Version, Error<K>>> + Send + Sync + 'static
  {
    let key = key.clone();
    self.execute(
      DepqCommand::CompareExchangeValue {
        key: Encoded(key),
        expected: Encoded(expected),
        new: new.map(Encoded),
      },
      |cmd| match cmd {
        DepqCommand::CompareExchangeValue { key, .. } => Error::Offline(key.0),
        _ => unreachable!(),
      },
      |cmd, e| match cmd {
        DepqCommand::CompareExchangeValue { key, .. } => {
          Error::Encoding(key.0, e)
        }
        _ => unreachable!(),
      },
    )
  }

  /// Removes every entry.
  ///
  /// # Errors
  /// `Error::Offline(())`, `Error::Encoding((), _)`, or `Error::NetworkDown`.
  pub fn clear(
    &self,
  ) -> impl Future<Output = Result<Version, Error<()>>> + Send + Sync + 'static
  {
    self.execute(
      DepqCommand::Clear,
      |_| Error::Offline(()),
      // Report an encoding failure (as `remove_range` does) rather than
      // panicking inside the caller's future.
      |_, e| Error::Encoding((), e),
    )
  }

  /// Removes the entry for `key`, if present when the command is applied.
  ///
  /// # Errors
  /// `Error::Offline` / `Error::Encoding` hand `key` back;
  /// `Error::NetworkDown` if the group has terminated.
  pub fn remove(
    &self,
    key: &K,
  ) -> impl Future<Output = Result<Version, Error<K>>> + Send + Sync + 'static
  {
    let key = key.clone();
    self.execute(
      DepqCommand::RemoveKeys {
        keys: vec![Encoded(key)],
      },
      |cmd| match cmd {
        DepqCommand::RemoveKeys { mut keys } => {
          Error::Offline(keys.remove(0).0)
        }
        _ => unreachable!(),
      },
      |cmd, e| match cmd {
        DepqCommand::RemoveKeys { mut keys } => {
          Error::Encoding(keys.remove(0).0, e)
        }
        _ => unreachable!(),
      },
    )
  }

  /// Removes every key in `keys` as a single command; absent keys are
  /// ignored.
  ///
  /// An empty `keys` resolves immediately to the current committed version;
  /// no command is built or handed to the group.
  ///
  /// # Errors
  /// `Error::Offline` / `Error::Encoding` hand the keys back;
  /// `Error::NetworkDown` if the group has terminated.
  pub fn remove_keys(
    &self,
    keys: impl IntoIterator<Item = K>,
  ) -> impl Future<Output = Result<Version, Error<Vec<K>>>> + Send + Sync + 'static
  {
    let keys: Vec<Encoded<K>> = keys.into_iter().map(Encoded).collect();
    let current_version = self.group.committed();
    // Only build (and thereby submit) a command when there is something to
    // remove.
    let pending = (!keys.is_empty()).then(|| {
      self.execute(
        DepqCommand::RemoveKeys { keys },
        |cmd| match cmd {
          DepqCommand::RemoveKeys { keys } => Error::Offline(
            keys.into_iter().map(|k| k.0).collect::<Vec<_>>(),
          ),
          _ => unreachable!(),
        },
        |cmd, e| match cmd {
          DepqCommand::RemoveKeys { keys } => Error::Encoding(
            keys.into_iter().map(|k| k.0).collect::<Vec<_>>(),
            e,
          ),
          _ => unreachable!(),
        },
      )
    });
    async move {
      match pending {
        Some(fut) => fut.await,
        None => Ok(Version(current_version)),
      }
    }
  }

  /// Removes every entry whose priority falls within `range`.
  ///
  /// # Errors
  /// `Error::Offline(())`, `Error::Encoding((), _)`, or `Error::NetworkDown`.
  pub fn remove_range(
    &self,
    range: impl RangeBounds<P>,
  ) -> impl Future<Output = Result<Version, Error<()>>> + Send + Sync + 'static
  {
    let start =
      SerBound::from_std(range.start_bound().map(|r| Encoded(r.clone())));
    let end = SerBound::from_std(range.end_bound().map(|r| Encoded(r.clone())));
    self.execute(
      DepqCommand::RemoveRange { start, end },
      |_| Error::Offline(()),
      |_, e| Error::Encoding((), e),
    )
  }
}
impl<P: OrderedKey, K: Key, V: Value, const CAP: u64, const IS_WRITER: bool>
  PriorityQueue<P, K, V, CAP, IS_WRITER>
{
  /// Opens a read-only handle for the queue identified by `store_id`, using
  /// the default [`CollectionConfig`].
  pub fn reader(
    network: &Network,
    store_id: impl Into<StoreId>,
  ) -> PriorityQueueReader<P, K, V, CAP> {
    Self::create::<READER>(network, store_id, CollectionConfig::default())
  }

  /// Opens a read-only handle for the queue identified by `store_id` with an
  /// explicit configuration.
  pub fn reader_with_config(
    network: &Network,
    store_id: impl Into<StoreId>,
    config: impl Into<CollectionConfig>,
  ) -> PriorityQueueReader<P, K, V, CAP> {
    Self::create::<READER>(network, store_id, config.into())
  }

  /// Builds the state machine, joins its replication group, and wraps both
  /// in a handle whose writer flag is `W`.
  ///
  /// Every validator in `config.auth` is registered via `require_ticket`
  /// before the group is joined.
  fn create<const W: bool>(
    network: &Network,
    store_id: impl Into<StoreId>,
    config: CollectionConfig,
  ) -> PriorityQueue<P, K, V, CAP, W> {
    let store_id: StoreId = store_id.into();
    let local_id = network.local().id();
    let machine =
      DepqStateMachine::<P, K, V, CAP>::new(store_id, W, config.sync, local_id);
    // Subscribe to published snapshots before `machine` moves into the group.
    let data = machine.data();
    let base = network
      .groups()
      .with_key(store_id)
      .with_state_machine(machine);
    let group = config
      .auth
      .into_iter()
      .fold(base, |builder, validator| builder.require_ticket(validator))
      .join();
    let when = When::new(group.when().clone());
    PriorityQueue { when, group, data }
  }
}
// The writer-flag parameter is named `IS_WRITER`, as in the other impls,
// rather than `WRITER`, which would shadow the imported `WRITER` constant.
impl<P: OrderedKey, K: Key, V: Value, const CAP: u64, const IS_WRITER: bool>
  CollectionFromDef for PriorityQueue<P, K, V, CAP, IS_WRITER>
{
  type Reader = PriorityQueueReader<P, K, V, CAP>;
  type Writer = PriorityQueueWriter<P, K, V, CAP>;

  /// Opens a reader handle.
  ///
  /// `Self::Reader::reader_with_config` resolves to the inherent associated
  /// function (inherent items take precedence over trait items), so this
  /// does not recurse into itself.
  fn reader_with_config(
    network: &crate::Network,
    store_id: StoreId,
    config: CollectionConfig,
  ) -> Self::Reader {
    Self::Reader::reader_with_config(network, store_id, config)
  }

  /// Opens a writer handle via the inherent
  /// `PriorityQueueWriter::writer_with_config`.
  fn writer_with_config(
    network: &crate::Network,
    store_id: StoreId,
    config: CollectionConfig,
  ) -> Self::Writer {
    Self::Writer::writer_with_config(network, store_id, config)
  }
}
impl<P: OrderedKey, K: Key, V: Value, const CAP: u64>
  PriorityQueueWriter<P, K, V, CAP>
{
  /// Submits `command` to the replication group and resolves to a
  /// [`Version`] wrapping the position the group reports for it.
  ///
  /// On failure the rejected command is handed to `offline_err` or
  /// `encoding_err`, so each public method can return its own payload to the
  /// caller. A terminated group becomes `Error::NetworkDown`.
  fn execute<TErr>(
    &self,
    command: DepqCommand<P, K, V>,
    offline_err: impl FnOnce(DepqCommand<P, K, V>) -> Error<TErr>
    + Send
    + Sync
    + 'static,
    encoding_err: impl FnOnce(DepqCommand<P, K, V>, EncodeError) -> Error<TErr>
    + Send
    + Sync
    + 'static,
  ) -> impl Future<Output = Result<Version, Error<TErr>>> + Send + Sync + 'static
  {
    let submitted = self.group.execute(command);
    submitted
      .map_err(move |failure| match failure {
        // A single command was submitted, so the first rejected item is it.
        CommandError::Offline(mut rejected) => offline_err(rejected.remove(0)),
        CommandError::Encoding(mut rejected, cause) => {
          encoding_err(rejected.remove(0), cause)
        }
        CommandError::GroupTerminated => Error::NetworkDown,
        CommandError::NoCommands => unreachable!(),
      })
      .map(|committed| committed.map(Version))
  }
}
/// State machine replicated by the group behind a `PriorityQueue`.
///
/// Applies `DepqCommand`s to `data` and republishes the result through
/// `latest`, so handles can read the current contents without going through
/// the group.
struct DepqStateMachine<
P: OrderedKey,
K: Key,
V: Value,
const CAP: u64 = { u64::MAX },
> {
/// Current queue contents, mutated by `apply_batch` / `install_snapshot`.
data: PriorityQueueSnapshot<P, K, V>,
/// Publishes a copy of `data` to every receiver obtained from `data()`.
latest: watch::Sender<PriorityQueueSnapshot<P, K, V>>,
/// Store identifier; mixed into the machine's signature.
store_id: StoreId,
/// This peer's id; snapshot requests issued by this peer are not served
/// locally.
local_id: PeerId,
/// Snapshot-based state sync (presumably used to bring lagging peers up to
/// date — confirm against `SnapshotSync`).
state_sync: SnapshotSync<Self>,
/// Whether this replica was created as a writer; selects the leadership
/// preference reported to the group.
is_writer: bool,
}
impl<P: OrderedKey, K: Key, V: Value, const CAP: u64>
  DepqStateMachine<P, K, V, CAP>
{
  /// Creates an empty state machine for `store_id`.
  ///
  /// `is_writer` selects the leadership preference reported to the group;
  /// `local_id` lets the machine skip snapshot requests it issued itself.
  pub fn new(
    store_id: StoreId,
    is_writer: bool,
    sync_config: SyncConfig,
    local_id: PeerId,
  ) -> Self {
    let data = PriorityQueueSnapshot::default();
    let state_sync = SnapshotSync::new(sync_config, |request| {
      DepqCommand::TakeSnapshot(request)
    });
    let latest = watch::Sender::new(data.clone());
    Self {
      data,
      latest,
      store_id,
      local_id,
      state_sync,
      is_writer,
    }
  }

  /// Subscribes to the contents published after each applied batch or
  /// installed snapshot.
  pub fn data(&self) -> watch::Receiver<PriorityQueueSnapshot<P, K, V>> {
    self.latest.subscribe()
  }

  /// Inserts or replaces the entry for `key`, then evicts lowest-priority
  /// entries while the queue holds more than `CAP` of them.
  fn apply_insert(&mut self, priority: P, key: K, value: V) {
    // Detach any existing entry first so `key` never sits in two buckets.
    self.apply_remove(&key);
    self
      .data
      .by_key
      .insert(key.clone(), (priority.clone(), value.clone()));
    let mut bucket = self
      .data
      .by_priority
      .get(&priority)
      .cloned()
      .unwrap_or_default();
    bucket.insert(key, value);
    self.data.by_priority.insert(priority, bucket);
    self.evict_over_capacity();
  }

  /// Removes the entry for `key` from both indexes; absent keys are ignored.
  fn apply_remove(&mut self, key: &K) {
    if let Some((priority, _)) = self.data.by_key.remove(key) {
      self.detach_from_bucket(priority, key);
    }
  }

  /// Removes `key` from the bucket at `priority`, dropping the bucket once it
  /// becomes empty.
  fn detach_from_bucket(&mut self, priority: P, key: &K) {
    let Some(bucket) = self.data.by_priority.get(&priority) else {
      return;
    };
    let mut bucket = bucket.clone();
    bucket.remove(key);
    if bucket.is_empty() {
      self.data.by_priority.remove(&priority);
    } else {
      self.data.by_priority.insert(priority, bucket);
    }
  }

  /// Evicts one entry at a time from the lowest-priority bucket until at most
  /// `CAP` entries remain.
  fn evict_over_capacity(&mut self) {
    while self.data.by_key.len() as u64 > CAP {
      // Copy out only the victim's key and priority, not the whole bucket.
      let Some((min_p, victim)) =
        self.data.by_priority.get_min().and_then(|(p, bucket)| {
          bucket.iter().next().map(|(k, _)| (p.clone(), k.clone()))
        })
      else {
        break;
      };
      let indexed_here = self
        .data
        .by_key
        .get(&victim)
        .is_some_and(|(p, _)| *p == min_p);
      if indexed_here {
        self.apply_remove(&victim);
      } else {
        // A stale bucket entry (not indexed under this priority in `by_key`)
        // would otherwise be picked again forever; drop just that entry so
        // the loop keeps making progress.
        self.detach_from_bucket(min_p, &victim);
      }
    }
  }
}
impl<P: OrderedKey, K: Key, V: Value, const CAP: u64> StateMachine
for DepqStateMachine<P, K, V, CAP>
{
type Command = DepqCommand<P, K, V>;
type Query = ();
type QueryResult = ();
type StateSync = SnapshotSync<Self>;
fn apply(&mut self, command: Self::Command, ctx: &dyn ApplyContext) {
self.apply_batch([command], ctx);
}
fn apply_batch(
&mut self,
commands: impl IntoIterator<Item = Self::Command>,
ctx: &dyn ApplyContext,
) {
let mut commands_len = 0usize;
let mut sync_requests = vec![];
for command in commands {
match command {
DepqCommand::Clear => {
self.data = PriorityQueueSnapshot::default();
}
DepqCommand::Insert {
priority,
key,
value,
} => {
self.apply_insert(priority.0, key.0, value.0);
}
DepqCommand::CompareExchangeValue { key, expected, new } => {
if let Some((p, old_v)) = self.data.by_key.get(&key) {
if old_v.encode().ok() != expected.0.encode().ok() {
continue;
}
let p = p.clone();
if let Some(new) = new {
self.apply_insert(p, key.0, new.0);
} else {
self.apply_remove(&key.0);
}
}
}
DepqCommand::Extend { entries } => {
for (priority, key, value) in entries {
self.apply_insert(priority.0, key.0, value.0);
}
}
DepqCommand::UpdatePriority { key, priority } => {
if let Some((_, old_v)) = self.data.by_key.get(&key) {
let old_v = old_v.clone();
self.apply_insert(priority.0, key.0, old_v);
}
}
DepqCommand::UpdateValue { key, value } => {
if let Some((p, _)) = self.data.by_key.get(&key) {
let p = p.clone();
self.apply_insert(p, key.0, value.0);
}
}
DepqCommand::RemoveKeys { keys } => {
for key in keys {
self.apply_remove(&key.0);
}
}
DepqCommand::RemoveRange { start, end } => {
let range = (
start.to_std().map(|r| r.0.clone()),
end.to_std().map(|r| r.0.clone()),
);
let keys_to_remove: std::vec::Vec<K> = self
.data
.by_key
.iter()
.filter(|(_, (p, _))| range.contains(p))
.map(|(k, _)| k.clone())
.collect();
for key in keys_to_remove {
self.apply_remove(&key);
}
}
DepqCommand::TakeSnapshot(request) => {
if request.requested_by != self.local_id
&& !self.state_sync.is_expired(&request)
{
sync_requests.push(request);
}
}
}
commands_len += 1;
}
self.latest.send_replace(self.data.clone());
if !sync_requests.is_empty() {
let snapshot = self.create_snapshot();
let position = Cursor::new(
ctx.current_term(),
ctx.committed().index() + commands_len as u64,
);
for request in sync_requests {
self
.state_sync
.serve_snapshot(request, position, snapshot.clone());
}
}
}
fn signature(&self) -> crate::UniqueId {
UniqueId::from("mosaik_collections_depq")
.derive(self.store_id)
.derive(type_name::<P>())
.derive(type_name::<K>())
.derive(type_name::<V>())
.derive(CAP.to_le_bytes())
}
fn query(&self, (): Self::Query) {}
fn state_sync(&self) -> Self::StateSync {
self.state_sync.clone()
}
fn leadership_preference(&self) -> LeadershipPreference {
if self.is_writer {
LeadershipPreference::Normal
} else {
LeadershipPreference::Observer
}
}
}
impl<P: OrderedKey, K: Key, V: Value, const CAP: u64> SnapshotStateMachine
  for DepqStateMachine<P, K, V, CAP>
{
  type Snapshot = PriorityQueueSnapshot<P, K, V>;

  /// Captures the current contents.
  ///
  /// Both indexes are `im` persistent maps, so this copy shares structure
  /// with the live state rather than deep-copying it (per the `im` docs).
  fn create_snapshot(&self) -> Self::Snapshot {
    self.data.clone()
  }

  /// Replaces the current contents with `snapshot` and publishes it to
  /// subscribers.
  fn install_snapshot(&mut self, snapshot: Self::Snapshot) {
    let published = snapshot.clone();
    self.data = snapshot;
    self.latest.send_replace(published);
  }
}
/// Replicated commands applied by `DepqStateMachine::apply_batch`.
///
/// NOTE(review): with index-based serde formats the variant order is part of
/// the wire encoding; avoid reordering variants without confirming the
/// encoding the group uses.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound = "P: OrderedKey, K: Key, V: Value")]
enum DepqCommand<P, K, V> {
/// Removes every entry.
Clear,
/// Inserts an entry, replacing any existing entry with the same key.
Insert {
priority: Encoded<P>,
key: Encoded<K>,
value: Encoded<V>,
},
/// Inserts each entry in order, as repeated `Insert`s.
Extend {
entries: Vec<(Encoded<P>, Encoded<K>, Encoded<V>)>,
},
/// Moves an existing key to a new priority, keeping its value; ignored if
/// the key is absent.
UpdatePriority {
key: Encoded<K>,
priority: Encoded<P>,
},
/// Replaces an existing key's value, keeping its priority; ignored if the
/// key is absent.
UpdateValue {
key: Encoded<K>,
value: Encoded<V>,
},
/// If the current value's encoding equals `expected`'s, replaces it with
/// `new` or removes the entry when `new` is `None`.
CompareExchangeValue {
key: Encoded<K>,
expected: Encoded<V>,
new: Option<Encoded<V>>,
},
/// Removes each listed key; absent keys are ignored.
RemoveKeys {
keys: Vec<Encoded<K>>,
},
/// Removes every entry whose priority lies between `start` and `end`.
RemoveRange {
start: SerBound<Encoded<P>>,
end: SerBound<Encoded<P>>,
},
/// Snapshot request; served by peers other than the requester.
TakeSnapshot(SnapshotRequest),
}
/// Serializable mirror of `core::ops::Bound`, used to carry the endpoints of
/// `DepqCommand::RemoveRange`.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum SerBound<T> {
/// Inclusive endpoint.
Included(T),
/// Exclusive endpoint.
Excluded(T),
/// No endpoint on this side.
Unbounded,
}
impl<T: Clone> SerBound<T> {
  /// Converts a standard-library bound into its serializable form, taking
  /// ownership of the endpoint.
  fn from_std(b: core::ops::Bound<T>) -> Self {
    use core::ops::Bound as Std;
    match b {
      Std::Unbounded => Self::Unbounded,
      Std::Excluded(endpoint) => Self::Excluded(endpoint),
      Std::Included(endpoint) => Self::Included(endpoint),
    }
  }

  /// Borrows this bound as a standard-library `Bound<&T>`.
  const fn to_std(&self) -> core::ops::Bound<&T> {
    use core::ops::Bound as Std;
    match *self {
      Self::Unbounded => Std::Unbounded,
      Self::Excluded(ref endpoint) => Std::Excluded(endpoint),
      Self::Included(ref endpoint) => Std::Included(endpoint),
    }
  }
}
/// Queue contents, held as two indexes that the mutating code keeps
/// describing the same set of entries.
#[derive(Clone)]
pub struct PriorityQueueSnapshot<P: OrderedKey, K: Key, V: Value> {
/// Primary index: key -> (priority, value).
by_key: HashMap<K, (P, V)>,
/// Secondary index ordered by priority; each bucket maps the keys sharing
/// that priority to their values.
by_priority: im::OrdMap<P, HashMap<K, V>>,
}
// Written by hand: `#[derive(Default)]` would add `P: Default`, `K: Default`
// and `V: Default` bounds that an empty queue does not need.
impl<P: OrderedKey, K: Key, V: Value> Default
  for PriorityQueueSnapshot<P, K, V>
{
  /// Returns a snapshot with no entries in either index.
  fn default() -> Self {
    let by_priority = im::OrdMap::new();
    let by_key = HashMap::default();
    Self {
      by_key,
      by_priority,
    }
  }
}
impl<P: OrderedKey, K: Key, V: Value> Snapshot
for PriorityQueueSnapshot<P, K, V>
{
type Item = (Encoded<P>, Encoded<K>, Encoded<V>);
fn len(&self) -> u64 {
self.by_key.len() as u64
}
fn iter_range(
&self,
range: Range<u64>,
) -> Option<impl Iterator<Item = Self::Item>> {
if range.end > self.by_key.len() as u64 {
return None;
}
Some(
self
.by_key
.clone()
.into_iter()
.map(|(k, (p, v))| (Encoded(p), Encoded(k), Encoded(v)))
.skip(range.start as usize)
.take((range.end - range.start) as usize),
)
}
fn append(&mut self, items: impl IntoIterator<Item = Self::Item>) {
for (priority, key, value) in items {
let (priority, key, value) = (priority.0, key.0, value.0);
self
.by_key
.insert(key.clone(), (priority.clone(), value.clone()));
let mut bucket =
self.by_priority.get(&priority).cloned().unwrap_or_default();
bucket.insert(key, value);
self.by_priority.insert(priority, bucket);
}
}
}