mod bucket;
mod task_queue;
mod transaction;
use std::{
collections::hash_map::Entry,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};
use rspack_parallel::TryFutureConsumer;
use rustc_hash::FxHashMap as HashMap;
use tokio::sync::Mutex;
use self::{bucket::Bucket, task_queue::TaskQueue, transaction::Transaction};
use super::ScopeFileSystem;
use crate::{Error, Result};
type BucketChanges = HashMap<String, Vec<(Vec<u8>, Option<Vec<u8>>)>>;
#[derive(Debug)]
pub struct DB {
fs: ScopeFileSystem,
buckets: Arc<Mutex<HashMap<String, Bucket>>>,
task_queue: Arc<TaskQueue>,
readonly: Arc<AtomicBool>,
}
impl DB {
pub fn new(fs: ScopeFileSystem) -> Self {
Self {
fs,
buckets: Default::default(),
task_queue: Arc::new(TaskQueue::default()),
readonly: Arc::new(AtomicBool::new(false)),
}
}
pub async fn bucket_names(&self) -> Result<Vec<String>> {
self.fs.ensure_exist().await?;
let entries = self.fs.list_child().await?;
let mut bucket_names = Vec::new();
for entry in entries {
if !entry.starts_with('.')
&& let Ok(metadata) = self.fs.stat(&entry).await
&& metadata.is_directory
{
bucket_names.push(entry);
}
}
bucket_names.sort();
Ok(bucket_names)
}
pub async fn load(&self, bucket_name: &str) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let mut buckets = self.buckets.lock().await;
self.fs.ensure_exist().await?;
Transaction::ensure_committed(&self.fs).await?;
let bucket = match buckets.entry(bucket_name.to_string()) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
let fs = self.fs.child_fs(bucket_name);
let bucket = Bucket::new(fs).await?;
entry.insert(bucket)
}
};
bucket.load_all().await
}
pub fn save(&self, changes: BucketChanges, max_pack_size: usize) {
let fs = self.fs.clone();
let buckets = self.buckets.clone();
let readonly = self.readonly.clone();
self.task_queue.add_task(async move {
if readonly.load(Ordering::Relaxed) {
return;
}
if changes.is_empty() {
return;
}
let task_fn = async move || -> Result<()> {
let mut buckets = buckets.lock().await;
let transaction = Transaction::new(&fs).await?;
let mut all_files_to_add = Vec::new();
let mut all_files_to_remove = Vec::new();
let mut updated_buckets = HashMap::default();
let save_result = changes
.into_iter()
.map(|(bucket_name, changes)| {
let cached_bucket = buckets.remove(&bucket_name);
let readable_fs = transaction.readable_fs().child_fs(&bucket_name);
let writable_fs = transaction.writable_fs().child_fs(&bucket_name);
async move {
let mut bucket = if let Some(bucket) = cached_bucket {
bucket
} else {
Bucket::new(readable_fs).await?
};
let affacted_files = bucket
.save(Some(writable_fs), changes, max_pack_size)
.await?;
Ok::<_, Error>((bucket_name, bucket, affacted_files))
}
})
.try_fut_consume(|(bucket_name, bucket, affacted_files)| {
let (added_pack, removed_pack) = affacted_files;
updated_buckets.insert(bucket_name.clone(), bucket);
all_files_to_add.extend(
added_pack
.into_iter()
.map(|file| format!("{bucket_name}/{file}")),
);
all_files_to_remove.extend(
removed_pack
.into_iter()
.map(|file| format!("{bucket_name}/{file}")),
);
})
.await;
match save_result {
Ok(()) => {
transaction
.commit(all_files_to_add, all_files_to_remove)
.await?;
buckets.extend(updated_buckets);
}
Err(e) => {
transaction.rollback().await?;
return Err(e);
}
}
Ok(())
};
if let Err(err) = task_fn().await {
readonly.store(true, Ordering::Relaxed);
println!(
"Rspack persistent cache save failed: {err}\n \
Persistent cache has been disabled for this session. \
Restart the process to re-enable it."
);
}
});
}
pub async fn flush(&self) {
self.task_queue.flush().await;
}
pub fn reset(&self, scope: &str) {
let scope = scope.to_string();
let fs = self.fs.clone();
let buckets = self.buckets.clone();
let readonly = self.readonly.clone();
self.task_queue.add_task(async move {
if readonly.load(Ordering::Relaxed) {
return;
}
let mut buckets = buckets.lock().await;
if let Err(err) = fs.child_fs(&scope).remove().await {
readonly.store(true, Ordering::Relaxed);
println!(
"Rspack persistent cache reset scope {scope} failed: {err}\n \
Persistent cache has been disabled for this session. \
Restart the process to re-enable it."
);
}
buckets.remove(&scope);
});
}
}
#[cfg(test)]
mod test {
use super::{DB, HashMap, Result, ScopeFileSystem};
#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_db() -> Result<()> {
let fs = ScopeFileSystem::new_memory_fs("/".into());
let db = DB::new(fs);
let name_1 = "name1";
let name_2 = "name2";
assert!(db.bucket_names().await?.is_empty());
assert!(db.load(name_1).await?.is_empty());
let bucket_data: Vec<_> = (0..9)
.map(|num| {
(
format!("key{num}").as_bytes().to_vec(),
Some(format!("value{num}").as_bytes().to_vec()),
)
})
.collect();
let mut data = HashMap::default();
data.insert(String::from(name_1), bucket_data.clone());
data.insert(String::from(name_2), bucket_data);
db.save(data, 25);
db.flush().await;
let mut data1 = db.load(name_1).await?;
data1.sort();
let mut data2 = db.load(name_2).await?;
data2.sort();
assert_eq!(data1.len(), 9);
assert_eq!(data1, data2);
let mut names = db.bucket_names().await?;
names.sort();
assert_eq!(names, vec![String::from(name_1), String::from(name_2)]);
db.reset(name_1);
db.reset(name_2);
db.flush().await;
assert!(db.bucket_names().await?.is_empty());
assert!(db.load(name_1).await?.is_empty());
Ok(())
}
}