use hashlink::LinkedHashMap;
use rustolio_utils::{crypto::signature::PublicKey, prelude::Encode};
use tokio::sync::{mpsc, oneshot};
use crate::{Key, KeyType, Result, Value};
use super::{disk, BoxAny, HandleFn};
/// Messages accepted by the in-memory store task spawned by `Store::init`.
pub enum Action {
/// Run the handler against the task's `StoreController` and send the
/// handler's result back on the oneshot sender.
Update(HandleFn<BoxAny>, oneshot::Sender<Result<BoxAny>>),
/// Request shutdown: the task closes its receiver, keeps handling whatever
/// is still delivered on it, and signals on the sender once the channel
/// reports it is finished.
Shutdown(oneshot::Sender<()>),
}
/// Namespace type for the in-memory store; its only visible associated
/// function is `Store::init`, which spawns the store task.
///
/// NOTE(review): nothing in the visible code constructs a `Store` or reads
/// these fields; the live counters are kept on `StoreController`. Confirm
/// whether the fields are still needed.
pub struct Store {
// Presumably the usage limit; unused in the visible code (TODO confirm).
max_usage: usize,
// Presumably the current usage; unused in the visible code (TODO confirm).
current_usage: usize,
}
impl Store {
    /// Spawns the in-memory store task.
    ///
    /// The task owns a [`StoreController`] built from `max_usage` and
    /// `disk_channel`, and processes [`Action`]s received on `mem_channel`
    /// one at a time:
    ///
    /// * `Action::Update(f, tx)` runs `f` against the controller and sends
    ///   its result on `tx`. A caller that has already dropped its receiver
    ///   is ignored instead of taking the whole task down.
    /// * `Action::Shutdown(tx)` closes the receiver so no new actions can be
    ///   queued; actions that are already buffered are still handled. Every
    ///   shutdown requester is notified once the channel is drained.
    ///
    /// The task also ends, without panicking, when all senders are dropped
    /// and no `Shutdown` was ever received.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn init(
        max_usage: usize,
        mut mem_channel: mpsc::Receiver<Action>,
        disk_channel: mpsc::Sender<disk::Action>,
    ) {
        tokio::spawn(async move {
            // More than one `Shutdown` may already be buffered when the first
            // one closes the channel, so keep every reply sender.
            let mut shutdown_waiters = Vec::new();
            let mut controller = StoreController::new(max_usage, disk_channel);

            // `recv` yields `None` once the channel is closed *and* drained.
            while let Some(action) = mem_channel.recv().await {
                match action {
                    Action::Shutdown(tx) => {
                        shutdown_waiters.push(tx);
                        mem_channel.close();
                    }
                    Action::Update(f, tx) => {
                        let outcome = f(&mut controller).await;
                        // The requester may have gone away (e.g. its future
                        // was cancelled); that must not stop the store.
                        let _ = tx.send(outcome);
                    }
                }
            }

            for tx in shutdown_waiters {
                // A requester that stopped waiting is not an error.
                let _ = tx.send(());
            }
        });
    }
}
/// State owned by the store task and handed to `Action::Update` handlers:
/// an in-memory map with size accounting, plus a sender to the disk store.
pub struct StoreController {
/// Limit for `current_usage`; entries are popped from the front of `data`
/// while the limit is exceeded.
max_usage: usize,
/// Running total of `calc_size` for entries added to `data`, reduced when
/// an entry is replaced or evicted.
current_usage: usize,
/// Cached entries. Reads move the entry to the back (`to_back`), eviction
/// pops from the front (`pop_front`).
data: LinkedHashMap<Key, Value, crate::key::KeyState>,
/// Sender for `disk::Action` requests (`Get` on a memory miss, `Set` on write).
disk_channel: mpsc::Sender<disk::Action>,
}
impl StoreController {
fn new(max_usage: usize, disk_channel: mpsc::Sender<disk::Action>) -> Self {
Self {
max_usage,
current_usage: 0,
data: LinkedHashMap::default(),
disk_channel,
}
}
pub async fn get(&mut self, key: Key, signer: Option<PublicKey>) -> Result<Option<Value>> {
let value = self.get_mem(key);
if let Some(value) = value {
match key.ty() {
KeyType::ReadWrite | KeyType::ReadSecureWrite => {}
KeyType::SecureReadWrite => {
if value.signer() != signer {
return Err(crate::Error::NotAllowed);
}
}
}
return Ok(Some(value));
}
let (tx, rx) = oneshot::channel();
if self
.disk_channel
.send(disk::Action::Get(key, signer, tx))
.await
.is_err()
{
return Err(crate::Error::StoreClosed);
}
let value = rx.await.unwrap()?;
if value.is_some() {
self.set_mem(key, value.clone().unwrap());
}
Ok(value)
}
pub async fn set(&mut self, key: Key, value: Value) -> Result<()> {
self.set_mem(key, value.clone())?;
if self
.disk_channel
.send(disk::Action::Set(key, value))
.await
.is_err()
{
return Err(crate::Error::StoreClosed);
}
Ok(())
}
fn get_mem(&mut self, key: Key) -> Option<Value> {
self.data.to_back(&key).cloned()
}
fn set_mem(&mut self, key: Key, value: Value) -> Result<Option<Value>> {
let signer = value.signer();
self.current_usage += calc_size(&key, &value);
let old = self.data.insert(key, value);
if let Some(old) = old.as_ref() {
if old.signer() != signer {
return Err(crate::Error::NotAllowed);
}
self.current_usage -= calc_size(&key, old);
}
while self.current_usage > self.max_usage {
let Some((key, value)) = self.data.pop_front() else {
panic!("Value to big to store");
};
let size = calc_size(&key, &value);
self.current_usage -= size;
}
Ok(old)
}
}
/// Returns the accounting size of one entry: the sum of the key's and the
/// value's `encode_size()`.
#[inline]
fn calc_size(key: &crate::Key, value: &crate::Value) -> usize {
    let key_size = key.encode_size();
    let value_size = value.encode_size();
    key_size + value_size
}