mod disk;
mod memory;
use std::{future::Future, path::PathBuf, pin::Pin};
use rustolio_utils::{crypto::signature::PublicKey, prelude::*};
use tokio::sync::{mpsc, oneshot};
use super::{Key, Result, Value};
use memory::StoreController;
type HandleFn<R> =
Box<dyn for<'a> FnOnce(&'a mut StoreController) -> PinnedFuture<'a, R> + Send + Sync + 'static>;
type PinnedFuture<'a, R> = Pin<Box<dyn Future<Output = Result<R>> + Send + 'a>>;
type BoxAny = Box<dyn std::any::Any + Send + Sync + 'static>;
pub fn get(key: Key, signer: Option<PublicKey>) -> HandleFn<Option<Value>> {
Box::new(move |controller: &mut StoreController| {
Box::pin(async move { controller.get(key, signer).await })
})
}
pub fn set(key: Key, value: Value) -> HandleFn<()> {
Box::new(move |controller: &mut StoreController| {
Box::pin(async move {
controller.set(key, value).await?;
Ok(())
})
})
}
pub fn getset(key: Key, value: Value) -> HandleFn<Option<Value>> {
Box::new(move |controller: &mut StoreController| {
Box::pin(async move {
let old = controller.get(key, value.signer()).await?;
controller.set(key, value).await?;
Ok(old)
})
})
}
#[derive(Debug, service::Service)]
pub struct Service {
mem_channel: mpsc::Sender<memory::Action>,
disk_channel: mpsc::Sender<disk::Action>,
}
impl Service {
pub fn new(max_mem_usage: usize, disk_path: PathBuf) -> Self {
let (disk_channel, rx) = mpsc::channel(20);
disk::Store::init(disk_path, rx);
let (mem_channel, rx) = mpsc::channel(20);
memory::Store::init(max_mem_usage, rx, disk_channel.clone());
Self {
mem_channel,
disk_channel,
}
}
}
#[service_impl]
impl StoreReplication for AsRef<Service> {
async fn store_replication(&'static self, key: Key, value: Value) -> Result<()> {
if self
.as_ref()
.disk_channel
.send(disk::Action::Set(key, value))
.await
.is_err()
{
return Err(crate::Error::StoreClosed);
}
Ok(())
}
}
fn create_handle_fn<R: Threadsafe>(f: HandleFn<R>) -> HandleFn<BoxAny> {
Box::new(move |controller: &mut StoreController| {
let future = f(controller);
let future = async move {
let r: Result<R> = future.await;
r.map(|v| Box::new(v) as BoxAny)
};
Box::pin(future)
})
}
#[service_impl]
impl OperateStore for AsRef<Service> {
async fn handle<R: Threadsafe>(&'static self, f: HandleFn<R>) -> Result<R> {
let f = create_handle_fn(f);
let (tx, rx) = oneshot::channel();
if self
.as_ref()
.mem_channel
.send(memory::Action::Update(f, tx))
.await
.is_err()
{
return Err(crate::Error::StoreClosed);
}
let any = rx.await.unwrap()?;
Ok(unsafe {
*any.downcast().unwrap_unchecked()
})
}
async fn shutdown(&'static self) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.as_ref()
.mem_channel
.send(memory::Action::Shutdown(tx))
.await
.expect("Store already shut down");
rx.await.unwrap();
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::{test_utils::file::generate_dir, KeyType};
use super::*;
#[tokio::test]
async fn test_store_operation() {
let (dir, _guard) = generate_dir("store_operation");
let kv_store = Service::new(5, dir);
service!(kv_store as Service);
let key = Key::from_value(&0, KeyType::ReadWrite);
let value = Value::from_value(&"foo").unwrap();
let get_op = get(key, None);
let res = kv_store.handle(get_op).await.unwrap();
assert_eq!(res, None);
let set_op = set(key, value.clone());
kv_store.handle(set_op).await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let get_op = get(key, None);
let res = kv_store.handle(get_op).await.unwrap();
assert_eq!(res, Some(value.clone()));
let new_value = Value::from_value(&"bar").unwrap();
let getset_op = getset(key, new_value.clone());
let res = kv_store.handle(getset_op).await.unwrap();
assert_eq!(res, Some(value));
tokio::time::sleep(Duration::from_millis(50)).await;
let get_op = get(key, None);
let res = kv_store.handle(get_op).await.unwrap();
assert_eq!(res, Some(new_value));
tokio::time::sleep(Duration::from_millis(50)).await;
}
#[tokio::test]
async fn test_store_replication() {
let (dir, _guard) = generate_dir("store_replication");
let kv_store = Service::new(5, dir);
service!(kv_store as Service);
let key = Key::from_value(&0, KeyType::ReadWrite);
let value = Value::from_value(&"foo").unwrap();
let get_op = get(key, None);
let res = kv_store.handle(get_op).await.unwrap();
assert_eq!(res, None);
kv_store
.store_replication(key, value.clone())
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let get_op = get(key, None);
let res = kv_store.handle(get_op).await.unwrap();
assert_eq!(res.unwrap(), value);
}
}