use std::collections::{btree_map::Entry, BTreeMap, BTreeSet};
/// Materialization state of a single key, as reported by [`MaterializedKeys::status`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeyStatus {
/// The key is in the materialized set.
Materialized,
/// The key is in the pending set: a lookup missed on it and it has not yet
/// been marked materialized or failed.
Pending,
/// The key is in neither the materialized nor the pending set.
Missing,
}
/// Result of [`MaterializedKeys::lookup`]; every variant hands the looked-up key back.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LookupOutcome<K>
where
K: Ord + Clone,
{
/// The key was already materialized.
Hit(K),
/// The key was neither materialized nor pending; the lookup recorded it as
/// pending with a waiter count of one.
/// NOTE(review): presumably the caller is expected to issue the upquery —
/// nothing in this file does so; confirm against callers.
Upquery(K),
/// The key was already pending; the lookup incremented its waiter count.
AlreadyPending(K),
}
impl<K> LookupOutcome<K>
where
    K: Ord + Clone,
{
    /// Borrows the key carried by this outcome, regardless of which variant it is.
    #[must_use]
    pub const fn key(&self) -> &K {
        // Every variant wraps exactly one key, so the or-pattern is irrefutable.
        let (Self::Hit(inner) | Self::Upquery(inner) | Self::AlreadyPending(inner)) = self;
        inner
    }
}
/// Tracks which keys are materialized and which are pending, together with a
/// per-key waiter count for the pending ones.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MaterializedKeys<K>
where
K: Ord + Clone,
{
/// Keys added by `mark_materialized`; removed again by `forget`.
materialized: BTreeSet<K>,
/// Keys that missed in `lookup` and have not yet been marked materialized
/// or failed.
pending: BTreeSet<K>,
/// Number of `lookup` calls recorded for each pending key; the first miss
/// counts as one.
pending_waiters: BTreeMap<K, usize>,
}
impl<K> Default for MaterializedKeys<K>
where
K: Ord + Clone,
{
fn default() -> Self {
Self::new()
}
}
impl<K> MaterializedKeys<K>
where
    K: Ord + Clone,
{
    /// Creates a tracker with no materialized and no pending keys.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            materialized: BTreeSet::new(),
            pending: BTreeSet::new(),
            pending_waiters: BTreeMap::new(),
        }
    }

    /// Reports whether `key` is materialized, pending, or neither.
    ///
    /// The materialized set is consulted first, then the pending set.
    #[must_use]
    pub fn status(&self, key: &K) -> KeyStatus {
        if self.materialized.contains(key) {
            KeyStatus::Materialized
        } else if self.pending.contains(key) {
            KeyStatus::Pending
        } else {
            KeyStatus::Missing
        }
    }

    /// Looks `key` up and records a miss as pending.
    ///
    /// * Materialized key: returns [`LookupOutcome::Hit`]; no state changes.
    /// * Pending key: increments its waiter count and returns
    ///   [`LookupOutcome::AlreadyPending`].
    /// * Unknown key: marks it pending with a waiter count of one and returns
    ///   [`LookupOutcome::Upquery`].
    ///
    /// The key is handed back inside the outcome in every case.
    pub fn lookup(&mut self, key: K) -> LookupOutcome<K> {
        if self.materialized.contains(&key) {
            return LookupOutcome::Hit(key);
        }
        // `pending` and `pending_waiters` are always updated together, so a
        // waiter entry exists exactly when the key is pending. Looking it up
        // with `get_mut` avoids a second tree walk and a needless key clone.
        if let Some(waiters) = self.pending_waiters.get_mut(&key) {
            *waiters += 1;
            return LookupOutcome::AlreadyPending(key);
        }
        // Two clones are unavoidable here: both collections store the key and
        // the caller gets the original back.
        self.pending.insert(key.clone());
        self.pending_waiters.insert(key.clone(), 1);
        LookupOutcome::Upquery(key)
    }

    /// Marks `key` as materialized and clears any pending state for it.
    ///
    /// Returns the waiter count that had been recorded for the key, or `0`
    /// when it was not pending.
    pub fn mark_materialized(&mut self, key: K) -> usize {
        let waiters = self.pending_waiters.remove(&key).unwrap_or(0);
        self.pending.remove(&key);
        self.materialized.insert(key);
        waiters
    }

    /// Marks every key in `keys` as materialized.
    ///
    /// Returns the sum of the waiter counts reported by
    /// [`mark_materialized`](Self::mark_materialized) for each key.
    pub fn mark_many_materialized<I>(&mut self, keys: I) -> usize
    where
        I: IntoIterator<Item = K>,
    {
        keys.into_iter()
            .map(|key| self.mark_materialized(key))
            .sum()
    }

    /// Removes `key` from the materialized set.
    ///
    /// Returns `true` when the key was materialized. Pending state is left
    /// untouched.
    pub fn forget(&mut self, key: &K) -> bool {
        self.materialized.remove(key)
    }

    /// Drops the pending state of `key` without materializing it.
    ///
    /// Returns the waiter count that had been recorded, or `0` when the key
    /// was not pending. The materialized set is left untouched.
    pub fn fail_pending(&mut self, key: &K) -> usize {
        let waiters = self.pending_waiters.remove(key).unwrap_or(0);
        self.pending.remove(key);
        waiters
    }

    /// Borrows the set of materialized keys.
    #[must_use]
    pub const fn materialized(&self) -> &BTreeSet<K> {
        &self.materialized
    }

    /// Borrows the set of pending keys.
    #[must_use]
    pub const fn pending(&self) -> &BTreeSet<K> {
        &self.pending
    }

    /// Number of materialized keys.
    #[must_use]
    pub fn materialized_len(&self) -> usize {
        self.materialized.len()
    }

    /// Number of pending keys.
    #[must_use]
    pub fn pending_len(&self) -> usize {
        self.pending.len()
    }

    /// Returns `true` when there are neither materialized nor pending keys.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.materialized.is_empty() && self.pending.is_empty()
    }
}
/// Keys from one [`MaterializedKeys::lookup_batch`] call, grouped by lookup outcome.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchLookup<K>
where
K: Ord + Clone,
{
/// Keys whose lookup returned [`LookupOutcome::Hit`].
pub hits: Vec<K>,
/// Keys whose lookup returned [`LookupOutcome::Upquery`].
pub upqueries: Vec<K>,
/// Keys whose lookup returned [`LookupOutcome::AlreadyPending`].
pub already_pending: Vec<K>,
}
impl<K> MaterializedKeys<K>
where
    K: Ord + Clone,
{
    /// Runs [`lookup`](Self::lookup) on each key in iteration order and sorts
    /// the keys into buckets by outcome.
    ///
    /// Lookups take effect one at a time, so a key that appears twice in the
    /// same batch and was previously unknown lands in `upqueries` the first
    /// time and in `already_pending` the second.
    pub fn lookup_batch<I>(&mut self, keys: I) -> BatchLookup<K>
    where
        I: IntoIterator<Item = K>,
    {
        let mut hits = Vec::new();
        let mut upqueries = Vec::new();
        let mut already_pending = Vec::new();

        keys.into_iter()
            .map(|candidate| self.lookup(candidate))
            .for_each(|outcome| match outcome {
                LookupOutcome::Hit(found) => hits.push(found),
                LookupOutcome::Upquery(requested) => upqueries.push(requested),
                LookupOutcome::AlreadyPending(queued) => already_pending.push(queued),
            });

        BatchLookup {
            hits,
            upqueries,
            already_pending,
        }
    }
}
/// Registry of per-arrangement key trackers, indexed by arrangement name.
///
/// `Default` is implemented by hand rather than derived: the derive would add
/// a `K: Default` bound, which an empty map never needs and which would make
/// `default()` unavailable for key types that have no default value.
#[derive(Debug, Clone)]
pub struct MaterializationTracker<K>
where
    K: Ord + Clone,
{
    /// One [`MaterializedKeys`] per registered arrangement name.
    arrangements: BTreeMap<String, MaterializedKeys<K>>,
}

