use core::fmt::Debug;
use std::{collections::BTreeMap, rc::Rc};
use bytes::{Bytes, BytesMut};
use ufotofu::{prelude::*, producer::clone_from_owned_slice};
use frugal_async::Mutex;
use crate::{
prelude::*,
storage::{CreateEntryError, NondestructiveInsert},
};
/// An in-memory store of authorised entries, keyed by namespace, subspace,
/// and path.
///
/// Cloning a `MemoryStore` is cheap and produces another handle to the *same*
/// underlying state, shared via `Rc<Mutex<_>>` (single-threaded sharing —
/// `Rc`, not `Arc`).
///
/// `MCL`/`MCC`/`MPL` are the compile-time path limits; `N` is the namespace
/// id type, `S` the subspace id type, `PD` the payload digest type, and `AT`
/// the authorisation token type.
#[derive(Debug)]
pub struct MemoryStore<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT> {
    // Shared, interior-mutable store state; all clones point at the same data.
    rc: Rc<Mutex<Store_<MCL, MCC, MPL, N, S, PD, AT>>>,
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
    MemoryStore<MCL, MCC, MPL, N, S, PD, AT>
{
    /// Creates a fresh store containing no namespaces and no entries.
    pub fn new() -> Self {
        let state = Store_::new();
        Self {
            rc: Rc::new(Mutex::new(state)),
        }
    }
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT> Default
    for MemoryStore<MCL, MCC, MPL, N, S, PD, AT>
{
    /// Equivalent to [`MemoryStore::new`]: an empty store.
    fn default() -> Self {
        Self::new()
    }
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT> Clone
for MemoryStore<MCL, MCC, MPL, N, S, PD, AT>
{
fn clone(&self) -> Self {
Self {
rc: self.rc.clone(),
}
}
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
    MemoryStore<MCL, MCC, MPL, N, S, PD, AT>
where
    PD: Clone + Ord,
    N: Clone,
    S: Clone,
{
    /// Shared implementation behind [`Store::create_entry`] and
    /// [`Store::create_entry_nondestructive`].
    ///
    /// Buffers the full payload from `payload_producer`, checks that its size
    /// matches `payload_length`, builds the entry (hashing the payload with
    /// `H` to produce the digest), authorises it with `ingredients`, and
    /// inserts it into the store. When `nondestructive` is `true`, the
    /// insertion is refused if it would prune existing entries.
    ///
    /// Fix over the previous version: the declared length is now enforced
    /// *while* buffering, so a producer that yields more than
    /// `payload_length` bytes fails fast with
    /// `CreateEntryError::IncorrectPayloadLength` instead of first being
    /// drained to completion into an unboundedly growing buffer.
    async fn create_entry_impl<Data, P, H>(
        &mut self,
        data: &Data,
        mut payload_producer: P,
        payload_length: u64,
        ingredients: &AT::Ingredients,
        nondestructive: bool,
    ) -> Result<
        NondestructiveInsert<MCL, MCC, MPL, N, S, PD, AT>,
        CreateEntryError<Infallible, P::Error, AT::CreationError>,
    >
    where
        Data: ?Sized + Namespaced<N> + Coordinatelike<MCL, MCC, MPL, S>,
        P: BulkProducer<Item = u8, Final = ()>,
        H: Default + anyhash::Hasher<PD>,
        AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + Debug + Clone,
        N: Debug + Clone + Ord,
        S: Debug + Clone + Ord,
        PD: Debug,
    {
        // NOTE(review): `payload_length` is used verbatim as a preallocation
        // hint; if it can be attacker-controlled, consider capping the
        // preallocation — TODO confirm against callers.
        let mut payload = BytesMut::with_capacity(payload_length as usize);
        let payload = loop {
            match payload_producer
                .expose_items_sync(|items| {
                    payload.extend_from_slice(items);
                    // Report every exposed byte as consumed.
                    (items.len(), ())
                })
                .await
            {
                Ok(Left(())) => {
                    // Fail fast as soon as the producer over-delivers, rather
                    // than buffering an arbitrarily large payload before
                    // noticing the mismatch at the end.
                    if payload.len() as u64 > payload_length {
                        return Err(CreateEntryError::IncorrectPayloadLength);
                    }
                }
                Ok(Right(())) => {
                    // Producer finished: the buffered size must match exactly.
                    if payload.len() as u64 == payload_length {
                        break payload.freeze();
                    } else {
                        return Err(CreateEntryError::IncorrectPayloadLength);
                    }
                }
                Err(err) => return Err(CreateEntryError::ProducerError(err)),
            }
        };
        // Build the (unauthorised) entry; the payload digest and length are
        // derived from the buffered payload via hasher `H`.
        let entry: Entry<MCL, MCC, MPL, N, S, PD> = Entry::builder()
            .namespace_id(data.wdm_namespace_id().clone())
            .subspace_id(data.wdm_subspace_id().clone())
            .path(data.wdm_path().clone())
            .timestamp(data.wdm_timestamp())
            .payload::<_, H, PD>(&payload)
            .build();
        let entry = entry
            .into_authorised_entry(ingredients)
            .map_err(CreateEntryError::AuthorisationTokenError)?;
        let mut store = self.rc.write().await;
        Ok(store.do_insert_entry(entry, nondestructive, Some(payload)))
    }
}
/// The actual store state shared (behind `Rc<Mutex<_>>`) by every handle to a
/// [`MemoryStore`].
#[derive(Debug)]
struct Store_<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT> {
    // One sub-store per namespace, keyed by namespace id.
    namespaces: BTreeMap<N, NamespaceStore<MCL, MCC, MPL, S, PD, AT>>,
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
    Store_<MCL, MCC, MPL, N, S, PD, AT>
{
    /// Creates empty top-level store state with no namespaces.
    fn new() -> Self {
        Self {
            namespaces: BTreeMap::new(),
        }
    }

    /// Returns the per-namespace store for `namespace_id`, first inserting an
    /// empty one if the namespace was not present. The id is cloned only on
    /// the insertion path.
    fn get_or_create_namespace_store(
        &mut self,
        namespace_id: &N,
    ) -> &mut NamespaceStore<MCL, MCC, MPL, S, PD, AT>
    where
        N: Ord + Clone,
    {
        if !self.namespaces.contains_key(namespace_id) {
            self.namespaces
                .insert(namespace_id.clone(), NamespaceStore::new());
        }
        self.namespaces
            .get_mut(namespace_id)
            .expect("namespace store was just ensured to exist")
    }

    /// Core insertion routine shared by all public insertion paths.
    ///
    /// Rejects `authorised_entry` as `Outdated` when a stored entry on a
    /// prefix of its path is strictly newer; otherwise delegates to
    /// [`SubspaceStore::handle_insertion`], which may refuse
    /// (`prevent_pruning`) or prune-and-insert.
    fn do_insert_entry(
        &mut self,
        authorised_entry: AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
        prevent_pruning: bool,
        payload: Option<Bytes>,
    ) -> NondestructiveInsert<MCL, MCC, MPL, N, S, PD, AT>
    where
        N: Ord + Clone,
        S: Ord + Clone,
        PD: Ord + Clone,
        AT: Clone,
    {
        let namespace_store =
            self.get_or_create_namespace_store(authorised_entry.wdm_namespace_id());
        let subspace_store =
            namespace_store.get_or_create_subspace_store(authorised_entry.wdm_subspace_id());
        // The new entry is outdated if any stored entry on a prefix of its
        // path is strictly newer than it.
        let is_outdated = subspace_store.entries.iter().any(|(path, entry)| {
            path.is_prefix_of(authorised_entry.wdm_path())
                && entry.is_newer_than(authorised_entry.entry())
        });
        if is_outdated {
            return NondestructiveInsert::Outdated;
        }
        match subspace_store.handle_insertion(&authorised_entry, prevent_pruning, true, payload) {
            true => NondestructiveInsert::Prevented,
            false => NondestructiveInsert::Success(authorised_entry),
        }
    }
}
/// Per-namespace storage: one [`SubspaceStore`] per subspace id.
#[derive(Debug)]
struct NamespaceStore<const MCL: usize, const MCC: usize, const MPL: usize, S, PD, AT> {
    // Sub-stores keyed by subspace id.
    subspaces: BTreeMap<S, SubspaceStore<MCL, MCC, MPL, PD, AT>>,
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, S, PD, AT>
    NamespaceStore<MCL, MCC, MPL, S, PD, AT>
{
    /// Creates an empty namespace store with no subspaces.
    fn new() -> Self {
        Self {
            subspaces: BTreeMap::new(),
        }
    }

    /// Returns the per-subspace store for `subspace_id`, first inserting an
    /// empty one if the subspace was not present. The id is cloned only on
    /// the insertion path.
    fn get_or_create_subspace_store(
        &mut self,
        subspace_id: &S,
    ) -> &mut SubspaceStore<MCL, MCC, MPL, PD, AT>
    where
        S: Ord + Clone,
    {
        if !self.subspaces.contains_key(subspace_id) {
            self.subspaces
                .insert(subspace_id.clone(), SubspaceStore::new());
        }
        self.subspaces
            .get_mut(subspace_id)
            .expect("subspace store was just ensured to exist")
    }
}
/// Per-subspace storage: entries keyed by their path.
#[derive(Debug)]
struct SubspaceStore<const MCL: usize, const MCC: usize, const MPL: usize, PD, AT> {
    // All entries of one subspace, keyed by path; lookups and prefix-based
    // pruning scan this map.
    entries: BTreeMap<Path<MCL, MCC, MPL>, ControlEntry<PD, AT>>,
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, PD, AT>
    SubspaceStore<MCL, MCC, MPL, PD, AT>
{
    /// Creates an empty subspace store with no entries.
    fn new() -> Self {
        let entries = BTreeMap::new();
        Self { entries }
    }
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, PD, AT>
SubspaceStore<MCL, MCC, MPL, PD, AT>
where
PD: Ord + Clone,
AT: Clone,
{
fn handle_insertion<N, S>(
&mut self,
new_entry: &AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
prevent_pruning: bool,
do_insert_if_necessary: bool,
payload: Option<Bytes>,
) -> bool {
let prune_these: Vec<_> = self
.entries
.iter()
.filter_map(|(path, entry)| {
if new_entry.wdm_path().is_prefix_of(path)
&& !entry.is_newer_than(new_entry.entry())
{
Some(path.clone())
} else {
None
}
})
.collect();
if prevent_pruning && !prune_these.is_empty() {
true
} else {
for path_to_prune in prune_these {
self.entries.remove(&path_to_prune);
}
if do_insert_if_necessary {
self.entries.insert(
new_entry.wdm_path().clone(),
ControlEntry {
authorisation_token: new_entry.authorisation_token().clone(),
payload: payload.unwrap_or_default(),
payload_digest: new_entry.wdm_payload_digest().clone(),
payload_length: new_entry.wdm_payload_length(),
timestamp: new_entry.wdm_timestamp(),
},
);
}
false
}
}
}
/// The per-entry record stored in a [`SubspaceStore`]: everything needed to
/// reconstruct an authorised entry (together with the namespace, subspace,
/// and path map keys), plus the buffered payload bytes.
#[derive(Debug, Clone)]
struct ControlEntry<PD, AT> {
    // Creation timestamp of the entry.
    timestamp: Timestamp,
    // Payload length recorded on the entry; may exceed `payload.len()` when
    // the entry was inserted without its payload.
    payload_length: u64,
    // Digest of the payload.
    payload_digest: PD,
    // Token proving the entry was authorised.
    authorisation_token: AT,
    // The payload itself; empty when the entry was inserted without one.
    payload: Bytes,
}
impl<PD, AT> ControlEntry<PD, AT>
where
    PD: Ord,
{
    /// Returns whether this stored entry is strictly newer than `entry`,
    /// where "newer" is the lexicographic order on
    /// (timestamp, payload digest, payload length).
    fn is_newer_than<const MCL: usize, const MCC: usize, const MPL: usize, N, S>(
        &self,
        entry: &Entry<MCL, MCC, MPL, N, S, PD>,
    ) -> bool {
        // Same ordering as an explicit `||`-chain of the three fields,
        // expressed as one lexicographic tuple comparison.
        let candidate = (
            entry.wdm_timestamp(),
            entry.wdm_payload_digest(),
            entry.wdm_payload_length(),
        );
        let stored = (self.timestamp, &self.payload_digest, self.payload_length);
        candidate < stored
    }
}
impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
    Store<MCL, MCC, MPL, N, S, PD, AT> for MemoryStore<MCL, MCC, MPL, N, S, PD, AT>
where
    PD: Clone + Ord,
    N: Clone + Ord,
    S: Clone + Ord,
    AT: Clone,
{
    // Purely in-memory: internal operations cannot fail.
    type InternalError = Infallible;

    /// Creates, authorises, and inserts a new entry, pruning entries the
    /// insertion overwrites.
    ///
    /// Returns `Ok(Some(entry))` on success and `Ok(None)` when the new
    /// entry is outdated (a stored entry on a prefix of its path is newer).
    async fn create_entry<Data, P, H>(
        &mut self,
        data: &Data,
        payload_producer: P,
        payload_length: u64,
        ingredients: &AT::Ingredients,
    ) -> Result<
        Option<AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>,
        CreateEntryError<Self::InternalError, P::Error, AT::CreationError>,
    >
    where
        Data: ?Sized + Namespaced<N> + Coordinatelike<MCL, MCC, MPL, S>,
        P: BulkProducer<Item = u8, Final = ()>,
        H: Default + anyhash::Hasher<PD>,
        AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + Debug,
        N: Debug,
        S: Debug,
        PD: Debug,
    {
        match self
            .create_entry_impl::<Data, P, H>(
                data,
                payload_producer,
                payload_length,
                ingredients,
                false, // destructive: pruning is allowed
            )
            .await?
        {
            NondestructiveInsert::Success(yay) => Ok(Some(yay)),
            NondestructiveInsert::Outdated => Ok(None),
            // `Prevented` can only be produced when pruning prevention is
            // requested, and we passed `false` above.
            NondestructiveInsert::Prevented => unreachable!(),
        }
    }

    /// Like [`Store::create_entry`], but refuses the insertion (returning
    /// [`NondestructiveInsert::Prevented`]) if it would prune any existing
    /// entry.
    async fn create_entry_nondestructive<Data, P, H>(
        &mut self,
        data: &Data,
        payload_producer: P,
        payload_length: u64,
        ingredients: &AT::Ingredients,
    ) -> Result<
        NondestructiveInsert<MCL, MCC, MPL, N, S, PD, AT>,
        CreateEntryError<Self::InternalError, P::Error, AT::CreationError>,
    >
    where
        Data: ?Sized + Namespaced<N> + Coordinatelike<MCL, MCC, MPL, S>,
        P: BulkProducer<Item = u8, Final = ()>,
        H: Default + anyhash::Hasher<PD>,
        AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD> + Debug,
        N: Debug,
        S: Debug,
        PD: Debug,
    {
        self.create_entry_impl::<Data, P, H>(
            data,
            payload_producer,
            payload_length,
            ingredients,
            true, // nondestructive: prevent pruning
        )
        .await
    }

    /// Inserts an already-authorised entry without its payload.
    ///
    /// Returns `Ok(true)` when the entry was inserted and `Ok(false)` when
    /// it was outdated.
    async fn insert_entry(
        &mut self,
        entry: AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
    ) -> Result<bool, Self::InternalError> {
        let mut store = self.rc.write().await;
        match store.do_insert_entry(entry, false, None) {
            NondestructiveInsert::Outdated => Ok(false),
            // Pruning prevention is off (`false` above), so `Prevented`
            // cannot occur.
            NondestructiveInsert::Prevented => unreachable!(),
            NondestructiveInsert::Success(_yay) => Ok(true),
        }
    }

    /// Removes the entry at `key` in `namespace_id`, if present.
    ///
    /// When `expected_digest` is `Some`, the entry is only removed if its
    /// stored payload digest matches; on mismatch, `Ok(false)` is returned
    /// and nothing changes.
    ///
    /// NOTE(review): lookups go through `get_or_create_*`, so forgetting a
    /// nonexistent entry creates empty namespace/subspace stores as a side
    /// effect — confirm this is intended.
    async fn forget_entry<K>(
        &mut self,
        namespace_id: &N,
        key: &K,
        expected_digest: Option<PD>,
    ) -> Result<bool, Self::InternalError>
    where
        K: Keylike<MCL, MCC, MPL, S>,
        PD: PartialEq,
    {
        let mut store = self.rc.write().await;
        let namespace_store = store.get_or_create_namespace_store(namespace_id);
        let subspace_store = namespace_store.get_or_create_subspace_store(key.wdm_subspace_id());
        let found = subspace_store.entries.get(key.wdm_path());
        match found {
            None => Ok(false),
            Some(entry) => {
                // Digest mismatch: leave the entry in place.
                if let Some(expected) = expected_digest
                    && entry.payload_digest != expected
                {
                    return Ok(false);
                }
                Ok(subspace_store.entries.remove(key.wdm_path()).is_some())
            }
        }
    }

    /// Removes every entry of `namespace_id` that lies inside `area`:
    /// matching subspace (when the area names one), path prefixed by the
    /// area's path, and timestamp inside the area's time range.
    async fn forget_area(
        &mut self,
        namespace_id: &N,
        area: &Area<MCL, MCC, MPL, S>,
    ) -> Result<(), Self::InternalError> {
        // Phase 1: collect the (subspace, path) keys of matching entries
        // while holding the lock; the lock guard is dropped at the end of
        // this block so phase 2 can call `forget_entry` (which re-locks).
        let entries_to_forget = {
            let mut store = self.rc.write().await;
            let namespace_store = store.get_or_create_namespace_store(namespace_id);
            let mut entries_to_forget = vec![];
            match area.subspace() {
                Some(subspace_id) => {
                    // Area restricted to a single subspace.
                    let subspace_store = namespace_store.get_or_create_subspace_store(subspace_id);
                    for (path, entry) in subspace_store.entries.iter() {
                        if !area.times().includes_value(&entry.timestamp) {
                            continue;
                        }
                        if path.is_prefixed_by(area.path()) {
                            entries_to_forget.push((subspace_id.clone(), path.clone()));
                        }
                    }
                }
                None => {
                    // Full area: scan every subspace of the namespace.
                    for (subspace_id, subspace_store) in namespace_store.subspaces.iter() {
                        for (path, entry) in subspace_store.entries.iter() {
                            if !area.times().includes_value(&entry.timestamp) {
                                continue;
                            }
                            if path.is_prefixed_by(area.path()) {
                                entries_to_forget.push((subspace_id.clone(), path.clone()));
                            }
                        }
                    }
                }
            }
            entries_to_forget
        };
        // Phase 2: remove each collected entry. `expected_digest` is `None`,
        // and `InternalError` is `Infallible`, so this cannot fail.
        for forget_this in entries_to_forget {
            self.forget_entry(namespace_id, &forget_this, None)
                .await
                .expect("cannot fail when `expected_digest` is `None`");
        }
        Ok(())
    }

    /// Drops an entire namespace and everything stored under it.
    async fn forget_namespace(&mut self, namespace_id: &N) -> Result<(), Self::InternalError> {
        let mut store = self.rc.write().await;
        store.namespaces.remove(namespace_id);
        Ok(())
    }

    /// Looks up the entry at `key` in `namespace_id`, returning it together
    /// with a producer over the requested `payload_slice` of its buffered
    /// payload.
    ///
    /// Returns `Ok(None)` when there is no such entry, or when
    /// `expected_digest` is `Some` and does not match the stored digest.
    ///
    /// NOTE(review): the `u64 -> usize` bound casts truncate on 32-bit
    /// targets, and `Bytes::slice` panics on an out-of-range slice — confirm
    /// callers only pass in-range bounds.
    async fn get_entry<K, Slice>(
        &mut self,
        namespace_id: &N,
        key: &K,
        expected_digest: Option<PD>,
        payload_slice: &Slice,
    ) -> Result<
        Option<(
            AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
            impl BulkProducer<
                Item = u8,
                Final = (),
                Error = super::PayloadProducerError<Self::InternalError>,
            >,
        )>,
        Self::InternalError,
    >
    where
        K: Keylike<MCL, MCC, MPL, S>,
        Slice: std::ops::RangeBounds<u64>,
    {
        let mut store = self.rc.write().await;
        let namespace_store = store.get_or_create_namespace_store(namespace_id);
        let subspace_store = namespace_store.get_or_create_subspace_store(key.wdm_subspace_id());
        match subspace_store.entries.get(key.wdm_path()) {
            None => Ok(None),
            Some(found) => {
                // Digest mismatch behaves like "not found".
                if let Some(expected) = expected_digest
                    && found.payload_digest != expected
                {
                    return Ok(None);
                }
                // Rebuild the full entry from the stored control data plus
                // the map keys.
                let entry = Entry::builder()
                    .namespace_id(namespace_id.clone())
                    .subspace_id(key.wdm_subspace_id().clone())
                    .path(key.wdm_path().clone())
                    .timestamp(found.timestamp)
                    .payload_length(found.payload_length)
                    .payload_digest(found.payload_digest.clone())
                    .build();
                // SAFETY(review): the token was stored by a prior insertion
                // of an `AuthorisedEntry`, so the (entry, token) pair is
                // presumed to have been verified at insertion time — relies
                // on that store-wide invariant; confirm no path mutates
                // `ControlEntry` fields independently.
                let authed = unsafe {
                    PossiblyAuthorisedEntry {
                        entry,
                        authorisation_token: found.authorisation_token.clone(),
                    }
                    .into_authorised_entry_unchecked()
                };
                Ok(Some((
                    authed,
                    clone_from_owned_slice(found.payload.slice((
                        payload_slice.start_bound().map(|start| *start as usize),
                        payload_slice.end_bound().map(|end| *end as usize),
                    )))
                    // The producer over an owned slice cannot fail.
                    .to_map_error(|_| unreachable!()),
                )))
            }
        }
    }

    /// Produces every entry of `namespace_id` inside `area` (same matching
    /// rules as [`Store::forget_area`]).
    ///
    /// All matching entries are collected eagerly under the lock, then
    /// yielded from an infallible producer.
    async fn get_area(
        &mut self,
        namespace_id: N,
        area: Area<MCL, MCC, MPL, S>,
    ) -> impl Producer<
        Item = AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
        Final = (),
        Error = Self::InternalError,
    > {
        let entries_to_produce = {
            let mut store = self.rc.write().await;
            let namespace_store = store.get_or_create_namespace_store(&namespace_id);
            let mut entries_to_produce = vec![];
            match area.subspace() {
                Some(subspace_id) => {
                    // Area restricted to a single subspace.
                    let subspace_store = namespace_store.get_or_create_subspace_store(subspace_id);
                    for (path, entry) in subspace_store.entries.iter() {
                        if !area.times().includes_value(&entry.timestamp) {
                            continue;
                        }
                        if path.is_prefixed_by(area.path()) {
                            // Rebuild the full entry from control data + keys.
                            let new_entry = Entry::builder()
                                .namespace_id(namespace_id.clone())
                                .subspace_id(subspace_id.clone())
                                .path(path.clone())
                                .timestamp(entry.timestamp)
                                .payload_length(entry.payload_length)
                                .payload_digest(entry.payload_digest.clone())
                                .build();
                            // SAFETY(review): same invariant as in
                            // `get_entry` — the stored token is presumed to
                            // have been verified against this entry when it
                            // was inserted.
                            let authed = unsafe {
                                PossiblyAuthorisedEntry {
                                    entry: new_entry,
                                    authorisation_token: entry.authorisation_token.clone(),
                                }
                                .into_authorised_entry_unchecked()
                            };
                            entries_to_produce.push(authed);
                        }
                    }
                }
                None => {
                    // Full area: scan every subspace of the namespace.
                    for (subspace_id, subspace_store) in namespace_store.subspaces.iter() {
                        for (path, entry) in subspace_store.entries.iter() {
                            if !area.times().includes_value(&entry.timestamp) {
                                continue;
                            }
                            if path.is_prefixed_by(area.path()) {
                                let new_entry = Entry::builder()
                                    .namespace_id(namespace_id.clone())
                                    .subspace_id(subspace_id.clone())
                                    .path(path.clone())
                                    .timestamp(entry.timestamp)
                                    .payload_length(entry.payload_length)
                                    .payload_digest(entry.payload_digest.clone())
                                    .build();
                                // SAFETY(review): same invariant as above.
                                let authed = unsafe {
                                    PossiblyAuthorisedEntry {
                                        entry: new_entry,
                                        authorisation_token: entry.authorisation_token.clone(),
                                    }
                                    .into_authorised_entry_unchecked()
                                };
                                entries_to_produce.push(authed);
                            }
                        }
                    }
                }
            }
            entries_to_produce
        };
        entries_to_produce
            .into_producer()
            // Producing from an owned `Vec` cannot fail.
            .to_map_error(|_| unreachable!())
    }

    /// No-op: all state lives in memory, there is nothing to persist.
    async fn flush(&mut self) -> Result<(), Self::InternalError> {
        Ok(())
    }
}