use std::sync::atomic::AtomicBool;
use crate::{
error::ValidationErrorKind,
filter::{CombinedFilter, FilterTrait}, blob::DeleteResult,
};
use super::{prelude::*, read_result::BlobRecordTimestamp};
use bytes::Bytes;
use futures::stream::FuturesOrdered;
use tokio::fs::{create_dir, create_dir_all};
/// File extension of blob data files; used to detect existing blobs in the work dir.
const BLOB_FILE_EXTENSION: &str = "blob";
/// Public handle to the blob storage.
///
/// State lives in the shared `inner`; `observer` drives background
/// maintenance tasks (active-blob rotation, index dumps, fsync).
#[derive(Debug)]
pub struct Storage<K>
where
    for<'a> K: Key<'a>,
{
    // Shared storage state (config, blobs, counters).
    inner: Arc<Inner<K>>,
    // Background-task driver bound to `inner`.
    observer: Observer<K>
}
/// Shared storage state referenced by both `Storage` and its `Observer`.
#[derive(Debug)]
pub(crate) struct Inner<K>
where
    for<'a> K: Key<'a>,
{
    // Storage configuration (work dir, blob limits, etc.).
    config: Config,
    // Blob collections guarded by a single async RwLock.
    safe: RwLock<Safe<K>>,
    // Monotonic id used when naming the next blob file.
    next_blob_id: AtomicUsize,
    // I/O backend used to open blob files.
    iodriver: IoDriver,
    // Number of corrupted blobs discovered (old + newly detected).
    corrupted_blobs: AtomicUsize,
    // Guard flag so only one fsync runs at a time (see `fsyncdata`).
    fsync_in_progress: AtomicBool,
}
/// Lock-protected blob collections: one optional writable (active) blob and
/// the set of closed blobs wrapped in hierarchical bloom filters.
#[derive(Debug)]
pub(crate) struct Safe<K>
where
    for<'a> K: Key<'a>,
{
    // Writable blob, if any; `None` after `close_active_blob`/lazy init.
    active_blob: Option<Box<ASRwLock<Blob<K>>>>,
    // Closed blobs, indexed by a hierarchical bloom-filter structure.
    blobs: Arc<RwLock<HierarchicalFilters<K, CombinedFilter<K>, Blob<K>>>>,
}
/// Result of scanning the work dir for existing blob files (see `read_blobs`).
struct ReadBlobsResult<K>
where
    for<'a> K: Key<'a>,
{
    // Successfully opened blobs.
    blobs: Vec<Blob<K>>,
    // Blobs newly moved to the corrupted directory during this scan.
    new_corrupted_blob_count: usize,
    // Largest blob id seen (including corrupted files), if any.
    max_blob_id: Option<usize>
}
/// Lists the working directory and returns its entries if it contains at
/// least one blob file (detected by the `blob` extension), `None` otherwise.
///
/// # Errors
/// Fails if the directory cannot be opened or iterated.
async fn work_dir_content(wd: &Path) -> Result<Option<Vec<DirEntry>>> {
    let mut files = Vec::new();
    let mut dir = read_dir(wd).await?;
    // Propagate iteration errors instead of silently skipping them: the
    // previous `transpose()` + `if let Ok(..)` form ignored `Err` entries,
    // and a persistent I/O error would make this loop spin forever.
    while let Some(file) = dir.next_entry().await? {
        files.push(file);
    }
    let contains_blobs = files
        .iter()
        .filter_map(|file| Some(file.file_name().as_os_str().to_str()?.to_owned()))
        .any(|name| name.ends_with(BLOB_FILE_EXTENSION));
    let content = if contains_blobs {
        debug!("working dir contains files, try init existing");
        Some(files)
    } else {
        debug!("working dir is uninitialized, starting empty storage");
        None
    };
    Ok(content)
}
impl<K> Storage<K>
where
for<'a> K: Key<'a> + 'static,
{
    /// Creates a storage handle from `config` and the given I/O driver.
    /// No I/O happens here; call `init`/`init_lazy` afterwards.
    pub(crate) fn new(config: Config, iodriver: IoDriver) -> Self {
        let inner = Arc::new(Inner::new(config, iodriver));
        let observer = Observer::new(inner.clone());
        Self { inner, observer }
    }
    /// Initializes the storage and opens an active (writable) blob.
    pub async fn init(&mut self) -> Result<()> {
        self.init_ext(true).await
    }
    /// Initializes the storage without opening an active blob
    /// (the storage starts read-only until one is created).
    pub async fn init_lazy(&mut self) -> Result<()> {
        self.init_ext(false).await
    }
    /// Shared initialization path: prepares the work dir, then either restores
    /// state from existing blob files or starts a fresh storage, and finally
    /// launches the background observer. `with_active` controls whether an
    /// active blob is opened when restoring from existing files.
    async fn init_ext(&mut self, with_active: bool) -> Result<()> {
        self.prepare_work_dir()
            .await
            .context("failed to prepare work dir")?;
        let wd = self
            .inner
            .config
            .work_dir()
            .ok_or_else(|| Error::from(ErrorKind::Uninitialized))?;
        let cont_res = work_dir_content(wd)
            .await
            .with_context(|| format!("failed to read work dir content: {}", wd.display()));
        trace!("work dir content loaded");
        // `Some(files)` means at least one blob file exists in the work dir.
        if let Some(files) = cont_res? {
            trace!("storage init from existing files");
            self.init_from_existing(files, with_active)
                .await
                .context("failed to init from existing blobs")?
        } else {
            self.init_new().await?
        };
        trace!("new storage initialized");
        self.launch_observer();
        trace!("observer started");
        Ok(())
    }
    /// Returns `true` if a writable (active) blob is currently open.
    pub async fn has_active_blob(&self) -> bool {
        self.inner.has_active_blob().await
    }
    /// Creates an active blob synchronously; errors if one already exists.
    pub async fn try_create_active_blob(&self) -> Result<()> {
        self.inner.create_active_blob().await
    }
    /// Asks the observer to create an active blob in the background.
    pub async fn create_active_blob_in_background(&self) {
        self.observer.create_active_blob().await
    }
    /// Closes the active blob synchronously (moving it to the closed set),
    /// then schedules index dumps for old blobs.
    pub async fn try_close_active_blob(&self) -> Result<()> {
        let result = self.inner.close_active_blob().await;
        self.observer.try_dump_old_blob_indexes().await;
        result
    }
    /// Closes the active blob via the observer, then schedules index dumps.
    pub async fn close_active_blob_in_background(&self) {
        self.observer.close_active_blob().await;
        self.observer.try_dump_old_blob_indexes().await
    }
    /// Restores the latest closed blob as the active one, synchronously.
    pub async fn try_restore_active_blob(&self) -> Result<()> {
        self.inner.restore_active_blob().await
    }
    /// Restores the active blob via the observer.
    pub async fn restore_active_blob_in_background(&self) {
        self.observer.restore_active_blob().await
    }
    /// Writes `value` under `key` with the given timestamp and no metadata.
    pub async fn write(&self, key: impl AsRef<K>, value: Bytes, timestamp: BlobRecordTimestamp) -> Result<()> {
        self.write_with_optional_meta(key, value, timestamp, None).await
    }
    /// Writes `value` under `key` with the given timestamp and metadata.
    pub async fn write_with(&self, key: impl AsRef<K>, value: Bytes, timestamp: BlobRecordTimestamp, meta: Meta) -> Result<()> {
        self.write_with_optional_meta(key, value, timestamp, Some(meta)).await
    }
    /// Schedules index dumps for closed blobs and returns the amount of
    /// inactive index memory measured just before the dump was requested.
    pub async fn free_excess_resources(&self) -> usize {
        let memory = self.inactive_index_memory().await;
        self.observer.try_dump_old_blob_indexes().await;
        memory
    }
pub async fn inactive_index_memory(&self) -> usize {
let safe = self.inner.safe.read().await;
let blobs = safe.blobs.read().await;
blobs.iter().fold(0, |s, n| s + n.index_memory())
}
    /// Total index memory: active blob plus all closed blobs.
    pub async fn index_memory(&self) -> usize {
        self.active_index_memory().await + self.inactive_index_memory().await
    }
    /// Core write path shared by `write` and `write_with`.
    ///
    /// Ensures an active blob exists, optionally rejects duplicates, appends
    /// the record, may request an active-blob swap, and may trigger fsync.
    async fn write_with_optional_meta(
        &self,
        key: impl AsRef<K>,
        value: Bytes,
        timestamp: BlobRecordTimestamp,
        meta: Option<Meta>,
    ) -> Result<()> {
        let key = key.as_ref();
        debug!("storage write with {:?}, {}b, {:?}", key, value.len(), meta);
        // Best-effort: if no active blob exists yet, create one now.
        if self.try_create_active_blob().await.is_ok() {
            info!("Active blob was set during write operation");
        }
        // Duplicate check is skipped entirely when duplicates are allowed.
        if !self.inner.config.allow_duplicates()
            && self.contains_with(key, meta.as_ref()).await?.is_found()
        {
            warn!(
                "record with key {:?} and meta {:?} exists",
                key.as_ref(),
                meta
            );
            return Ok(());
        }
        let record = Record::create(key, timestamp.into(), value, meta)
            .with_context(|| "storage write with record creation failed")?;
        let safe = self.inner.safe.read().await;
        let blob = safe
            .active_blob
            .as_ref()
            .ok_or_else(Error::active_blob_not_set)?;
        // Translate file-unavailability errors into a work-dir-unavailable
        // error so callers can distinguish a lost disk from other failures.
        let result = Blob::write(blob, key, record).await.or_else::<anyhow::Error, _>(|err| {
            let e = err.downcast::<Error>()?;
            if let ErrorKind::FileUnavailable(kind) = e.kind() {
                let work_dir = self
                    .inner
                    .config
                    .work_dir()
                    .ok_or_else(Error::uninitialized)?;
                Err(Error::work_dir_unavailable(work_dir, e.to_string(), kind.to_owned()).into())
            } else {
                Err(e.into())
            }
        })?;
        // May request a background swap if the blob outgrew its limits.
        self.try_update_active_blob(blob).await?;
        if self.inner.should_try_fsync(result.dirty_bytes) {
            self.observer.try_fsync_data().await;
        }
        Ok(())
    }
async fn try_update_active_blob(&self, active_blob: &Box<ASRwLock<Blob<K>>>) -> Result<()> {
let config_max_size = self
.inner
.config
.max_blob_size()
.ok_or_else(|| Error::from(ErrorKind::Uninitialized))?;
let config_max_count = self
.inner
.config
.max_data_in_blob()
.ok_or_else(|| Error::from(ErrorKind::Uninitialized))?;
let active_blob = active_blob.read().await;
if active_blob.file_size() >= config_max_size
|| active_blob.records_count() as u64 >= config_max_count
{
let dur = active_blob.created_at().elapsed().map_err(|e| e.duration());
let dur = match dur {
Ok(d) => d,
Err(d) => d,
};
if dur.as_millis() > self.inner.config.debounce_interval_ms() as u128 {
self.observer.try_update_active_blob().await;
}
}
Ok(())
}
    /// Reads the latest value for `key`, ignoring metadata.
    #[inline]
    pub async fn read(&self, key: impl AsRef<K>) -> Result<ReadResult<Bytes>> {
        let key = key.as_ref();
        debug!("storage read {:?}", key);
        self.read_with_optional_meta(key, None).await
    }
    /// Reads the latest value for `key` that matches the given metadata.
    #[inline]
    pub async fn read_with(&self, key: impl AsRef<K>, meta: &Meta) -> Result<ReadResult<Bytes>> {
        let key = key.as_ref();
        debug!("storage read with {:?}", key);
        self.read_with_optional_meta(key, Some(meta))
            .await
            .with_context(|| "read with optional meta failed")
    }
pub async fn read_all(&self, key: impl AsRef<K>) -> Result<Vec<Entry>> {
let mut entries = self.read_all_with_deletion_marker(key).await?;
if let Some(e) = entries.last() {
if e.is_deleted() {
entries.truncate(entries.len() - 1);
}
}
Ok(entries)
}
    /// Returns all entries for `key` across the active and closed blobs,
    /// including a deletion marker entry if one exists.
    ///
    /// Entries from a single blob are assumed already ordered; when several
    /// blobs contributed, results are re-sorted newest-first by timestamp and
    /// truncated just past the first deletion marker.
    pub async fn read_all_with_deletion_marker(&self, key: impl AsRef<K>) -> Result<Vec<Entry>> {
        let key = key.as_ref();
        let mut all_entries = Vec::new();
        let mut deletion_marker_presence = false;
        // How many blobs actually returned entries; sorting is only needed
        // when more than one did.
        let mut affected_blobs_count = 0;
        let safe = self.inner.safe.read().await;
        let active_blob = safe.active_blob.as_ref();
        if let Some(active_blob) = active_blob {
            let entries = active_blob
                .read()
                .await
                .read_all_entries_with_deletion_marker(key)
                .await?;
            debug!(
                "storage core read all active blob entries {}",
                entries.len()
            );
            // A deletion marker, if any, is the last entry from a blob.
            deletion_marker_presence = deletion_marker_presence || entries.last().map(|e| e.is_deleted()).unwrap_or(false);
            affected_blobs_count += if entries.len() > 0 { 1 } else { 0 };
            all_entries.extend(entries);
        }
        let blobs = safe.blobs.read().await;
        // Closed blobs are queried newest-first (rev) so results keep a
        // roughly newest-first order without sorting in the common case.
        let mut futures = blobs
            .iter_possible_childs_rev(key)
            .map(|b| b.1.data.read_all_entries_with_deletion_marker(key))
            .collect::<FuturesOrdered<_>>();
        while let Some(data) = futures.next().await {
            let entries = data?;
            deletion_marker_presence = deletion_marker_presence || entries.last().map(|e| e.is_deleted()).unwrap_or(false);
            affected_blobs_count += if entries.len() > 0 { 1 } else { 0 };
            all_entries.extend(entries);
        }
        debug!(
            "storage core read from non-active total {} entries",
            all_entries.len()
        );
        if affected_blobs_count > 1 {
            // Newest first; then keep everything up to and including the
            // first (most recent) deletion marker.
            all_entries.sort_by(|a, b| b.timestamp().cmp(&a.timestamp()));
            if deletion_marker_presence {
                let first_del = all_entries.iter().position(|h| h.is_deleted());
                if let Some(first_del) = first_del {
                    all_entries.truncate(first_del + 1);
                }
            }
        }
        Ok(all_entries)
    }
    /// Finds the most recent entry for `key` (matching `meta` when given)
    /// across the active blob and all closed blobs.
    async fn get_latest_entry(
        safe: &Safe<K>,
        key: &K,
        meta: Option<&Meta>,
    ) -> Result<ReadResult<Entry>> {
        let mut latest_entry: ReadResult<Entry> = ReadResult::NotFound;
        if let Some(ablob) = safe.active_blob.as_ref() {
            let ablob_entry = ablob.read().await.get_latest_entry(key, meta, true).await.map_err(|err| {
                debug!("get_latest_entry from active blob returned error: {:?}", err);
                err
            })?;
            latest_entry = latest_entry.latest(ablob_entry);
        }
        let blobs = safe.blobs.read().await;
        // Query candidate closed blobs newest-first and keep the latest result.
        let mut stream = blobs
            .iter_possible_childs_rev(key)
            .map(|(_, blob)| blob.data.get_latest_entry(key, meta, true))
            .collect::<FuturesOrdered<_>>();
        while let Some(entry) = stream.next().await {
            let entry = entry.map_err(|err| {
                debug!("get_latest_entry from closed blob returned error: {:?}", err);
                err
            })?;
            latest_entry = latest_entry.latest(entry);
        }
        Ok(latest_entry)
    }
    /// Core read path: resolves the latest entry for `key` (optionally
    /// filtered by `meta`) and loads its data bytes.
    async fn read_with_optional_meta(
        &self,
        key: &K,
        meta: Option<&Meta>,
    ) -> Result<ReadResult<Bytes>> {
        debug!("storage read with optional meta {:?}, {:?}", key, meta);
        let safe = self.inner.safe.read().await;
        let latest_entry = Self::get_latest_entry(&safe, key, meta).await?;
        match latest_entry {
            ReadResult::Found(entry) => {
                trace!("Storage::read_with_optional_meta: entry found");
                // Entry only references the record; the payload is loaded here.
                let buf = entry
                    .load()
                    .await
                    .with_context(|| format!("Failed to read data for key {:?} with meta {:?}", key, meta))?
                    .into_data();
                trace!("Storage::read_with_optional_meta: loaded bytes: {}", buf.len());
                Ok(ReadResult::Found(buf))
            },
            ReadResult::Deleted(ts) => Ok(ReadResult::Deleted(ts)),
            ReadResult::NotFound => Ok(ReadResult::NotFound)
        }
    }
    /// Shuts the storage down: dumps the active blob's index (if any) and
    /// stops the background observer. Returns the dump error, if one occurred.
    pub async fn close(self) -> Result<()> {
        let mut res = Ok(());
        {
            let mut safe = self.inner.safe.write().await;
            // Take the active blob out so no further writes can reach it.
            let active_blob = safe.active_blob.take();
            if let Some(blob) = active_blob {
                let mut blob = blob.write().await;
                res = res.and(
                    blob.dump()
                        .await
                        .map(|_| info!("active blob dumped"))
                        .with_context(|| format!("blob {} dump failed", blob.name())),
                )
            }
        };
        self.observer.shutdown().await;
        res
    }
pub async fn blobs_count(&self) -> usize {
let safe = self.inner.safe.read().await;
let count = safe.blobs.read().await.len();
if safe.active_blob.is_some() {
count + 1
} else {
count
}
}
    /// Number of corrupted blobs discovered so far (old + newly detected).
    pub fn corrupted_blobs_count(&self) -> usize {
        self.inner.corrupted_blobs.load(Ordering::Acquire)
    }
    /// Memory used by the active blob's index, or 0 if there is none.
    pub async fn active_index_memory(&self) -> usize {
        let safe = self.inner.safe.read().await;
        if let Some(ablob) = safe.active_blob.as_ref() {
            ablob.read().await.index_memory()
        } else {
            0
        }
    }
    /// Total disk usage across the active blob and all closed blobs.
    pub async fn disk_used(&self) -> u64 {
        let safe = self.inner.safe.read().await;
        let lock = safe.blobs.read().await;
        let mut result = 0;
        if let Some(ablob) = safe.active_blob.as_ref() {
            result += ablob.read().await.disk_used();
        }
        for blob in lock.iter() {
            result += blob.disk_used();
        }
        result
    }
    /// Id that will be assigned to the next blob created.
    #[must_use]
    pub fn next_blob_id(&self) -> usize {
        self.inner.next_blob_id.load(Ordering::Acquire)
    }
    /// Makes sure the configured work dir exists, creating it (recursively)
    /// when the config allows; errors otherwise.
    async fn prepare_work_dir(&mut self) -> Result<()> {
        let work_dir = self.inner.config.work_dir().ok_or_else(|| {
            error!("Work dir is not set");
            Error::uninitialized()
        })?;
        let path = Path::new(work_dir);
        if path.exists() {
            debug!("work dir exists: {}", path.display());
        } else if self.inner.config.create_work_dir() {
            debug!("creating work dir recursively: {}", path.display());
            create_dir_all(path).await?;
        } else {
            // Creation is disabled by config: surface a work-dir-unavailable
            // error rather than creating it implicitly.
            error!("work dir path not found: {}", path.display());
            return Err(Error::work_dir_unavailable(
                path,
                "work dir path not found".to_owned(),
                IOErrorKind::NotFound,
            ))
            .with_context(|| "failed to prepare work dir");
        }
        Ok(())
    }
    /// Starts an empty storage: counts leftover corrupted blobs from previous
    /// runs and opens a fresh active blob.
    async fn init_new(&mut self) -> Result<()> {
        let corrupted = Self::count_old_corrupted_blobs(&self.inner.config).await;
        self.inner
            .corrupted_blobs
            .store(corrupted, Ordering::Release);
        let next = self.inner.next_blob_name()?;
        let mut safe = self.inner.safe.write().await;
        let blob =
            Blob::open_new(next, self.inner.iodriver.clone(), self.inner.config.blob()).await?;
        safe.active_blob = Some(Box::new(ASRwLock::new(blob)));
        Ok(())
    }
    /// Restores storage state from existing blob files: opens all blobs,
    /// accounts corrupted ones, optionally promotes the newest blob to active
    /// (or opens a fresh one when everything was corrupted), dumps the rest,
    /// and seeds the next-blob-id counter.
    async fn init_from_existing(&mut self, files: Vec<DirEntry>, with_active: bool) -> Result<()> {
        trace!("init from existing: {:#?}", files);
        let existed_corrupted_blob_count = Self::count_old_corrupted_blobs(&self.inner.config).await;
        let disk_access_sem = self.inner.get_dump_sem();
        let ReadBlobsResult { mut blobs, max_blob_id, new_corrupted_blob_count} = Self::read_blobs(
            &files,
            self.inner.iodriver.clone(),
            disk_access_sem,
            &self.inner.config,
        )
        .await
        .context("failed to read blobs")?;
        self.inner
            .corrupted_blobs
            .store(existed_corrupted_blob_count + new_corrupted_blob_count, Ordering::Release);
        self.inner
            .next_blob_id
            .store(max_blob_id.map_or(0, |i| i + 1), Ordering::Release);
        debug!("{} blobs successfully created", blobs.len());
        blobs.sort_by_key(Blob::id);
        let active_blob = if with_active {
            // If every existing blob was corrupted, start a brand-new active
            // blob instead of failing in `pop_active`.
            if blobs.is_empty() && new_corrupted_blob_count > 0 {
                let next = self.inner.next_blob_name()?;
                Some(Blob::open_new(next, self.inner.iodriver.clone(), self.inner.config.blob()).await?)
            } else {
                Some(Self::pop_active(&mut blobs, &self.inner.config).await?)
            }
        } else {
            None
        };
        for blob in &mut blobs {
            debug!("dump all blobs except active blob");
            blob.dump().await?;
        }
        let mut safe = self.inner.safe.write().await;
        safe.active_blob = active_blob.map(|ab| Box::new(ASRwLock::new(ab)));
        *safe.blobs.write().await =
            HierarchicalFilters::from_vec(self.inner.config.bloom_filter_group_size(), 1, blobs).await;
        // `fetch_max` guards against racing with id reservations made above.
        self.inner.next_blob_id.fetch_max(safe.max_id().await.map_or(0, |i| i + 1), Ordering::AcqRel);
        Ok(())
    }
    /// Takes the newest blob (callers sort `blobs` by id beforehand) and
    /// loads its index so it can serve as the active blob.
    async fn pop_active(blobs: &mut Vec<Blob<K>>, config: &Config) -> Result<Blob<K>> {
        let mut active_blob = blobs
            .pop()
            .ok_or_else(|| {
                let wd = config.work_dir();
                error!("No blobs in {:?} to create an active one", wd);
                Error::from(ErrorKind::Uninitialized)
            })?;
        active_blob.load_index().await?;
        Ok(active_blob)
    }
    /// Opens every `.blob` file found in `files`, limited by the disk-access
    /// semaphore. Unreadable blobs are, depending on config and error kind,
    /// ignored, quarantined into the corrupted dir, or treated as fatal.
    /// Also tracks the largest blob id seen (including corrupted files).
    async fn read_blobs(
        files: &[DirEntry],
        iodriver: IoDriver,
        disk_access_sem: Arc<Semaphore>,
        config: &Config,
    ) -> Result<ReadBlobsResult<K>> {
        let mut corrupted = 0;
        let mut max_blob_id: Option<usize> = None;
        debug!("read working directory content");
        let dir_content = files.iter().map(DirEntry::path);
        debug!("read {} entities", dir_content.len());
        let dir_files = dir_content.filter(|path| path.is_file());
        debug!("filter potential blob files");
        let blob_files = dir_files.filter_map(|path| {
            if path.extension()?.to_str()? == BLOB_FILE_EXTENSION {
                Some(path)
            } else {
                None
            }
        });
        debug!("init blobs from found files");
        // Open blobs concurrently but throttled by the semaphore.
        let mut futures: FuturesUnordered<_> = blob_files
            .map(|file| async {
                let sem = disk_access_sem.clone();
                let _sem = sem.acquire().await.expect("sem is closed");
                Blob::from_file(file.clone(), iodriver.clone(), config.blob())
                    .await
                    .map_err(|e| (e, file))
            })
            .collect();
        debug!("async init blobs from file");
        let mut blobs = Vec::new();
        while let Some(blob_res) = futures.next().await {
            match blob_res {
                Ok(blob) => {
                    max_blob_id = max_blob_id.max(Some(blob.id()));
                    blobs.push(blob);
                },
                Err((e, file)) => {
                    let msg = format!("Failed to read existing blob: {}", file.display());
                    // Even a corrupted blob contributes its id so new blobs
                    // never reuse it.
                    if let Ok(file_name) = blob::FileName::from_path(&file) {
                        max_blob_id = max_blob_id.max(Some(file_name.id()));
                    }
                    if config.ignore_corrupted() {
                        error!("{}, cause: {:#}", msg, e);
                    } else if Self::should_save_corrupted_blob(&e) {
                        warn!(
                            "Corrupted BLOB detected. Save corrupted blob '{}' to directory '{}'",
                            file.display(),
                            config.corrupted_dir_name()
                        );
                        Self::save_corrupted_blob(&file, config.corrupted_dir_name())
                            .await
                            .with_context(|| {
                                anyhow!(format!("failed to save corrupted blob {:?}", file))
                            })?;
                        corrupted += 1;
                    } else {
                        return Err(e.context(msg));
                    }
                }
            }
        }
        Ok(ReadBlobsResult { blobs, new_corrupted_blob_count: corrupted, max_blob_id })
    }
async fn count_old_corrupted_blobs(config: &Config) -> usize {
let mut corrupted = 0;
if let Some(work_dir_path) = config.work_dir() {
let mut corrupted_dir_path = work_dir_path.to_path_buf();
corrupted_dir_path.push(config.corrupted_dir_name());
if corrupted_dir_path.exists() {
let dir = read_dir(&corrupted_dir_path).await;
if let Err(e) = dir {
warn!(
"can't read corrupted blob dir {}: {}",
corrupted_dir_path.display(),
e
);
return corrupted;
}
let mut dir = dir.unwrap();
while let Ok(Some(file)) = dir.next_entry().await {
let path = file.path();
if path.is_file() {
let extension = path.extension();
if let Some(BLOB_FILE_EXTENSION) = extension.and_then(|ext| ext.to_str()) {
corrupted += 1;
}
}
}
}
}
corrupted
}
fn should_save_corrupted_blob(error: &anyhow::Error) -> bool {
debug!("decide wether to save corrupted blobs: {:#}", error);
if let Some(error) = error.downcast_ref::<Error>() {
return match error.kind() {
ErrorKind::Bincode(_) => true,
ErrorKind::Validation { kind, cause: _ } => {
!matches!(kind, ValidationErrorKind::BlobVersion)
}
_ => false,
};
}
false
}
async fn save_corrupted_blob(path: &Path, corrupted_dir_name: &str) -> Result<()> {
let parent = path
.parent()
.ok_or_else(|| anyhow!("[{}] blob path don't have parent directory", path.display()))?;
let file_name = path
.file_name()
.ok_or_else(|| anyhow!("[{}] blob path don't have file name", path.display()))?
.to_os_string();
let corrupted_dir_path = parent.join(corrupted_dir_name);
let corrupted_path = corrupted_dir_path.join(file_name);
if corrupted_dir_path.exists() {
debug!("{} dir exists", path.display());
} else {
debug!("creating dir for corrupted files: {}", path.display());
create_dir(corrupted_dir_path).await.with_context(|| {
format!(
"failed to create dir for corrupted files: {}",
path.display()
)
})?;
}
tokio::fs::rename(&path, &corrupted_path)
.await
.with_context(|| {
anyhow!(format!(
"failed to move file {:?} to {:?}",
path, corrupted_path
))
})?;
Self::remove_index_by_blob_path(path).await?;
Ok(())
}
async fn remove_index_by_blob_path(path: &Path) -> Result<()> {
let index_path = path.with_extension(blob::BLOB_INDEX_FILE_EXTENSION);
if index_path.exists() {
tokio::fs::remove_file(&index_path)
.await
.with_context(|| anyhow!(format!("failed to remove file {:?}", index_path)))?;
}
Ok(())
}
    /// Checks whether `key` exists; returns the record timestamp when found.
    pub async fn contains(&self, key: impl AsRef<K>) -> Result<ReadResult<BlobRecordTimestamp>> {
        self.contains_with(key.as_ref(), None).await
    }
    /// Like `contains`, additionally filtering by metadata when given.
    async fn contains_with(
        &self,
        key: &K,
        meta: Option<&Meta>,
    ) -> Result<ReadResult<BlobRecordTimestamp>> {
        let safe = self.inner.safe.read().await;
        let latest_result = Self::get_latest_entry(&safe, key, meta).await?;
        Ok(latest_result.map(|entry| entry.timestamp()))
    }
    /// Probes bloom filters only (no disk index lookups) to tell whether
    /// `key` *might* exist. `Some(true)` means some blob needs an additional
    /// check; `Some(false)` means no filter matched.
    // NOTE(review): only `NeedAdditionalCheck` short-circuits here — other
    // non-negative FilterResult variants (if any) fall through to the closed
    // blobs; confirm that matches FilterResult's intended semantics.
    pub async fn check_filters(&self, key: impl AsRef<K>) -> Option<bool> {
        let key = key.as_ref();
        trace!("[{:?}] check in blobs bloom filter", key);
        let inner = self.inner.safe.read().await;
        let in_active = if let Some(active_blob) = inner.active_blob.as_ref() {
            active_blob.read().await.check_filter(key).await
        } else {
            FilterResult::NotContains
        };
        if in_active == FilterResult::NeedAdditionalCheck {
            return Some(true);
        }
        let blobs = inner.blobs.read().await;
        // Offloaded filters require async I/O; in-memory ones are checked
        // synchronously first since they are cheap.
        let (offloaded, in_memory): (Vec<&Blob<K>>, Vec<&Blob<K>>) =
            blobs.iter().partition(|blob| {
                blob.get_filter_fast()
                    .map_or(false, |filter| filter.is_filter_offloaded())
            });
        let in_closed = in_memory
            .iter()
            .any(|blob| blob.check_filter_fast(key) == FilterResult::NeedAdditionalCheck);
        if in_closed {
            return Some(true);
        }
        let in_closed_offloaded = offloaded
            .iter()
            .map(|blob| blob.check_filter(key))
            .collect::<FuturesUnordered<_>>()
            .any(|value| value == FilterResult::NeedAdditionalCheck)
            .await;
        Some(in_closed_offloaded)
    }
    /// Total number of records across all blobs.
    pub async fn records_count(&self) -> usize {
        self.inner.records_count().await
    }
    /// Per-blob record counts as `(blob_id, count)` pairs.
    pub async fn records_count_detailed(&self) -> Vec<(usize, usize)> {
        self.inner.records_count_detailed().await
    }
    /// Record count of the active blob, or `None` if there is none.
    pub async fn records_count_in_active_blob(&self) -> Option<usize> {
        self.inner.records_count_in_active_blob().await
    }
    /// Forces a data fsync of the active blob (subject to dirty-byte checks).
    pub async fn fsyncdata(&self) -> IOResult<()> {
        self.inner.fsyncdata().await
    }
    /// Asks the observer to swap the active blob when `predicate` holds,
    /// then schedules index dumps for the old blobs.
    pub async fn force_update_active_blob(&self, predicate: ActiveBlobPred) {
        self.observer.force_update_active_blob(predicate).await;
        self.observer.try_dump_old_blob_indexes().await
    }
    /// Starts the background observer loop.
    fn launch_observer(&mut self) {
        self.observer.run();
    }
    /// Deletes `key` (writes a deletion marker). When `only_if_presented` is
    /// false, the marker is written even if the key was never seen.
    /// Returns the number of blobs a marker was recorded in.
    pub async fn delete(&self, key: impl AsRef<K>, timestamp: BlobRecordTimestamp, only_if_presented: bool) -> Result<u64> {
        self.delete_with_optional_meta(key, timestamp, None, only_if_presented).await
    }
    /// Like `delete`, additionally matching on metadata.
    pub async fn delete_with(&self, key: impl AsRef<K>, timestamp: BlobRecordTimestamp, meta: Meta, only_if_presented: bool) -> Result<u64> {
        self.delete_with_optional_meta(key, timestamp, Some(meta), only_if_presented).await
    }
    /// Core delete path. Takes the cheap read lock when an active blob exists
    /// (or when one is not required); otherwise upgrades to the write lock to
    /// create the active blob first.
    async fn delete_with_optional_meta(&self, key: impl AsRef<K>, timestamp: BlobRecordTimestamp, meta: Option<Meta>, only_if_presented: bool) -> Result<u64> {
        {
            let safe = self.inner.safe.read().await;
            if only_if_presented || safe.active_blob.is_some() {
                return self.delete_core(&safe, key.as_ref(), timestamp, meta, only_if_presented).await;
            }
        }
        let mut safe = self.inner.safe.write().await;
        if !only_if_presented {
            self.inner.ensure_active_blob_exists(&mut safe).await?;
        }
        return self.delete_core(&mut safe, key.as_ref(), timestamp, meta, only_if_presented).await;
    }
    /// Applies the deletion to the active blob and to all closed blobs,
    /// triggering index dumps / fsync as side effects when needed.
    async fn delete_core(&self, safe: &Safe<K>, key: &K, timestamp: BlobRecordTimestamp, meta: Option<Meta>, only_if_presented: bool) -> Result<u64> {
        let deleted_in_active_result = Self::delete_in_active(safe, key, timestamp, meta.clone(), only_if_presented).await?;
        let deleted_in_active = deleted_in_active_result.as_ref().map(|r| if r.deleted { 1 } else { 0 }).unwrap_or(0);
        let deleted_in_closed = Self::delete_in_closed(safe, key, timestamp, meta).await?;
        // Closed-blob indexes changed: schedule their dump.
        if deleted_in_closed > 0 {
            self.observer.defer_dump_old_blob_indexes().await;
        }
        if let Some(result) = deleted_in_active_result {
            if self.inner.should_try_fsync(result.dirty_bytes) {
                self.observer.try_fsync_data().await;
            }
        }
        debug!("{} deleted total", deleted_in_active + deleted_in_closed);
        Ok(deleted_in_active + deleted_in_closed)
    }
async fn delete_in_closed(safe: &Safe<K>, key: &K, timestamp: BlobRecordTimestamp, meta: Option<Meta>) -> Result<u64> {
const DELETE_ONLY_IF_PRESENTED: bool = true; let mut blobs = safe.blobs.write().await;
let entries_closed_blobs = blobs
.iter_mut()
.map(|b| b.delete(key, timestamp, meta.clone(), DELETE_ONLY_IF_PRESENTED))
.collect::<FuturesUnordered<_>>();
let total = entries_closed_blobs
.map(|result| match result {
Ok(result) => {
if result.deleted {
1
} else {
0
}
}
Err(error) => {
warn!("failed to delete records: {}", error);
0
}
})
.fold(0, |s, n| s + n)
.await;
debug!("{} deleted from closed blobs", total);
Ok(total as u64)
}
    /// Records a deletion marker in the active blob, if one exists.
    /// Returns the blob's delete result, or `None` when there is no active
    /// blob (only possible when `only_if_presented` is true).
    async fn delete_in_active(
        safe: &Safe<K>,
        key: &K,
        timestamp: BlobRecordTimestamp,
        meta: Option<Meta>,
        only_if_presented: bool,
    ) -> Result<Option<DeleteResult>> {
        // Callers must have created the active blob beforehand in this mode
        // (see `delete_with_optional_meta`).
        if !only_if_presented {
            assert!(safe.active_blob.is_some(), "Active BLOB should be initialized before calling 'mark_all_as_deleted_active'");
        }
        let active_blob = safe.active_blob.as_deref();
        if let Some(active_blob) = active_blob {
            let result = active_blob
                .write()
                .await
                .delete(key, timestamp, meta, only_if_presented)
                .await?;
            debug!("Deleted {} records from active blob", if result.deleted { 1 } else { 0 });
            Ok(Some(result))
        } else {
            Ok(None)
        }
    }
}
impl<K> Inner<K>
where
for<'a> K: Key<'a> + 'static,
{
    /// Builds the shared state with empty blob collections and zeroed counters.
    fn new(config: Config, iodriver: IoDriver) -> Self {
        Self {
            safe: RwLock::new(Safe::new(config.bloom_filter_group_size())),
            config,
            next_blob_id: AtomicUsize::new(0),
            iodriver,
            corrupted_blobs: AtomicUsize::new(0),
            fsync_in_progress: AtomicBool::new(false)
        }
    }
    /// Storage configuration.
    pub(crate) fn config(&self) -> &Config {
        &self.config
    }
    /// I/O backend used for blob files.
    pub(crate) fn io_driver(&self) -> &IoDriver {
        &self.iodriver
    }
    /// Lock guarding the blob collections.
    pub(crate) fn safe(&self) -> &RwLock<Safe<K>> {
        &self.safe
    }
    /// Semaphore throttling concurrent disk access during dumps/opens.
    fn get_dump_sem(&self) -> Arc<Semaphore> {
        self.config.dump_sem()
    }
pub(crate) async fn restore_active_blob(&self) -> Result<()> {
if self.has_active_blob().await {
return Err(Error::active_blob_already_exists().into());
}
let mut safe = self.safe.write().await;
if let None = safe.active_blob {
let blob_opt = safe.blobs.write().await.pop();
if let Some(blob) = blob_opt {
safe.active_blob = Some(Box::new(ASRwLock::new(blob)));
Ok(())
} else {
Err(Error::uninitialized().into())
}
} else {
Err(Error::active_blob_not_set().into())
}
}
pub(crate) async fn create_active_blob(&self) -> Result<()> {
if self.has_active_blob().await {
return Err(Error::active_blob_already_exists().into());
}
let mut safe = self.safe.write().await;
self.ensure_active_blob_exists(&mut *safe).await
}
async fn ensure_active_blob_exists(&self, safe: &mut Safe<K>) -> Result<()> {
if let None = safe.active_blob {
let next = self.next_blob_name()?;
let blob = Blob::open_new(next, self.iodriver.clone(), self.config.blob()).await?;
safe.active_blob = Some(Box::new(ASRwLock::new(blob)));
Ok(())
} else {
Ok(())
}
}
pub(crate) async fn close_active_blob(&self) -> Result<()> {
if !self.has_active_blob().await {
return Err(Error::active_blob_doesnt_exist().into());
}
let mut safe = self.safe.write().await;
if safe.active_blob.is_none() {
Err(Error::active_blob_doesnt_exist().into())
} else {
if let Some(ablob) = safe.active_blob.take() {
let ablob = (*ablob).into_inner();
ablob.fsyncdata().await?;
safe.blobs.write().await.push(ablob).await;
}
Ok(())
}
}
pub(crate) async fn has_active_blob(&self) -> bool {
if let None = self.safe.read().await.active_blob {
false
} else {
true
}
}
    /// Snapshot of the active blob's record count, index memory and file
    /// size, or `None` when there is no active blob.
    pub(crate) async fn active_blob_stat(&self) -> Option<ActiveBlobStat> {
        if let Some(ablob) = self.safe.read().await.active_blob.as_ref() {
            let ablob = ablob.read().await;
            let records_count = ablob.records_count();
            let index_memory = ablob.index_memory();
            let file_size = ablob.file_size() as usize;
            Some(ActiveBlobStat::new(records_count, index_memory, file_size))
        } else {
            None
        }
    }
    /// Reserves the next blob id and builds the file name for it.
    ///
    /// Note: the id is consumed (`fetch_add`) even if prefix/work-dir lookup
    /// fails afterwards, so ids may have gaps.
    pub(crate) fn next_blob_name(&self) -> Result<blob::FileName> {
        let next_id = self.next_blob_id.fetch_add(1, Ordering::AcqRel);
        let name_prefix = self
            .config
            .blob_file_name_prefix()
            .ok_or_else(|| {
                error!("Blob file name prefix is not set");
                Error::uninitialized()
            })?;
        let dir = self
            .config
            .work_dir()
            .ok_or_else(|| {
                error!("Work dir is not set");
                Error::uninitialized()
            })?;
        Ok(blob::FileName::new(
            name_prefix,
            next_id,
            BLOB_FILE_EXTENSION,
            dir,
        ))
    }
    /// Total record count across all blobs.
    async fn records_count(&self) -> usize {
        self.safe.read().await.records_count().await
    }
    /// Per-blob `(id, count)` pairs.
    async fn records_count_detailed(&self) -> Vec<(usize, usize)> {
        self.safe.read().await.records_count_detailed().await
    }
    /// Record count of the active blob, or `None` when there is none.
    async fn records_count_in_active_blob(&self) -> Option<usize> {
        let inner = self.safe.read().await;
        if let Some(ablob) = inner.active_blob.as_ref() {
            Some(ablob.read().await.records_count())
        } else {
            None
        }
    }
    /// Syncs active-blob data to disk, at most one fsync at a time.
    ///
    /// A CAS on `fsync_in_progress` makes concurrent callers no-ops; the
    /// `ResetableFlag` guard clears the flag on every exit path, including
    /// early returns and errors. The sync is skipped when the dirty-byte
    /// threshold has not been reached.
    pub(crate) async fn fsyncdata(&self) -> IOResult<()> {
        if self.fsync_in_progress.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire).is_err() {
            // Another fsync is running; treat as success.
            return Ok(())
        }
        // Drop guard: resets the flag when this scope ends.
        let _flag = ResetableFlag { flag: &self.fsync_in_progress };
        let safe = self.safe.read().await;
        if let Some(ablob) = &safe.active_blob {
            let ablob = ablob.read().await;
            if !self.too_many_dirty_bytes(ablob.file_dirty_bytes()) {
                return Ok(());
            }
        }
        safe.fsyncdata().await
    }
    /// Dumps indexes of closed blobs, releasing locks every ~200 ms so other
    /// tasks are not starved (see `Safe::try_dump_old_blob_indexes`).
    pub(crate) async fn try_dump_old_blob_indexes(&self) {
        Safe::try_dump_old_blob_indexes(&self.safe, self.get_dump_sem(), Duration::from_millis(200)).await;
    }
    /// `true` when dirty bytes exceed the threshold and no fsync is running.
    pub(crate) fn should_try_fsync(&self, dirty_bytes: u64) -> bool {
        self.too_many_dirty_bytes(dirty_bytes) && !self.fsync_in_progress.load(Ordering::Acquire)
    }
    /// Dirty-byte threshold check against the configured limit.
    fn too_many_dirty_bytes(&self, dirty_bytes: u64) -> bool {
        dirty_bytes > self.config().max_dirty_bytes_before_sync()
    }
}
/// Drop guard that clears an `AtomicBool` when it goes out of scope.
/// Used by `Inner::fsyncdata` so the in-progress flag is reset on every
/// exit path (early return, error, or success).
struct ResetableFlag<'a> {
    flag: &'a AtomicBool
}
impl<'a> Drop for ResetableFlag<'a> {
    fn drop(&mut self) {
        self.flag.store(false, Ordering::Release);
    }
}
impl<K> Safe<K>
where
for<'a> K: Key<'a> + 'static,
{
    /// Creates empty blob collections with the given bloom-filter group size.
    fn new(group_size: usize) -> Self {
        Self {
            active_blob: None,
            blobs: Arc::new(RwLock::new(HierarchicalFilters::new(group_size, 1))),
        }
    }
async fn max_id(&self) -> Option<usize> {
let mut id = None;
if let Some(ablob) = self.active_blob.as_ref() {
id = Some(ablob.read().await.id());
}
let blobs_max_id = self.blobs.read().await.last().map(|x| x.id());
id.max(blobs_max_id)
}
async fn records_count(&self) -> usize {
let details = self.records_count_detailed().await;
details.iter().fold(0, |acc, (_, count)| acc + count)
}
async fn records_count_detailed(&self) -> Vec<(usize, usize)> {
let mut results = Vec::new();
let blobs = self.blobs.read().await;
for blob in blobs.iter() {
let count = blob.records_count();
let value = (blob.id(), count);
debug!("push: {:?}", value);
results.push(value);
}
if let Some(blob) = self.active_blob.as_ref() {
let value = (blobs.len(), blob.read().await.records_count());
debug!("push: {:?}", value);
results.push(value);
}
results
}
    /// Syncs the active blob's data to disk; no-op when there is none.
    async fn fsyncdata(&self) -> IOResult<()> {
        if let Some(ref blob) = self.active_blob {
            blob.read().await.fsyncdata().await?;
        }
        Ok(())
    }
pub(crate) async fn read_active_blob<'a>(&'a self) -> Option<async_lock::RwLockReadGuard<'a, Blob<K>>> {
match &self.active_blob {
None => None,
Some(blob) => Some(blob.read().await)
}
}
    /// Installs `blob` as the new active blob; the previous one (if any) is
    /// moved into the closed set.
    pub(crate) async fn replace_active_blob(&mut self, blob: Blob<K>) -> Result<()> {
        let old_active = self.active_blob.replace(Box::new(ASRwLock::new(blob)));
        if let Some(blob) = old_active {
            self.blobs.write().await.push(blob.into_inner()).await;
        }
        Ok(())
    }