impl<K> Default for MaterializationTracker<K>
where
    K: Ord + Clone,
{
    /// Returns a tracker with no registered arrangements.
    fn default() -> Self {
        Self {
            arrangements: BTreeMap::new(),
        }
    }
}
impl<K> MaterializationTracker<K>
where
    K: Ord + Clone,
{
    /// Creates a tracker with no registered arrangements.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            arrangements: BTreeMap::new(),
        }
    }

    /// Starts tracking a new arrangement under `name` with an empty key set.
    ///
    /// # Errors
    ///
    /// Returns [`ArrangementAlreadyTracked`] carrying the name when an
    /// arrangement with that name is already registered; the existing entry
    /// is left as it was.
    pub fn register(&mut self, name: impl Into<String>) -> Result<(), ArrangementAlreadyTracked> {
        match self.arrangements.entry(name.into()) {
            Entry::Occupied(existing) => Err(ArrangementAlreadyTracked {
                name: existing.key().clone(),
            }),
            Entry::Vacant(vacancy) => {
                vacancy.insert(MaterializedKeys::new());
                Ok(())
            }
        }
    }

    /// Mutably borrows the key tracker registered under `name`, if any.
    #[must_use]
    pub fn arrangement_mut(&mut self, name: &str) -> Option<&mut MaterializedKeys<K>> {
        self.arrangements.get_mut(name)
    }

    /// Borrows the key tracker registered under `name`, if any.
    #[must_use]
    pub fn arrangement(&self, name: &str) -> Option<&MaterializedKeys<K>> {
        self.arrangements.get(name)
    }

    /// Number of registered arrangements.
    #[must_use]
    pub fn len(&self) -> usize {
        self.arrangements.len()
    }

    /// Returns `true` when no arrangement is registered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.arrangements.is_empty()
    }

    /// Stops tracking `name`, returning its key tracker if it was registered.
    pub fn forget_arrangement(&mut self, name: &str) -> Option<MaterializedKeys<K>> {
        self.arrangements.remove(name)
    }
}
/// Error returned by [`MaterializationTracker::register`] when the requested
/// name is already registered.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArrangementAlreadyTracked {
/// Name of the arrangement that was already present.
pub name: String,
}
impl std::fmt::Display for ArrangementAlreadyTracked {
    /// Writes a one-line, human-readable message that quotes the arrangement name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "arrangement '{}' is already tracked",
            self.name
        ))
    }
}
// Empty impl: no `Error` method is overridden, so the trait's provided
// defaults apply. The required `Debug` and `Display` come from the derive and
// the `Display` impl elsewhere in this file.
impl std::error::Error for ArrangementAlreadyTracked {}
#[cfg(test)]
mod tests {
    use super::{
        ArrangementAlreadyTracked, BatchLookup, KeyStatus, LookupOutcome, MaterializationTracker,
        MaterializedKeys,
    };

