use crate::bplustree::NodeView;
use crate::codec::bincode::NoopNodeViewCodec;
use crate::layout::PAGE_SIZE;
use crate::storage::epoch::EpochManager;
use crate::storage::{HasEpoch, NodeStorage, PageStorage, StorageError};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
/// Node-level storage layered on top of a page store `S`, with an in-memory
/// cache of node views keyed by page id.
pub struct PagedNodeStorage<S: PageStorage> {
/// Shared handle to the underlying page store that pages are read from,
/// written to, flushed through, and freed on.
store: Arc<S>,
/// Epoch manager handed out through the `HasEpoch` implementation.
epoch_mgr: Arc<EpochManager>,
/// Page id -> node view cache, guarded by a read/write lock. Entries are
/// inserted on reads and writes and removed when a node is freed.
cache: RwLock<HashMap<u64, NodeView>>,
}
impl<S: PageStorage + Send + Sync + 'static> HasEpoch for PagedNodeStorage<S> {
    /// Borrows the epoch manager held by this storage.
    fn epoch_mgr(&self) -> &Arc<EpochManager> {
        let Self { epoch_mgr, .. } = self;
        epoch_mgr
    }
}
impl<S: PageStorage + Send + Sync + 'static> PagedNodeStorage<S> {
    /// Opens the page store at `storage_path` and wraps it with a fresh epoch
    /// manager and an empty node cache.
    ///
    /// `_manifest_path` is accepted for interface compatibility but is not
    /// used by this constructor.
    ///
    /// # Errors
    /// Returns the I/O error produced by `S::open` if the store cannot be opened.
    pub fn new<P: AsRef<Path>>(storage_path: P, _manifest_path: P) -> Result<Self, std::io::Error> {
        let opened = S::open(storage_path)?;
        let epochs = EpochManager::new();
        Ok(Self::from_parts(Arc::new(opened), Arc::new(epochs)))
    }

    /// Assembles a storage from an already-opened page store and an existing
    /// epoch manager; the node cache starts out empty.
    pub fn from_parts(store: Arc<S>, epoch_mgr: Arc<EpochManager>) -> Self {
        let cache = RwLock::new(HashMap::new());
        Self {
            store,
            epoch_mgr,
            cache,
        }
    }

    /// Borrows the underlying page store.
    pub fn page_storage(&self) -> &S {
        &*self.store
    }

    /// Returns another shared handle to the underlying page store.
    pub fn page_storage_shared(&self) -> Arc<S> {
        let shared = Arc::clone(&self.store);
        shared
    }
}
impl<S: PageStorage + Send + Sync + 'static> NodeStorage for PagedNodeStorage<S> {
    /// Returns the node view for `page_id`, serving it from the cache when possible.
    ///
    /// On a cache miss the page is read from the page store, decoded, stamped
    /// with `page_id`, and cached. As written, a successful call always yields
    /// `Some`; it never returns `Ok(None)`.
    ///
    /// # Errors
    /// Propagates any error from reading the page or decoding it.
    ///
    /// # Panics
    /// Panics if the cache lock has been poisoned.
    fn read_node_view(&self, page_id: u64) -> Result<Option<NodeView>, StorageError> {
        {
            // Fast path: only a shared lock is needed for a cache hit.
            let cache = self.cache.read().unwrap();
            if let Some(&view) = cache.get(&page_id) {
                return Ok(Some(view));
            }
        }
        // The read lock is released before doing I/O so other readers are not blocked.
        let mut buf = [0u8; PAGE_SIZE];
        self.store.read_page(page_id, &mut buf)?;
        let mut view = NoopNodeViewCodec::decode(&buf)?;
        view.set_page_id(page_id);
        let mut cache = self.cache.write().unwrap();
        // Between dropping the read lock and taking the write lock another
        // thread may already have cached this page (for example a writer that
        // just stored a newer view). Keep that entry instead of overwriting it
        // with the copy read from the store, and return whatever is cached.
        let cached = *cache.entry(page_id).or_insert(view);
        Ok(Some(cached))
    }

    /// Encodes `node_view`, writes it through the page store's `write_page`,
    /// and caches a copy stamped with the returned page id.
    ///
    /// Returns the page id reported by the page store.
    ///
    /// # Errors
    /// Propagates any error from encoding or from the page write.
    ///
    /// # Panics
    /// Panics if the cache lock has been poisoned.
    fn write_node_view(&self, node_view: &NodeView) -> Result<u64, StorageError> {
        let buf = NoopNodeViewCodec::encode(node_view)?;
        let page_id = self.store.write_page(buf)?;
        // Cache a copy carrying the id the store assigned, not the caller's id.
        let mut cached = *node_view;
        cached.set_page_id(page_id);
        let mut cache = self.cache.write().unwrap();
        cache.insert(page_id, cached);
        Ok(page_id)
    }

    /// Encodes `node_view`, writes it through the page store's
    /// `write_page_at_offset` with the given `offset`, and caches a copy
    /// stamped with the returned page id.
    ///
    /// Returns the page id reported by the page store.
    ///
    /// # Errors
    /// Propagates any error from encoding or from the page write.
    ///
    /// # Panics
    /// Panics if the cache lock has been poisoned.
    fn write_node_view_at_offset(
        &self,
        node_view: &NodeView,
        offset: u64,
    ) -> Result<u64, StorageError> {
        let buf = NoopNodeViewCodec::encode(node_view)?;
        let page_id = self.store.write_page_at_offset(offset, buf)?;
        // Replace any previously cached view for this page with the new one.
        let mut cached = *node_view;
        cached.set_page_id(page_id);
        let mut cache = self.cache.write().unwrap();
        cache.insert(page_id, cached);
        Ok(page_id)
    }

    /// Flushes the underlying page store.
    ///
    /// # Errors
    /// Wraps a flush failure in `StorageError::Io`.
    fn flush(&self) -> Result<(), StorageError> {
        self.store.flush().map_err(StorageError::Io)
    }

    /// Evicts `id` from the cache and then frees the page in the page store.
    ///
    /// The cache entry is removed even if freeing the page subsequently fails.
    ///
    /// # Errors
    /// Wraps a failure to free the page in `StorageError::Io`.
    ///
    /// # Panics
    /// Panics if the cache lock has been poisoned.
    fn free_node(&self, id: u64) -> Result<(), StorageError> {
        {
            // Scope the write lock so it is not held during the store call.
            let mut cache = self.cache.write().unwrap();
            cache.remove(&id);
        }
        self.store.free_page(id).map_err(StorageError::Io)
    }
}