use super::abloom;
use super::acache;
use super::address::*;
use super::chunk_storage;
use super::compression;
use super::dir_chunk_storage;
use super::external_chunk_storage;
use super::fstx2;
use super::fsutil;
use super::htree;
use super::migrate;
use super::oplog;
use super::protocol;
use super::vfs;
use super::xid::*;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::io::{BufRead, Read, Seek, Write};
use std::path::{Path, PathBuf};
use uriparse::uri;
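/// Storage backend used for a repository's chunk data. The chosen spec is
/// serialized into `meta/storage_engine` when the repository is initialized.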
#[non_exhaustive]
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub enum StorageEngineSpec {
DirStore,
ExternalStore { socket_path: String, path: String },
}
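/// Statistics reported by the storage engine after a garbage collection sweep.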
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
pub struct GcStats {
pub chunks_deleted: Option<u64>,
pub bytes_deleted: Option<u64>,
pub chunks_remaining: Option<u64>,
pub bytes_remaining: Option<u64>,
}
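/// Lock mode a caller wishes to hold on the repository's `repo.lock` file.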
#[derive(Clone, Eq, PartialEq)]
pub enum RepoLockMode {
None,
Shared,
Exclusive,
}
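/// A held repository lock, keeping the locked `repo.lock` handle alive for as
/// long as the lock is required.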
enum RepoLock {
None,
Shared(vfs::VFile),
Exclusive(vfs::VFile),
}
impl RepoLock {
fn mode(&self) -> RepoLockMode {
match self {
RepoLock::None => RepoLockMode::None,
RepoLock::Shared(_) => RepoLockMode::Shared,
RepoLock::Exclusive(_) => RepoLockMode::Exclusive,
}
}
}
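/// An open repository: its virtual filesystem, the configured chunk storage
/// engine and whatever lock is currently held on `repo.lock`.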
pub struct Repo {
repo_vfs: vfs::VFs,
storage_engine: Box<dyn chunk_storage::Engine>,
repo_lock: RepoLock,
}
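/// Events emitted by `oplog_sync` while streaming the repository operation log.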
pub enum OpLogSyncEvent {
Start(Xid),
LogOps(Vec<oplog::LogOp>),
End,
}
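/// Schema version written to `meta/schema_version`; older repositories are
/// migrated up to this version when opened.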
pub const CURRENT_SCHEMA_VERSION: &str = "8";
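// Bounds on the size of the bloom filter used to track reachable chunks during
// garbage collection.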
const MIN_GC_BLOOM_SIZE: usize = 128 * 1024;
const MAX_GC_BLOOM_SIZE: usize = 0xffffffff;
impl Repo {
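/// Create a new, empty repository at the given path or `file:`/`vfs+9p2000.l:`
/// url. The repository is assembled in a sibling `*.tmp` directory and renamed
/// into place once fully written.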
pub fn init(
repo_path_or_url: &Path,
storage_engine: Option<StorageEngineSpec>,
) -> Result<(), anyhow::Error> {
let storage_engine = match storage_engine {
Some(storage_engine) => storage_engine,
None => StorageEngineSpec::DirStore,
};
let (fs, repo_path) = if repo_path_or_url.starts_with("file:")
|| repo_path_or_url.starts_with("vfs+9p2000.l:")
{
let mut u = match repo_path_or_url.to_str() {
Some(r) => match uri::URI::try_from(r) {
Ok(u) => u,
Err(err) => anyhow::bail!("unable to parse uri repository path: {}", err),
},
None => anyhow::bail!("uri repository paths must be valid utf-8"),
};
let mut path = PathBuf::from(u.path().to_string());
if !path.is_absolute() {
path = std::env::current_dir()?.join(&path);
};
let repo_path = match path.file_name() {
Some(name) => PathBuf::from(name),
None => anyhow::bail!("unable to create a repository at /"),
};
let parent_path = {
let mut p = path;
p.pop();
p
};
if let Err(err) = u.set_path(parent_path.to_str().unwrap()) {
anyhow::bail!("unable to set uri path: {}", err)
}
let fs = vfs::VFs::create_from_uri(&u)?;
(fs, repo_path)
} else {
let path = if repo_path_or_url.is_absolute() {
repo_path_or_url.to_path_buf()
} else {
std::env::current_dir()?.join(repo_path_or_url)
};
let repo_path = match path.file_name() {
Some(name) => PathBuf::from(name),
None => anyhow::bail!("unable to create a repository at /"),
};
let parent_path = {
let mut parent_path = path;
parent_path.pop();
parent_path
};
let fs = vfs::VFs::create_from_local_path(&parent_path)?;
(fs, repo_path)
};
let repo_path = match repo_path.to_str() {
Some(repo_path) => repo_path.to_string(),
None => anyhow::bail!("repository base name must be valid utf-8"),
};
let tmp_path = format!("{}.tmp", repo_path);
match fs.metadata(&repo_path) {
Ok(_) => anyhow::bail!(
"{} already exists, remove and try again",
repo_path_or_url.to_string_lossy()
),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
};
match fs.metadata(&tmp_path) {
Ok(_) => anyhow::bail!(
"temporary path {} already exists, remove and try again",
tmp_path
),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
};
let touch = |p: &str| -> Result<(), std::io::Error> {
fs.open(p, vfs::OpenFlags::WRONLY | vfs::OpenFlags::CREAT)?;
Ok(())
};
let write_file = |p: &str, data| -> Result<(), std::io::Error> {
let mut f = fs.open(p, vfs::OpenFlags::WRONLY | vfs::OpenFlags::CREAT)?;
f.write_all(data)?;
f.flush()
};
let sync_dir = |p: &str| -> Result<(), std::io::Error> {
let mut f = fs.open(p, vfs::OpenFlags::RDONLY)?;
f.flush()
};
fs.mkdir(&tmp_path)?;
touch(&format!("{}/{}", &tmp_path, fstx2::LOCK_NAME))?;
write_file(
&format!("{}/{}", tmp_path, fstx2::SEQ_NUM_NAME),
&[0, 0, 0, 0, 0, 0, 0, 0][..],
)?;
sync_dir(".")?;
let repo_fs = fs.sub_fs(&tmp_path)?;
let mut tx = fstx2::WriteTxn::begin_at(&repo_fs)?;
{
tx.add_mkdir("items")?;
tx.add_mkdir("data")?;
tx.add_mkdir("meta")?;
tx.add_mkdir("wal")?;
tx.add_write("repo.lock", vec![])?;
tx.add_write("repo.oplog", vec![])?;
tx.add_write(
"meta/schema_version",
CURRENT_SCHEMA_VERSION.to_string().into(),
)?;
tx.add_write("meta/gc_generation", format!("{:x}", Xid::new()).into())?;
tx.add_write(
"meta/storage_engine",
serde_json::to_vec_pretty(&storage_engine)?,
)?;
}
tx.commit()?;
drop(repo_fs);
fs.rename(&tmp_path, &repo_path)?;
sync_dir(".")?;
Ok(())
}
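/// Open an existing repository, taking `initial_lock_mode` on `repo.lock` and
/// running any pending schema migrations before loading the storage engine.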
pub fn open(repo_path: &Path, initial_lock_mode: RepoLockMode) -> Result<Repo, anyhow::Error> {
let repo_path = match repo_path.to_str() {
Some(repo_path) => repo_path,
None => anyhow::bail!("repository paths must be valid utf-8"),
};
let repo_vfs = match vfs::VFs::create(repo_path) {
Ok(fs) => fs,
Err(err) => anyhow::bail!("unable to open repository at {}: {}", repo_path, err),
};
let tx_file_exists = repo_vfs.metadata("tx.lock").is_ok();
if !tx_file_exists {
anyhow::bail!("{} is not an intialized repository", repo_path);
}
let mut repo_lock = match initial_lock_mode {
RepoLockMode::None => RepoLock::None,
RepoLockMode::Shared => RepoLock::Shared({
let mut lock_file = repo_vfs.open("repo.lock", vfs::OpenFlags::RDONLY)?;
lock_file.lock(vfs::LockType::Shared)?;
lock_file
}),
RepoLockMode::Exclusive => RepoLock::Exclusive({
let mut lock_file = repo_vfs.open("repo.lock", vfs::OpenFlags::RDWR)?;
lock_file.lock(vfs::LockType::Exclusive)?;
lock_file
}),
};
let mut txn;
loop {
txn = fstx2::ReadTxn::begin_at(&repo_vfs)?;
let mut schema_version = txn.read_string("meta/schema_version")?;
if schema_version != CURRENT_SCHEMA_VERSION {
txn.end();
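// Release any lock we hold before running a schema migration.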
repo_lock = RepoLock::None;
if schema_version == "5" {
migrate::repo_upgrade_to_5_to_6(&repo_vfs)?;
continue;
}
if schema_version == "6" {
migrate::repo_upgrade_to_6_to_7(&repo_vfs)?;
continue;
}
if schema_version == "7" {
migrate::repo_upgrade_to_7_to_8(&repo_vfs)?;
continue;
}
txn = fstx2::ReadTxn::begin_at(&repo_vfs)?;
schema_version = txn.read_string("meta/schema_version")?;
if schema_version != CURRENT_SCHEMA_VERSION {
anyhow::bail!(
"the current version of bupstash expects repository schema version {}, got {}",
CURRENT_SCHEMA_VERSION,
schema_version
);
}
}
break;
}
let storage_engine: Box<dyn chunk_storage::Engine> = {
let mut f = txn.open("meta/storage_engine")?;
let mut buf = Vec::with_capacity(128);
f.read_to_end(&mut buf)?;
let spec: StorageEngineSpec = serde_json::from_slice(&buf)?;
match spec {
StorageEngineSpec::DirStore => {
let data_fs = repo_vfs.sub_fs("data")?;
Box::new(dir_chunk_storage::DirStorage::new(data_fs)?)
}
StorageEngineSpec::ExternalStore {
socket_path, path, ..
} => {
let socket_path = PathBuf::from(socket_path);
Box::new(external_chunk_storage::ExternalStorage::new(
&socket_path,
&path,
)?)
}
}
};
txn.end();
Ok(Repo {
repo_vfs,
storage_engine,
repo_lock,
})
}
#[allow(clippy::type_complexity)]
pub fn pipelined_get_chunks(
&mut self,
addresses: &[Address],
on_chunk: &mut dyn FnMut(&Address, &[u8]) -> Result<(), anyhow::Error>,
) -> Result<(), anyhow::Error> {
self.storage_engine
.pipelined_get_chunks(addresses, on_chunk)
}
pub fn get_chunk(&mut self, addr: &Address) -> Result<Vec<u8>, anyhow::Error> {
self.storage_engine.get_chunk(addr)
}
pub fn add_chunk(&mut self, addr: &Address, buf: Vec<u8>) -> Result<(), anyhow::Error> {
self.storage_engine.add_chunk(addr, buf)
}
pub fn flush(&mut self) -> Result<protocol::FlushStats, anyhow::Error> {
self.storage_engine.flush()
}
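/// Filter the given chunk addresses against the chunks the storage engine
/// already has, reporting progress through `on_progress`.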
pub fn filter_existing_chunks(
&mut self,
on_progress: &mut dyn FnMut(u64) -> Result<(), anyhow::Error>,
addresses: Vec<Address>,
) -> Result<Vec<Address>, anyhow::Error> {
self.storage_engine
.filter_existing_chunks(on_progress, addresses)
}
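/// Switch the held repository lock to `lock_mode`. When a shared or exclusive
/// lock is taken and a previous garbage collection left `meta/gc_dirty` behind,
/// wait for the storage engine to finish that sweep before returning.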
pub fn alter_lock_mode(&mut self, lock_mode: RepoLockMode) -> Result<(), anyhow::Error> {
if self.repo_lock.mode() != lock_mode {
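// Drop whatever lock we currently hold before acquiring the new mode.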
self.repo_lock = RepoLock::None;
self.repo_lock = match lock_mode {
RepoLockMode::None => RepoLock::None,
RepoLockMode::Shared => RepoLock::Shared({
let mut lock_file = self.repo_vfs.open("repo.lock", vfs::OpenFlags::RDONLY)?;
lock_file.lock(vfs::LockType::Shared)?;
lock_file
}),
RepoLockMode::Exclusive => RepoLock::Exclusive({
let mut lock_file = self.repo_vfs.open("repo.lock", vfs::OpenFlags::RDWR)?;
lock_file.lock(vfs::LockType::Exclusive)?;
lock_file
}),
};
if matches!(
self.repo_lock.mode(),
RepoLockMode::Shared | RepoLockMode::Exclusive
) {
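// A leftover gc_dirty marker means a sweep may still be in progress;
// poll the storage engine with backoff until it reports completion.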
match self.repo_vfs.metadata("meta/gc_dirty") {
Ok(_) => {
let mut txn = fstx2::WriteTxn::begin_at(&self.repo_vfs)?;
{
let gc_dirty: Option<Xid> =
match txn.read_opt_string("meta/gc_dirty")? {
Some(s) => Some(Xid::parse(&s)?),
None => None,
};
if let Some(dirty_generation) = gc_dirty {
const MAX_DELAY: u64 = 10_000;
let mut delay = 500;
loop {
if self.storage_engine.sweep_completed(dirty_generation)? {
txn.add_rm("meta/gc_dirty")?;
break;
}
std::thread::sleep(std::time::Duration::from_millis(delay));
delay = (delay * 2).min(MAX_DELAY);
}
}
}
txn.commit()?;
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
}
}
}
Ok(())
}
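/// Add a batch of items to the repository, skipping any id that already exists
/// (or was removed), and append the corresponding ops to the repository oplog.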
pub fn import_items(
&mut self,
items: Vec<(Xid, oplog::VersionedItemMetadata)>,
) -> Result<(), anyhow::Error> {
self.alter_lock_mode(RepoLockMode::Shared)?;
let mut txn = fstx2::WriteTxn::begin_at(&self.repo_vfs)?;
for (id, item) in items.into_iter() {
if txn.file_exists(&format!("items/{:x}.removed", id))? {
continue;
}
let item_path = format!("items/{:x}", id);
if txn.file_exists(&item_path)? {
continue;
}
let serialized_md = oplog::checked_serialize_metadata(&item)?;
txn.add_write(&item_path, serialized_md)?;
let op = oplog::LogOp::AddItem((id, item));
let serialized_op = serde_bare::to_vec(&op)?;
txn.add_append("repo.oplog", serialized_op)?;
}
txn.commit()?;
Ok(())
}
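/// Add a single item and append an `AddItem` op to the oplog, failing if the
/// id is already present.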
pub fn add_item(
&mut self,
id: Xid,
item: oplog::VersionedItemMetadata,
) -> Result<Xid, anyhow::Error> {
self.alter_lock_mode(RepoLockMode::Shared)?;
let mut txn = fstx2::WriteTxn::begin_at(&self.repo_vfs)?;
{
let serialized_md = oplog::checked_serialize_metadata(&item)?;
let item_path = format!("items/{:x}", id);
if txn.file_exists(&item_path)? {
anyhow::bail!("item id already exists in repository");
}
txn.add_write(&item_path, serialized_md)?;
let op = oplog::LogOp::AddItem((id, item));
let serialized_op = serde_bare::to_vec(&op)?;
txn.add_append("repo.oplog", serialized_op)?;
}
txn.commit()?;
Ok(id)
}
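/// Mark the given items as removed by renaming them to `*.removed`, log a
/// `RemoveItems` op, and return how many items were actually removed.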
pub fn remove_items(&mut self, mut items: Vec<Xid>) -> Result<u64, anyhow::Error> {
self.alter_lock_mode(RepoLockMode::Shared)?;
let mut txn = fstx2::WriteTxn::begin_at(&self.repo_vfs)?;
let mut removed_items = Vec::with_capacity(items.len());
for item in items.drain(..) {
let path = format!("items/{:x}", item);
match txn.metadata(&path) {
Ok(_) => {
let removed = path.clone() + ".removed";
txn.add_rename(&path, &removed)?;
removed_items.push(item)
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
};
}
let n_removed = removed_items.len() as u64;
if !removed_items.is_empty() {
let op = oplog::LogOp::RemoveItems(removed_items);
let serialized_op = serde_bare::to_vec(&op)?;
txn.add_append("repo.oplog", serialized_op)?;
}
txn.commit()?;
Ok(n_removed)
}
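// Open an item's metadata file, falling back to its `.removed` counterpart.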
fn open_item_for_reading(txn: &fstx2::ReadTxn, id: &Xid) -> std::io::Result<vfs::VFile> {
let f = match txn.open(&format!("items/{:x}", id)) {
Ok(f) => f,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
txn.open(&format!("items/{:x}.removed", id))?
}
Err(err) => return Err(err),
};
Ok(f)
}
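/// Read and deserialize metadata for each of the given item ids, failing if
/// any of them does not exist.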
pub fn batch_read_items(
&mut self,
ids: &[Xid],
) -> Result<Vec<oplog::VersionedItemMetadata>, anyhow::Error> {
let txn = fstx2::ReadTxn::begin_at(&self.repo_vfs)?;
let mut all_metadata = Vec::with_capacity(ids.len());
let mut buf = Vec::with_capacity(1024);
for id in ids {
let mut f = match Repo::open_item_for_reading(&txn, id) {
Ok(f) => f,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
anyhow::bail!("item {:x} does not exist", id)
}
Err(err) => return Err(err.into()),
};
buf.truncate(0);
f.read_to_end(&mut buf)?;
all_metadata.push(serde_bare::from_slice(&buf)?);
}
txn.end();
Ok(all_metadata)
}
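/// Look up a single item's metadata, returning `None` if it does not exist.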
pub fn lookup_item_by_id(
&mut self,
id: &Xid,
) -> Result<Option<oplog::VersionedItemMetadata>, anyhow::Error> {
let txn = fstx2::ReadTxn::begin_at(&self.repo_vfs)?;
let r = match Repo::open_item_for_reading(&txn, id) {
Ok(mut f) => {
let mut buf = Vec::with_capacity(1024);
f.read_to_end(&mut buf)?;
Ok(Some(serde_bare::from_slice(&buf)?))
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err.into()),
};
txn.end();
r
}
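/// Report whether the repository has complete data for an item, i.e. whether
/// `items/{id}` or `items/{id}.removed` exists.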
pub fn has_complete_data(&mut self, id: &Xid) -> Result<bool, anyhow::Error> {
let txn = fstx2::ReadTxn::begin_at(&self.repo_vfs)?;
let r = txn.file_exists(&format!("items/{:x}", id))?
|| txn.file_exists(&format!("items/{:x}.removed", id))?;
txn.end();
Ok(r)
}
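/// Return the subset of `ids` that have neither an item entry nor a `.removed`
/// marker, reporting progress periodically via `on_progress`.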
pub fn filter_items_with_complete_data(
&mut self,
on_progress: &mut dyn FnMut(u64) -> Result<(), anyhow::Error>,
ids: Vec<Xid>,
) -> Result<Vec<Xid>, anyhow::Error> {
let txn = fstx2::ReadTxn::begin_at(&self.repo_vfs)?;
let mut missing = Vec::new();
let mut progress: u64 = 0;
let mut last_progress_update = std::time::Instant::now();
let update_interval = std::time::Duration::from_millis(500);
for id in ids.iter() {
if (progress % 61 == 0) && last_progress_update.elapsed() > update_interval {
on_progress(progress)?;
progress = 0;
last_progress_update = std::time::Instant::now();
}
if txn.file_exists(&format!("items/{:x}", id))?
|| txn.file_exists(&format!("items/{:x}.removed", id))?
{
continue;
};
missing.push(*id);
progress += 1;
}
txn.end();
Ok(missing)
}
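/// Stream the repository oplog to `on_sync_event` in batches. If the caller's
/// `start_gc_generation` still matches the current gc generation, streaming
/// resumes from the `after` offset; otherwise the full log is replayed.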
pub fn oplog_sync(
&mut self,
after: Option<u64>,
start_gc_generation: Option<Xid>,
on_sync_event: &mut dyn FnMut(OpLogSyncEvent) -> Result<(), anyhow::Error>,
) -> Result<(), anyhow::Error> {
let txn = fstx2::ReadTxn::begin_at(&self.repo_vfs)?;
let current_gc_generation: Xid = Xid::parse(&txn.read_string("meta/gc_generation")?)?;
let after = match start_gc_generation {
Some(start_gc_generation) if start_gc_generation == current_gc_generation => after,
_ => None,
};
let mut log_file = txn.open("repo.oplog")?;
let log_meta = log_file.metadata()?;
txn.end();
if let Some(after) = after {
log_file.seek(std::io::SeekFrom::Start(after))?;
};
on_sync_event(OpLogSyncEvent::Start(current_gc_generation))?;
const BATCH_SIZE: usize = 64;
let mut op_batch = Vec::with_capacity(BATCH_SIZE);
let log_file = log_file.take(log_meta.size - after.unwrap_or(0));
let mut log_file = std::io::BufReader::new(log_file);
let mut done = false;
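// When resuming from an offset, the op at that position is assumed to have
// been delivered already, so decode and discard it.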
if after.is_some() {
serde_bare::from_reader::<_, oplog::LogOp>(&mut log_file)?;
}
while !done {
while op_batch.len() < BATCH_SIZE {
if log_file.fill_buf()?.is_empty() {
done = true;
break;
}
let op: oplog::LogOp = serde_bare::from_reader(&mut log_file)?;
op_batch.push(op);
}
if !op_batch.is_empty() {
let mut send_batch = Vec::with_capacity(if !done { BATCH_SIZE } else { 0 });
std::mem::swap(&mut send_batch, &mut op_batch);
on_sync_event(OpLogSyncEvent::LogOps(send_batch))?;
}
}
on_sync_event(OpLogSyncEvent::End)?;
Ok(())
}
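/// Undo pending removals by renaming `items/*.removed` entries back into place,
/// logging a `RecoverRemoved` op when anything was restored.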
pub fn recover_removed(&mut self) -> Result<u64, anyhow::Error> {
self.alter_lock_mode(RepoLockMode::Shared)?;
let mut txn = fstx2::WriteTxn::begin_at(&self.repo_vfs)?;
let mut n_restored = 0;
for item in txn.read_dir("items")? {
let name = item.file_name;
if name.ends_with(".removed") {
let restored_name = &name[0..(name.len() - ".removed".len())];
txn.add_rename(
&format!("items/{}", name),
&format!("items/{}", restored_name),
)?;
n_restored += 1;
}
}
if n_restored != 0 {
let op = oplog::LogOp::RecoverRemoved;
let serialized_op = serde_bare::to_vec(&op)?;
txn.add_append("repo.oplog", serialized_op)?;
}
txn.commit()?;
Ok(n_restored)
}
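/// Read the repository's current garbage collection generation id.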
pub fn gc_generation(&mut self) -> Result<Xid, anyhow::Error> {
let txn = fstx2::ReadTxn::begin_at(&self.repo_vfs)?;
let gc_generation = Xid::parse(&txn.read_string("meta/gc_generation")?)?;
txn.end();
Ok(gc_generation)
}
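/// Run garbage collection: walk all reachable chunks into a bloom filter, take
/// the exclusive lock, compact the oplog and purge removed items, walk again,
/// then ask the storage engine to sweep unreachable chunks.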
pub fn gc(
&mut self,
update_progress_msg: &mut dyn FnMut(String) -> Result<(), anyhow::Error>,
) -> Result<GcStats, anyhow::Error> {
self.alter_lock_mode(RepoLockMode::Shared)?;
let gc_generation = Xid::new();
update_progress_msg("walking reachable data...".to_string())?;
let estimated_chunk_count = self.storage_engine.estimate_chunk_count()?;
let reachable_bloom_mem_size =
abloom::approximate_mem_size_upper_bound(0.02, estimated_chunk_count)
.min(MAX_GC_BLOOM_SIZE)
.max(MIN_GC_BLOOM_SIZE);
let mut xid_wset = HashSet::with_capacity(65536);
let mut address_wcache = acache::ACache::new(1048576);
let mut reachable = abloom::ABloom::new(reachable_bloom_mem_size);
let mut walk_item = |storage_engine: &mut dyn chunk_storage::Engine,
metadata: oplog::VersionedItemMetadata|
-> Result<(), anyhow::Error> {
let data_tree = metadata.data_tree();
let trees = if let Some(index_tree) = metadata.index_tree() {
vec![data_tree, index_tree]
} else {
vec![data_tree]
};
for tree in trees {
let mut tr = htree::TreeReader::new(
tree.height.0.try_into()?,
tree.data_chunk_count.0,
&tree.address,
);
while let Some((height, addr)) = tr.next_addr() {
reachable.add(&addr);
if height != 0 && address_wcache.add(&addr) {
let data = storage_engine.get_chunk(&addr)?;
let data = compression::unauthenticated_decompress(data)?;
tr.push_level(height - 1, data)?;
}
}
}
Ok(())
};
let mut walk_items =
|repo_vfs: &vfs::VFs,
update_progress_msg: &mut dyn FnMut(String) -> Result<(), anyhow::Error>,
storage_engine: &mut dyn chunk_storage::Engine|
-> Result<(), anyhow::Error> {
let txn = fstx2::ReadTxn::begin_at(repo_vfs)?;
let mut items_dir_ents = txn.read_dir("items")?;
let mut reachable_items = HashSet::with_capacity(items_dir_ents.len());
for item in items_dir_ents.drain(..) {
let file_name = item.file_name;
if file_name.ends_with(".removed") {
continue;
}
let item_id = Xid::parse(&file_name)?;
reachable_items.insert(item_id);
}
let update_delay_millis = std::time::Duration::from_millis(500);
let mut last_progress_update = std::time::Instant::now()
.checked_sub(update_delay_millis)
.unwrap();
let mut i = 1;
let mut log_file = txn.open("repo.oplog")?;
let log_meta = log_file.metadata()?;
let log_file = log_file.take(log_meta.size);
let mut log_file = std::io::BufReader::new(log_file);
txn.end();
while !log_file.fill_buf()?.is_empty() {
let op = serde_bare::from_reader(&mut log_file)?;
if let oplog::LogOp::AddItem((item_id, metadata)) = op {
if !reachable_items.contains(&item_id) {
continue;
}
if !xid_wset.insert(item_id) {
continue;
}
if last_progress_update.elapsed() >= update_delay_millis
|| i == reachable_items.len()
{
last_progress_update = std::time::Instant::now();
update_progress_msg(format!(
"walking item {}/{}...",
i,
reachable_items.len()
))?;
}
walk_item(storage_engine, metadata)?;
i += 1;
}
}
Ok(())
};
walk_items(
&self.repo_vfs,
update_progress_msg,
&mut *self.storage_engine,
)?;
update_progress_msg("getting exclusive repository lock...".to_string())?;
self.alter_lock_mode(RepoLockMode::Exclusive)?;
self.storage_engine.prepare_for_sweep(gc_generation)?;
update_progress_msg("compacting repository log...".to_string())?;
{
let mut txn = fstx2::WriteTxn::begin_at(&self.repo_vfs)?;
txn.add_write(
"meta/gc_generation",
format!("{:x}", gc_generation).into_bytes(),
)?;
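// Mark the repository gc-dirty for this generation; the marker is only
// removed once the sweep for this generation has completed.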
txn.add_write("meta/gc_dirty", format!("{:x}", gc_generation).into_bytes())?;
let log_file = txn.open("repo.oplog")?;
let mut log_file = std::io::BufReader::new(log_file);
let mut compacted_log = std::io::BufWriter::new(fsutil::anon_temp_file()?);
while !log_file.fill_buf()?.is_empty() {
let op = serde_bare::from_reader(&mut log_file)?;
if let oplog::LogOp::AddItem((id, md)) = op {
let p = format!("items/{:x}", id);
match txn.metadata(&p) {
Ok(_) => {
compacted_log.write_all(&serde_bare::to_vec(
&oplog::LogOp::AddItem((id, md)),
)?)?;
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => (),
Err(err) => return Err(err.into()),
}
}
}
compacted_log.flush()?;
txn.add_write_from_file("repo.oplog", compacted_log.into_inner()?)?;
for item in txn.read_dir("items")? {
let name = item.file_name;
if name.ends_with(".removed") {
txn.add_rm(&format!("items/{}", name))?;
}
}
txn.commit()?;
}
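// Walk the items a second time now that we hold the exclusive lock, so items
// added while the first (concurrent) walk ran are also marked reachable.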
update_progress_msg("finalizing reachable data...".to_string())?;
walk_items(
&self.repo_vfs,
update_progress_msg,
&mut *self.storage_engine,
)?;
if std::env::var("BUPSTASH_DEBUG_GC").is_ok() {
eprintln!("dbg_gc_estimated_chunk_count={}", estimated_chunk_count);
eprintln!("dbg_gc_reachable_bloom_mem_size={}", reachable.mem_size());
eprintln!(
"dbg_gc_reachable_bloom_utilization={}",
reachable.estimate_utilization()
);
eprintln!(
"dbg_gc_reachable_bloom_estimated_false_positive_rate={}",
reachable.estimate_false_positive_rate()
);
eprintln!(
"dbg_gc_reachable_bloom_estimated_add_count={}",
reachable.estimate_add_count()
);
eprintln!("dbg_gc_xid_walk_set_size={}", xid_wset.len(),);
eprintln!(
"dbg_gc_address_walk_cache_hit_count={}/{}",
address_wcache.hit_count, address_wcache.add_count
);
eprintln!(
"dbg_gc_address_walk_cache_utilization={}",
address_wcache.utilization()
);
}
std::mem::drop(xid_wset);
std::mem::drop(address_wcache);
let stats = self.storage_engine.sweep(update_progress_msg, reachable)?;
{
let mut txn = fstx2::WriteTxn::begin_at(&self.repo_vfs)?;
let gc_dirty: Option<Xid> = match txn.read_opt_string("meta/gc_dirty")? {
Some(s) => Some(Xid::parse(&s)?),
None => None,
};
if gc_dirty == Some(gc_generation) {
txn.add_rm("meta/gc_dirty")?;
}
txn.commit()?;
}
Ok(stats)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn dir_store_sanity_test() {
let tmp_dir = tempfile::tempdir().unwrap();
let mut path_buf = PathBuf::from(tmp_dir.path());
path_buf.push("repo");
Repo::init(path_buf.as_path(), Some(StorageEngineSpec::DirStore)).unwrap();
let mut repo = Repo::open(path_buf.as_path(), RepoLockMode::Exclusive).unwrap();
let addr = Address::default();
repo.add_chunk(&addr, vec![1]).unwrap();
repo.flush().unwrap();
repo.add_chunk(&addr, vec![2]).unwrap();
repo.flush().unwrap();
let v = repo.get_chunk(&addr).unwrap();
assert_eq!(v, vec![1]);
}
}