use std::collections::HashMap;
use std::fs;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::cache::table_cache::TableCache;
use crate::error::{Error, Result};
use crate::manifest::version::{TableFile, Version};
use crate::manifest::version_edit::{FileMetaData, VersionEdit};
use crate::sst::table_reader::TableReader;
use crate::types::{SequenceNumber, compare_internal_key};
use crate::wal::{WalReader, WalWriter};
use parking_lot::Mutex;
use ruc::*;
/// Tracks the current set of live table files and persists every change to
/// an on-disk MANIFEST file that the `CURRENT` file points at.
pub struct VersionSet {
/// Database directory; holds `CURRENT`, `MANIFEST-NNNNNN` and `NNNNNN.sst` files.
db_path: PathBuf,
/// Number of levels in a version; edits naming a level at or above this are ignored.
num_levels: usize,
/// Latest version; replaced wholesale (never mutated in place) when an edit is applied.
current: Arc<Version>,
/// Next file number to hand out; also consumed when a new manifest is created.
next_file_number: u64,
/// Log number most recently carried by a recovered or applied edit.
log_number: u64,
/// Last sequence number most recently recorded or set explicitly.
last_sequence: SequenceNumber,
/// Number of the manifest file currently being appended to.
manifest_number: u64,
/// Shared writer for the active manifest; when `None`, writes and syncs are skipped.
manifest_writer: Arc<Mutex<Option<WalWriter>>>,
/// Optional cache used to obtain table readers; without it SST files are opened directly.
table_cache: Option<Arc<TableCache>>,
/// Edits applied since the manifest was last rewritten; drives manifest compaction.
edits_since_snapshot: u64,
}
impl VersionSet {
pub fn create(db_path: &Path, num_levels: usize) -> Result<Self> {
Self::create_with_cache(db_path, num_levels, None)
}
/// Initializes a fresh version set in `db_path`.
///
/// Creates `MANIFEST-000001`, points `CURRENT` at it, then logs and syncs an
/// initial edit recording the starting file number and sequence number.
///
/// # Errors
/// Fails if the manifest cannot be created, `CURRENT` cannot be published,
/// or the initial edit cannot be written or synced.
pub fn create_with_cache(
    db_path: &Path,
    num_levels: usize,
    table_cache: Option<Arc<TableCache>>,
) -> Result<Self> {
    // File number 1 belongs to the first manifest, so allocation starts at 2.
    const FIRST_MANIFEST_NUMBER: u64 = 1;
    const FIRST_FREE_FILE_NUMBER: u64 = 2;

    let first_manifest = db_path.join(format!("MANIFEST-{:06}", FIRST_MANIFEST_NUMBER));
    let writer = WalWriter::new(&first_manifest).c(d!())?;
    Self::set_current_file(db_path, FIRST_MANIFEST_NUMBER).c(d!())?;

    let mut version_set = Self {
        db_path: db_path.to_path_buf(),
        num_levels,
        current: Arc::new(Version::new(num_levels)),
        next_file_number: FIRST_FREE_FILE_NUMBER,
        log_number: 0,
        last_sequence: 0,
        manifest_number: FIRST_MANIFEST_NUMBER,
        manifest_writer: Arc::new(Mutex::new(Some(writer))),
        table_cache,
        edits_since_snapshot: 0,
    };

    // Persist the starting counters so recovery sees them even with no further edits.
    let mut bootstrap = VersionEdit::new();
    bootstrap.set_next_file_number(version_set.next_file_number);
    bootstrap.set_last_sequence(version_set.last_sequence);
    version_set.log_and_apply(bootstrap).c(d!())?;
    version_set.sync_manifest().c(d!())?;

    Ok(version_set)
}
pub fn recover(db_path: &Path, num_levels: usize) -> Result<Self> {
Self::recover_with_cache(db_path, num_levels, None)
}
pub fn recover_with_cache(
db_path: &Path,
num_levels: usize,
table_cache: Option<Arc<TableCache>>,
) -> Result<Self> {
let current_path = db_path.join("CURRENT");
let manifest_name = fs::read_to_string(¤t_path)
.map_err(|e| eg!(Error::Corruption(format!("cannot read CURRENT: {}", e))))
.c(d!())?;
let manifest_name = manifest_name.trim();
let manifest_path = db_path.join(manifest_name);
let manifest_number = manifest_name
.strip_prefix("MANIFEST-")
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(1);
let mut reader = WalReader::new(&manifest_path).c(d!())?;
let mut next_file_number = 2u64;
let mut log_number = 0u64;
let mut last_sequence = 0u64;
let mut live_files: HashMap<u64, (usize, FileMetaData)> = HashMap::new();
loop {
let data = match reader.read_record() {
Ok(Some(data)) => data,
Ok(None) => break,
Err(e) if reader.is_at_file_end().unwrap_or(false) => {
tracing::warn!("MANIFEST {} has corrupt tail: {}", manifest_name, e);
break;
}
Err(e) => return Err(e),
};
let edit = VersionEdit::decode(&data).c(d!())?;
if let Some(n) = edit.next_file_number {
next_file_number = n;
}
if let Some(n) = edit.log_number {
log_number = n;
}
if let Some(s) = edit.last_sequence {
last_sequence = s;
}
for (_level, file_number) in &edit.deleted_files {
live_files.remove(file_number);
}
for (level, meta) in &edit.new_files {
let level = *level as usize;
if level < num_levels {
live_files.insert(meta.number, (level, meta.clone()));
}
}
}
let mut version = Version::new(num_levels);
for (level, meta) in live_files.values() {
let reader_result = if let Some(ref tc) = table_cache {
tc.get_reader(meta.number)
} else {
let sst_path = db_path.join(format!("{:06}.sst", meta.number));
TableReader::open(&sst_path).map(Arc::new)
};
match reader_result {
Ok(reader) => {
version.files[*level].push(TableFile {
meta: meta.clone(),
reader,
});
}
Err(e) => {
return Err(eg!(Error::Corruption(format!(
"cannot open SST {} during recovery: {}",
meta.number, e
))));
}
}
}
version.files[0].sort_by(|a, b| b.meta.number.cmp(&a.meta.number));
for level in 1..num_levels {
version.files[level]
.sort_by(|a, b| compare_internal_key(&a.meta.smallest_key, &b.meta.smallest_key));
}
let valid_offset = reader.last_valid_offset();
let manifest_writer =
WalWriter::open_append_truncated(&manifest_path, valid_offset).c(d!())?;
Ok(Self {
db_path: db_path.to_path_buf(),
num_levels,
current: Arc::new(version),
next_file_number,
log_number,
last_sequence,
manifest_number,
manifest_writer: Arc::new(Mutex::new(Some(manifest_writer))),
table_cache,
edits_since_snapshot: 0,
})
}
pub fn open(db_path: &Path, num_levels: usize) -> Result<Self> {
Self::open_with_cache(db_path, num_levels, None)
}
/// Opens the version set in `db_path`, recovering it when a `CURRENT` file
/// exists and creating a fresh one otherwise.
pub fn open_with_cache(
    db_path: &Path,
    num_levels: usize,
    table_cache: Option<Arc<TableCache>>,
) -> Result<Self> {
    // The presence of CURRENT is what marks a directory as an existing database.
    let already_initialized = db_path.join("CURRENT").exists();
    match already_initialized {
        true => Self::recover_with_cache(db_path, num_levels, table_cache),
        false => Self::create_with_cache(db_path, num_levels, table_cache),
    }
}
/// Applies `edit` to a copy of the current version, appends it to the
/// manifest, and then installs the new version.
///
/// Deletions are applied before additions, which is the same order used when
/// the manifest is replayed during recovery, so the in-memory result matches
/// what a later recovery would reconstruct. Entries that name a level at or
/// above `num_levels` are ignored. New level-0 files are placed at the front;
/// other levels are kept sorted by smallest key.
///
/// The manifest record is appended but not synced here; call
/// [`VersionSet::sync_manifest`] for durability.
///
/// # Errors
/// Returns `Error::Corruption` if a newly added SST cannot be opened, in
/// which case nothing is written or changed. Manifest append failures also
/// leave the in-memory state untouched. An error from manifest compaction is
/// returned after the edit has already been applied.
pub fn log_and_apply(&mut self, edit: VersionEdit) -> Result<()> {
    let mut new_version = (*self.current).clone();

    // Deletions first, mirroring the replay order used by recovery.
    for (level, file_number) in &edit.deleted_files {
        let level = *level as usize;
        if level < self.num_levels {
            new_version.files[level].retain(|f| f.meta.number != *file_number);
        }
    }

    // Track which sorted levels received files so each is sorted only once.
    let mut needs_sort = vec![false; self.num_levels];
    for (level, meta) in &edit.new_files {
        let level = *level as usize;
        if level >= self.num_levels {
            continue;
        }
        let opened = if let Some(ref tc) = self.table_cache {
            tc.get_reader(meta.number)
        } else {
            let sst_path = self.db_path.join(format!("{:06}.sst", meta.number));
            TableReader::open(&sst_path).map(Arc::new)
        };
        let reader = match opened {
            Ok(reader) => reader,
            Err(e) => {
                return Err(eg!(Error::Corruption(format!(
                    "failed to open new SST {}: {}",
                    meta.number, e
                ))));
            }
        };
        let file = TableFile {
            meta: meta.clone(),
            reader,
        };
        if level == 0 {
            // Later additions go in front of earlier ones at level 0.
            new_version.files[level].insert(0, file);
        } else {
            new_version.files[level].push(file);
            needs_sort[level] = true;
        }
    }
    for (level, touched) in needs_sort.iter().enumerate() {
        if *touched {
            // Stable sort: equal keys keep their insertion order.
            new_version.files[level].sort_by(|a, b| {
                compare_internal_key(&a.meta.smallest_key, &b.meta.smallest_key)
            });
        }
    }

    // Log before installing so a failed append leaves the state unchanged.
    let encoded = edit.encode();
    {
        let mut w = self.manifest_writer.lock();
        if let Some(ref mut writer) = *w {
            writer.add_record(&encoded).c(d!())?;
        }
    }

    if let Some(n) = edit.next_file_number {
        self.next_file_number = n;
    }
    if let Some(n) = edit.log_number {
        self.log_number = n;
    }
    if let Some(s) = edit.last_sequence {
        self.last_sequence = s;
    }
    self.current = Arc::new(new_version);
    self.edits_since_snapshot += 1;
    self.maybe_compact_manifest().c(d!())?;
    Ok(())
}
pub fn sync_manifest(&self) -> Result<()> {
let mut w = self.manifest_writer.lock();
if let Some(ref mut writer) = *w {
writer.sync().c(d!())?;
}
Ok(())
}
/// Returns another handle to the shared manifest writer slot.
///
/// The handle observes later writer replacements because it shares the same mutex.
pub fn manifest_sync_handle(&self) -> Arc<Mutex<Option<WalWriter>>> {
    self.manifest_writer.clone()
}
/// Allocates one file number and advances the counter past it.
pub fn new_file_number(&mut self) -> u64 {
    let allocated = self.next_file_number;
    self.next_file_number = allocated + 1;
    allocated
}
/// Reserves `count` consecutive file numbers and returns the first one.
///
/// The reserved range is `first .. first + count`; a `count` of zero reserves nothing.
pub fn reserve_file_numbers(&mut self, count: u64) -> u64 {
    let first = self.next_file_number;
    self.next_file_number = first + count;
    first
}
/// Returns a shared reference-counted handle to the current version.
pub fn current(&self) -> Arc<Version> {
    Arc::clone(&self.current)
}
/// Returns the file number that the next allocation would hand out,
/// without consuming it.
pub fn next_file_number(&self) -> u64 {
self.next_file_number
}
/// Raises the next file number to `min_next` if it is currently lower.
///
/// The counter is never lowered by this call.
pub fn ensure_file_number_at_least(&mut self, min_next: u64) {
    self.next_file_number = self.next_file_number.max(min_next);
}
/// Returns the log number most recently taken from a recovered or applied edit
/// (0 if no edit has carried one).
pub fn log_number(&self) -> u64 {
self.log_number
}
/// Returns the last sequence number currently held in memory.
pub fn last_sequence(&self) -> SequenceNumber {
self.last_sequence
}
/// Overwrites the in-memory last sequence number.
///
/// Nothing is written to the manifest here; the value is only persisted when
/// a later edit or manifest snapshot records it.
pub fn set_last_sequence(&mut self, seq: SequenceNumber) {
self.last_sequence = seq;
}
pub fn maybe_compact_manifest(&mut self) -> Result<()> {
const MANIFEST_COMPACTION_THRESHOLD: u64 = 1000;
if self.edits_since_snapshot < MANIFEST_COMPACTION_THRESHOLD {
return Ok(());
}
let new_manifest_number = self.next_file_number;
self.next_file_number += 1;
let snapshot_edit = VersionEdit::from_version_snapshot(
&self.current,
self.log_number,
self.next_file_number,
self.last_sequence,
);
let new_manifest_path = self
.db_path
.join(format!("MANIFEST-{:06}", new_manifest_number));
let mut new_writer = match WalWriter::new(&new_manifest_path).c(d!()) {
Ok(writer) => writer,
Err(e) => {
tracing::warn!("MANIFEST compaction deferred creating snapshot: {}", e);
return Ok(());
}
};
let encoded = snapshot_edit.encode();
if let Err(e) = new_writer.add_record(&encoded).c(d!()) {
drop(new_writer);
let _ = fs::remove_file(&new_manifest_path);
tracing::warn!("MANIFEST compaction deferred writing snapshot: {}", e);
return Ok(());
}
if let Err(e) = new_writer.sync().c(d!()) {
drop(new_writer);
let _ = fs::remove_file(&new_manifest_path);
tracing::warn!("MANIFEST compaction deferred syncing snapshot: {}", e);
return Ok(());
}
if let Err(e) = Self::write_current_file_tmp(&self.db_path, new_manifest_number).c(d!()) {
drop(new_writer);
let _ = fs::remove_file(&new_manifest_path);
tracing::warn!("MANIFEST compaction deferred writing CURRENT: {}", e);
return Ok(());
}
if let Err(e) = Self::rename_current_file(&self.db_path).c(d!()) {
drop(new_writer);
let _ = fs::remove_file(&new_manifest_path);
let _ = fs::remove_file(self.db_path.join("CURRENT.tmp"));
tracing::warn!("MANIFEST compaction deferred publishing CURRENT: {}", e);
return Ok(());
}
let old_manifest_number = self.manifest_number;
self.manifest_number = new_manifest_number;
*self.manifest_writer.lock() = Some(new_writer);
self.edits_since_snapshot = 0;
Self::fsync_directory(&self.db_path).c(d!())?;
let old_manifest_path = self
.db_path
.join(format!("MANIFEST-{:06}", old_manifest_number));
if let Err(e) = fs::remove_file(&old_manifest_path) {
tracing::warn!(
"failed to remove old manifest {}: {}",
old_manifest_path.display(),
e
);
}
Ok(())
}
/// Points `CURRENT` at the given manifest: stage the new contents in
/// `CURRENT.tmp`, rename it over `CURRENT`, then fsync the directory.
///
/// The steps run in that order and stop at the first failure.
fn set_current_file(db_path: &Path, manifest_number: u64) -> Result<()> {
    Self::write_current_file_tmp(db_path, manifest_number)
        .and_then(|_| Self::rename_current_file(db_path))
        .and_then(|_| Self::fsync_directory(db_path))
        .c(d!())?;
    Ok(())
}
/// Writes `MANIFEST-NNNNNN\n` for `manifest_number` into `CURRENT.tmp` inside
/// `db_path`, flushing and syncing the file before returning.
///
/// Any existing `CURRENT.tmp` is truncated and overwritten.
fn write_current_file_tmp(db_path: &Path, manifest_number: u64) -> Result<()> {
    let pointer = format!("MANIFEST-{:06}\n", manifest_number);
    let staged = fs::File::create(db_path.join("CURRENT.tmp")).c(d!())?;
    let mut out = BufWriter::new(staged);
    out.write_all(pointer.as_bytes()).c(d!())?;
    // Push buffered bytes to the file before asking the OS to persist them.
    out.flush().c(d!())?;
    out.get_ref().sync_all().c(d!())?;
    Ok(())
}
/// Renames `CURRENT.tmp` to `CURRENT` inside `db_path`, replacing any existing `CURRENT`.
fn rename_current_file(db_path: &Path) -> Result<()> {
    let staged = db_path.join("CURRENT.tmp");
    let published = db_path.join("CURRENT");
    fs::rename(staged, published).c(d!())?;
    Ok(())
}
/// Flushes directory metadata for `dir` so that file creations and renames
/// inside it are persisted.
///
/// On Unix the directory is opened and fsynced. On other platforms this is a
/// no-op: a directory cannot be opened with `File::open` on Windows, so
/// attempting it would make every manifest creation or switch fail there.
///
/// # Errors
/// On Unix, fails if the directory cannot be opened or synced.
fn fsync_directory(dir: &Path) -> Result<()> {
    #[cfg(unix)]
    {
        let handle = fs::File::open(dir).c(d!())?;
        handle.sync_all().c(d!())?;
    }
    #[cfg(not(unix))]
    {
        // No portable directory fsync is available; keep the parameter used.
        let _ = dir;
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
/// A freshly created set has no files, leaves a valid `CURRENT` pointer
/// behind, and recovers to the same empty state.
#[test]
fn test_create_and_recover() {
    let tmp = tempfile::tempdir().unwrap();
    let db_dir = tmp.path();

    let fresh = VersionSet::create(db_dir, 7).unwrap();
    assert_eq!(fresh.current().total_files(), 0);
    // Release the writer before inspecting and reopening the directory.
    drop(fresh);

    let current_file = db_dir.join("CURRENT");
    assert!(current_file.exists());
    let pointer = fs::read_to_string(&current_file).unwrap();
    assert!(pointer.trim().starts_with("MANIFEST-"));

    let recovered = VersionSet::recover(db_dir, 7).unwrap();
    assert_eq!(recovered.current().total_files(), 0);
}
/// Recovery must refuse to proceed when the manifest references an SST file
/// that does not exist on disk.
#[test]
fn test_recovery_fails_on_missing_sst() {
    let tmp = tempfile::tempdir().unwrap();
    let db_dir = tmp.path();

    let vs = VersionSet::create(db_dir, 7).unwrap();

    // Describe a level-0 file that is never actually written.
    let phantom = FileMetaData {
        number: 10,
        file_size: 4096,
        smallest_key: b"aaa".to_vec(),
        largest_key: b"zzz".to_vec(),
        has_range_deletions: false,
    };
    let mut edit = VersionEdit::new();
    edit.set_last_sequence(100);
    edit.add_file(0, phantom);
    let record = edit.encode();

    // Append straight to the manifest, bypassing `log_and_apply`, which
    // would itself reject the missing file.
    {
        let mut guard = vs.manifest_writer.lock();
        if let Some(writer) = guard.as_mut() {
            writer.add_record(&record).unwrap();
            writer.sync().unwrap();
        }
    }
    drop(vs);

    let outcome = VersionSet::recover(db_dir, 7);
    assert!(outcome.is_err(), "recovery should fail on missing SST");
}
/// File numbers are handed out consecutively, and every field set on a
/// `VersionEdit` survives an encode/decode round trip.
#[test]
fn test_version_edit_round_trip() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path();
    let mut vs = VersionSet::create(path, 7).unwrap();

    let n1 = vs.new_file_number();
    let n2 = vs.new_file_number();
    assert!(n1 < n2);
    // Allocation is sequential, and the counter sits just past the last number.
    assert_eq!(n2, n1 + 1);
    assert_eq!(vs.next_file_number(), n2 + 1);

    let mut edit = VersionEdit::new();
    edit.set_last_sequence(42);
    edit.set_next_file_number(vs.next_file_number());
    let encoded = edit.encode();
    let decoded = VersionEdit::decode(&encoded).unwrap();

    assert_eq!(decoded.last_sequence, Some(42));
    // Previously set but never verified: the file counter must round-trip too.
    assert_eq!(decoded.next_file_number, Some(vs.next_file_number()));
}
}