use crate::{
listener::{self, ListenerMap},
Bytes, Mutation,
};
use serde::{Deserialize, Serialize};
use std::{
cell::RefCell,
collections::{BTreeMap, HashSet},
fmt::{Debug, Formatter},
sync::{Arc, Mutex},
};
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub enum PrefixMapValue {
Value(Bytes),
Deleted,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
pub enum PrefixMap {
Children(BTreeMap<Bytes, PrefixMapValue>),
DeletedPrefixMap,
}
impl PrefixMap {
fn get(&self, key: &Bytes) -> Option<PrefixMapValue> {
match self {
PrefixMap::Children(children) => children.get(key).cloned(),
PrefixMap::DeletedPrefixMap => Some(PrefixMapValue::Deleted),
}
}
fn insert(&mut self, key: Bytes, value: PrefixMapValue) {
match self {
PrefixMap::Children(children) => {
children.insert(key, value);
}
PrefixMap::DeletedPrefixMap => {
if value == PrefixMapValue::Deleted {
return;
}
let mut new_children = BTreeMap::new();
new_children.insert(key, value);
*self = PrefixMap::Children(new_children);
}
}
}
}
impl Default for PrefixMap {
fn default() -> Self {
Self::Children(BTreeMap::new())
}
}
#[derive(Default)]
pub struct StoreLayer {
layer: BTreeMap<Vec<Bytes>, PrefixMap>,
dirty: HashSet<Vec<Bytes>>,
}
pub struct StoreInner {
layers: Mutex<Vec<StoreLayer>>,
listeners: Mutex<ListenerMap>,
}
impl Default for StoreInner {
fn default() -> Self {
Self {
layers: Mutex::new(vec![StoreLayer::default()]),
listeners: Mutex::new(ListenerMap::default()),
}
}
}
#[derive(Clone, Default)]
pub struct Store {
inner: Arc<StoreInner>,
}
impl Store {
pub fn prefixes(&self) -> Vec<Vec<Bytes>> {
let mut result = std::collections::BTreeSet::new();
let layers = self.inner.layers.lock().unwrap();
for layer in layers.iter() {
for (prefix, value) in layer.layer.iter() {
match value {
PrefixMap::Children(_) => {
result.insert(prefix.clone());
}
PrefixMap::DeletedPrefixMap => {
result.remove(prefix);
}
}
}
}
result.into_iter().collect()
}
pub fn ensure(&self, prefix: &[Bytes]) {
let mut layers = self.inner.layers.lock().unwrap();
let mut layer = layers.last_mut().unwrap();
layer.layer.entry(prefix.to_vec()).or_default();
}
pub fn push_overlay(&self) {
let mut layers = self.inner.layers.lock().unwrap();
layers.push(StoreLayer::default());
}
pub fn pop_overlay(&self) {
let mut layers = self.inner.layers.lock().unwrap();
layers.pop();
if layers.is_empty() {
tracing::error!("popped last overlay");
}
}
pub fn notify_dirty(&self) {
let mut dirty_prefixes = HashSet::new();
{
let mut layers = self.inner.layers.lock().unwrap();
for layer in layers.iter_mut() {
let new_prefixes = std::mem::take(&mut layer.dirty);
dirty_prefixes.extend(new_prefixes.into_iter());
}
}
let mut listeners = self.inner.listeners.lock().unwrap();
for prefix in dirty_prefixes.iter() {
listeners.alert(prefix);
}
}
pub fn top_layer_mutations(&self) -> Vec<Mutation> {
let mut layers = self.inner.layers.lock().unwrap();
let top_layer = layers.last().unwrap();
let mut mutations = vec![];
for (prefix, entries) in top_layer.layer.iter() {
mutations.push(Mutation {
prefix: prefix.clone(),
entries: entries.clone(),
});
}
mutations
}
pub fn alert(&self, prefix: &Vec<Bytes>) {
let mut listeners = self.inner.listeners.lock().unwrap();
listeners.alert(prefix);
}
pub fn combine_down(&self) {
let mut layers = self.inner.layers.lock().unwrap();
let Some(top_layer) = layers.pop() else {
return;
};
let Some(next_layer) = layers.last_mut() else {
return;
};
for (prefix, map) in top_layer.layer.iter() {
match map {
PrefixMap::Children(children) => {
let entry = next_layer
.layer
.entry(prefix.clone())
.or_insert_with(|| PrefixMap::Children(BTreeMap::new()));
match entry {
PrefixMap::Children(next_children) => {
for (key, value) in children.iter() {
next_children.insert(key.clone(), value.clone());
}
}
PrefixMap::DeletedPrefixMap => {
next_layer
.layer
.insert(prefix.clone(), PrefixMap::Children(children.clone()));
}
}
}
PrefixMap::DeletedPrefixMap => {
next_layer
.layer
.insert(prefix.clone(), PrefixMap::DeletedPrefixMap);
}
}
}
next_layer.dirty.extend(top_layer.dirty);
}
pub fn get(&self, prefix: &Vec<Bytes>, key: &Bytes) -> Option<Bytes> {
let layers = self.inner.layers.lock().unwrap();
for layer in layers.iter().rev() {
if let Some(map) = layer.layer.get(prefix) {
if let Some(value) = map.get(key) {
match value {
PrefixMapValue::Value(value) => return Some(value.clone()),
PrefixMapValue::Deleted => return None,
}
}
}
}
None
}
pub fn mutate(&self, mutations: &[Mutation]) {
let mut layers = self.inner.layers.lock().unwrap();
let top_layer = layers.last_mut().unwrap();
for mutation in mutations.iter() {
match &mutation.entries {
PrefixMap::DeletedPrefixMap => {
let mut map = top_layer.layer.entry(mutation.prefix.clone()).or_default();
*map = PrefixMap::DeletedPrefixMap;
}
PrefixMap::Children(children) => {
let mut map = top_layer.layer.entry(mutation.prefix.clone()).or_default();
for (key, value) in children.iter() {
map.insert(key.clone(), value.clone());
}
}
}
top_layer.dirty.insert(mutation.prefix.clone());
}
}
pub fn handle(&self) -> StoreHandle {
StoreHandle {
map: self.clone(),
prefix: vec![],
}
}
}
#[derive(Clone)]
pub struct StoreHandle {
map: Store,
prefix: Vec<Bytes>,
}
impl StoreHandle {
pub fn listen<F: Fn() -> bool + 'static + Send + Sync>(&self, listener: F) {
let mut listeners = self.map.inner.listeners.lock().unwrap();
listeners.listen(self.prefix.clone(), listener);
}
pub fn get(&self, key: &Bytes) -> Option<Bytes> {
self.map.get(&self.prefix, key)
}
pub fn set(&mut self, key: Bytes, value: Bytes) {
let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();
let mut map = top_layer.layer.entry(self.prefix.clone()).or_default();
top_layer.dirty.insert(self.prefix.clone());
map.insert(key, PrefixMapValue::Value(value));
}
pub fn delete(&mut self, key: Bytes) {
let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();
let mut map = top_layer.layer.entry(self.prefix.clone()).or_default();
top_layer.dirty.insert(self.prefix.clone());
map.insert(key, PrefixMapValue::Deleted);
}
pub fn child(&mut self, path_part: &[u8]) -> Self {
let mut prefix = self.prefix.clone();
prefix.push(path_part.to_vec());
self.map.ensure(&prefix);
Self {
map: self.map.clone(),
prefix,
}
}
pub fn delete_child(&mut self, path_part: &[u8]) {
let mut prefix = self.prefix.clone();
prefix.push(path_part.to_vec());
let mut layers = self.map.inner.layers.lock().unwrap();
let mut top_layer = layers.last_mut().unwrap();
let mut prefixes_to_delete = HashSet::new();
for layer in layers.iter() {
for (pfx, val) in layer.layer.iter() {
if pfx.starts_with(&prefix) {
prefixes_to_delete.insert(pfx.clone());
}
}
}
let mut top_layer = layers.last_mut().unwrap();
for pfx in prefixes_to_delete.iter() {
top_layer
.layer
.insert(pfx.clone(), PrefixMap::DeletedPrefixMap);
top_layer.dirty.insert(pfx.clone());
}
}
}
impl Debug for Store {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let layers = self.inner.layers.lock().unwrap();
for (i, layer) in layers.iter().enumerate() {
writeln!(f, "Layer {}", i)?;
for (prefix, map) in layer.layer.iter() {
writeln!(f, " {:?} -> {:?}", prefix, map)?;
}
}
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn child_creates_prefix() {
let store = Store::default();
let mut handle = store.handle();
let mut child_handle = handle.child(b"foo");
let _ = child_handle.child(b"bar");
assert_eq!(
store.prefixes(),
vec![
vec![b"foo".to_vec()],
vec![b"foo".to_vec(), b"bar".to_vec()],
]
);
}
#[test]
fn deleting_parent_deletes_child() {
let store = Store::default();
let mut handle = store.handle();
let mut child_handle = handle.child(b"foo");
let _ = child_handle.child(b"bar");
handle.delete_child(b"foo".as_slice());
assert_eq!(store.prefixes(), vec![] as Vec<Vec<Bytes>>);
}
}