    /// A first miss makes the key pending and asks for an upquery; a repeat
    /// miss only reports that the key is already pending.
    #[test]
    fn lookup_promotes_missing_keys_to_pending_and_returns_upquery() {
        let mut tracked = MaterializedKeys::<u64>::new();

        let first = tracked.lookup(7);
        assert_eq!(first, LookupOutcome::Upquery(7));
        assert_eq!(tracked.status(&7), KeyStatus::Pending);

        let second = tracked.lookup(7);
        assert_eq!(second, LookupOutcome::AlreadyPending(7));
        assert_eq!(tracked.pending_len(), 1);
    }

    /// Materializing a key reports every lookup that was waiting on it.
    #[test]
    fn mark_materialized_returns_waiter_count_and_clears_pending() {
        let mut tracked = MaterializedKeys::<u64>::new();
        for _ in 0..2 {
            let _ = tracked.lookup(7);
        }

        let released = tracked.mark_materialized(7);

        assert_eq!(released, 2);
        assert_eq!(tracked.status(&7), KeyStatus::Materialized);
        assert_eq!(tracked.pending_len(), 0);
    }

    /// Failing a pending key reports its waiters and leaves the key unknown.
    #[test]
    fn fail_pending_resets_state_and_reports_waiters() {
        let mut tracked = MaterializedKeys::<u64>::new();
        for _ in 0..2 {
            let _ = tracked.lookup(7);
        }

        let abandoned = tracked.fail_pending(&7);

        assert_eq!(abandoned, 2);
        assert_eq!(tracked.status(&7), KeyStatus::Missing);
    }

    /// Forgetting a materialized key makes it unknown again.
    #[test]
    fn forget_drops_materialized_keys() {
        let mut tracked = MaterializedKeys::<u64>::new();
        let _ = tracked.lookup(7);
        tracked.mark_materialized(7);

        let was_present = tracked.forget(&7);

        assert!(was_present);
        assert_eq!(tracked.status(&7), KeyStatus::Missing);
    }

    /// One materialized, one pending and one unknown key each land in their
    /// own bucket.
    #[test]
    fn lookup_batch_splits_by_outcome() {
        let mut tracked = MaterializedKeys::<u64>::new();
        let _ = tracked.lookup(1);
        tracked.mark_materialized(1);
        let _ = tracked.lookup(2);

        let batch: BatchLookup<u64> = tracked.lookup_batch([1_u64, 2, 3]);

        assert_eq!(batch.hits, [1]);
        assert_eq!(batch.already_pending, [2]);
        assert_eq!(batch.upqueries, [3]);
    }

    /// Bulk materialization sums waiter counts; keys that were never looked
    /// up contribute nothing.
    #[test]
    fn bulk_mark_returns_aggregate_waiter_count() {
        let mut tracked = MaterializedKeys::<u64>::new();
        for key in [1, 1, 2] {
            let _ = tracked.lookup(key);
        }

        let released = tracked.mark_many_materialized([1, 2, 3]);

        assert_eq!(released, 3);
    }

    /// State changed through `arrangement_mut` is visible through `arrangement`.
    #[test]
    fn tracker_routes_lookups_per_arrangement() {
        let mut registry = MaterializationTracker::<u64>::new();
        registry.register("posts-by-author").unwrap();

        let outcome = registry
            .arrangement_mut("posts-by-author")
            .unwrap()
            .lookup(7);
        assert_eq!(outcome, LookupOutcome::Upquery(7));

        let pending = registry
            .arrangement("posts-by-author")
            .map(MaterializedKeys::pending_len);
        assert_eq!(pending, Some(1));
    }

    /// Registering the same name twice fails and names the duplicate.
    #[test]
    fn duplicate_registration_errors() {
        let mut registry = MaterializationTracker::<u64>::new();
        registry.register("a").unwrap();

        let duplicate = registry.register("a");

        assert_eq!(
            duplicate,
            Err(ArrangementAlreadyTracked {
                name: String::from("a"),
            })
        );
    }
}