pub(crate) async fn try_dump_old_blob_indexes(safe: &RwLock<Self>, dump_sem: Arc<Semaphore>, max_quantum: Duration) {
trace!("dump indexes for old blobs started");
let mut finished = false;
let mut current_progress: usize = 0;
while !finished {
finished = true;
let safe_guard = safe.read().await;
let mut write_blobs = safe_guard.blobs.write().await;
let quantum_start = Instant::now();
let progress_start = current_progress; for blob in write_blobs.iter_mut().skip(current_progress) {
if current_progress > progress_start && quantum_start.elapsed() > max_quantum {
finished = false;
break;
}
current_progress += 1;
let _ = dump_sem.acquire().await;
if let Err(e) = blob.dump().await {
error!("Error dumping blob ({}): {}", blob.name(), e);
} else {
trace!("finished dumping old blob: {}", blob.name());
}
}
}
trace!("dump indexes for old blobs finished");
}
}
#[async_trait::async_trait]
impl<K> BloomProvider<K> for Storage<K>
where
for<'a> K: Key<'a> + 'static,
{
type Filter = <Blob<K> as BloomProvider<K>>::Filter;
    /// Combines the active blob's fast filter check with the hierarchical
    /// check over the closed blobs.
    async fn check_filter(&self, item: &K) -> FilterResult {
        let inner = self.inner.safe.read().await;
        let active = if let Some(ablob) = inner.active_blob.as_ref() {
            ablob.read().await.check_filter_fast(item)
        } else {
            FilterResult::default()
        };
        let ret = inner.blobs.read().await.check_filter(item).await;
        // FilterResult addition merges the two verdicts.
        ret + active
    }
    /// Storage-level fast check cannot rule anything out; always defer.
    fn check_filter_fast(&self, _item: &K) -> FilterResult {
        FilterResult::NeedAdditionalCheck
    }
async fn offload_buffer(&mut self, needed_memory: usize, level: usize) -> usize {
let inner = self.inner.safe.read().await;
let ret = inner
.blobs
.write()
.await
.offload_buffer(needed_memory, level)
.await;
ret
}
    /// Returns a combined filter over the closed blobs, merged with the
    /// active blob's filter when present. `None` when the merge overflows
    /// (`checked_add_assign` fails).
    async fn get_filter(&self) -> Option<Self::Filter> {
        let inner = self.inner.safe.read().await;
        let mut ret = inner.blobs.read().await.get_filter_fast().cloned();
        if let Some(filter) = &mut ret {
            if let Some(ablob) = inner.active_blob.as_ref() {
                let ablob = ablob.read().await;
                if let Some(active_filter) = ablob.get_filter_fast() {
                    if !filter.checked_add_assign(active_filter) {
                        return None;
                    }
                }
            }
        }
        ret
    }
    /// No storage-level borrowed filter is available (it must be built).
    fn get_filter_fast(&self) -> Option<&Self::Filter> {
        None
    }
async fn filter_memory_allocated(&self) -> usize {
let safe = self.inner.safe.read().await;
let active = if let Some(blob) = safe.active_blob.as_ref() {
blob.read().await.filter_memory_allocated().await
} else {
0
};
let closed = safe.blobs.read().await.filter_memory_allocated().await;
active + closed
}
}