#[cfg(test)]
mod tests;
use crate::encoding::{self, EncodingError};
use crate::wal::{Wal, WalError};
use crc32fast::Hasher as Crc32;
use std::{
fs::{self, File, OpenOptions},
io::{self, Read, Write},
path::{Path, PathBuf},
sync::Mutex,
};
use thiserror::Error;
use tracing::{error, info, warn};
/// Suffix appended to [`SNAPSHOT_FILENAME`] to name the temporary file that
/// `checkpoint` writes and syncs before renaming it over the final snapshot.
const SNAPSHOT_TMP_SUFFIX: &str = ".tmp";
/// File name of the manifest snapshot inside the manifest directory.
const SNAPSHOT_FILENAME: &str = "MANIFEST-000001";
/// File name of the manifest's own write-ahead log inside the manifest directory.
const WAL_FILENAME: &str = "000000.log";
/// Errors returned by manifest operations.
#[derive(Debug, Error)]
pub enum ManifestError {
/// Failure reported by the underlying WAL (converted automatically via `?`).
#[error("WAL error: {0}")]
Wal(#[from] WalError),
/// Filesystem failure while creating, reading, writing, renaming or syncing files.
#[error("I/O error: {0}")]
Io(#[from] io::Error),
/// Failure while encoding or decoding manifest structures.
#[error("Encoding error: {0}")]
Encoding(#[from] EncodingError),
/// The checksum stored in a snapshot does not match the one recomputed on load.
#[error("Snapshot checksum mismatch")]
SnapshotChecksumMismatch,
/// Internal invariant failure; in this file it is produced for a poisoned mutex.
#[error("Internal error: {0}")]
Internal(String),
}
/// In-memory manifest state, rebuilt from a snapshot plus replayed WAL events.
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct ManifestData {
// Manifest version; defaults to 1 and is overwritten by `Version` events.
version: u64,
// Highest LSN seen so far; `UpdateLsn` events only ever raise it.
last_lsn: u64,
// Id of the currently active WAL, set by `SetActiveWal` events.
active_wal: u64,
// Ids of frozen WALs; an id is dropped when removed or when it becomes active.
frozen_wals: Vec<u64>,
// Registered SSTables; at most one entry per id.
sstables: Vec<ManifestSstEntry>,
// Next id that `allocate_sst_id` will hand out; defaults to 1.
next_sst_id: u64,
// In-memory only: set when an event is applied, cleared by `checkpoint`.
// The encoder always writes `false` and the decoder discards the stored value.
dirty: bool,
}
/// One SSTable registered in the manifest.
#[derive(Debug, Clone, PartialEq)]
pub struct ManifestSstEntry {
/// Identifier of the table; the manifest keeps at most one entry per id.
pub id: u64,
/// Filesystem path recorded for the table.
pub path: PathBuf,
}
impl encoding::Encode for ManifestSstEntry {
    /// Appends the entry to `buf` as its `id` followed by its `path`.
    fn encode_to(&self, buf: &mut Vec<u8>) -> Result<(), EncodingError> {
        let Self { id, path } = self;
        encoding::Encode::encode_to(id, buf)?;
        encoding::Encode::encode_to(path, buf)?;
        Ok(())
    }
}
impl encoding::Decode for ManifestSstEntry {
    /// Reads an entry from the front of `buf`: the `id`, then the `path`.
    ///
    /// Returns the entry together with the number of bytes consumed.
    fn decode_from(buf: &[u8]) -> Result<(Self, usize), EncodingError> {
        let (id, id_len) = <u64 as encoding::Decode>::decode_from(buf)?;
        let (path, path_len) = <PathBuf as encoding::Decode>::decode_from(&buf[id_len..])?;
        Ok((Self { id, path }, id_len + path_len))
    }
}
impl encoding::Encode for ManifestData {
    /// Appends every persisted field to `buf` in declaration order.
    ///
    /// The `dirty` flag is runtime-only state, so a constant `false` is written
    /// in its slot regardless of the current value.
    fn encode_to(&self, buf: &mut Vec<u8>) -> Result<(), EncodingError> {
        let Self {
            version,
            last_lsn,
            active_wal,
            frozen_wals,
            sstables,
            next_sst_id,
            dirty: _,
        } = self;

        encoding::Encode::encode_to(version, buf)?;
        encoding::Encode::encode_to(last_lsn, buf)?;
        encoding::Encode::encode_to(active_wal, buf)?;
        encoding::encode_vec(frozen_wals, buf)?;
        encoding::encode_vec(sstables, buf)?;
        encoding::Encode::encode_to(next_sst_id, buf)?;
        encoding::Encode::encode_to(&false, buf)?;
        Ok(())
    }
}
impl encoding::Decode for ManifestData {
fn decode_from(buf: &[u8]) -> Result<(Self, usize), EncodingError> {
let mut offset = 0;
let (version, n) = u64::decode_from(&buf[offset..])?;
offset += n;
let (last_lsn, n) = u64::decode_from(&buf[offset..])?;
offset += n;
let (active_wal, n) = u64::decode_from(&buf[offset..])?;
offset += n;
let (frozen_wals, n) = encoding::decode_vec::<u64>(&buf[offset..])?;
offset += n;
let (sstables, n) = encoding::decode_vec::<ManifestSstEntry>(&buf[offset..])?;
offset += n;
let (next_sst_id, n) = u64::decode_from(&buf[offset..])?;
offset += n;
let (_dirty, n) = bool::decode_from(&buf[offset..])?;
offset += n;
Ok((
Self {
version,
last_lsn,
active_wal,
frozen_wals,
sstables,
next_sst_id,
dirty: false,
},
offset,
))
}
}
impl encoding::Encode for ManifestEvent {
fn encode_to(&self, buf: &mut Vec<u8>) -> Result<(), EncodingError> {
match self {
ManifestEvent::Version { version } => {
encoding::Encode::encode_to(&0u32, buf)?;
encoding::Encode::encode_to(version, buf)?;
}
ManifestEvent::SetActiveWal { wal } => {
encoding::Encode::encode_to(&1u32, buf)?;
encoding::Encode::encode_to(wal, buf)?;
}
ManifestEvent::AddFrozenWal { wal } => {
encoding::Encode::encode_to(&2u32, buf)?;
encoding::Encode::encode_to(wal, buf)?;
}
ManifestEvent::RemoveFrozenWal { wal } => {
encoding::Encode::encode_to(&3u32, buf)?;
encoding::Encode::encode_to(wal, buf)?;
}
ManifestEvent::AddSst { entry } => {
encoding::Encode::encode_to(&4u32, buf)?;
encoding::Encode::encode_to(entry, buf)?;
}
ManifestEvent::RemoveSst { id } => {
encoding::Encode::encode_to(&5u32, buf)?;
encoding::Encode::encode_to(id, buf)?;
}
ManifestEvent::UpdateLsn { last_lsn } => {
encoding::Encode::encode_to(&6u32, buf)?;
encoding::Encode::encode_to(last_lsn, buf)?;
}
ManifestEvent::AllocateSstId { id } => {
encoding::Encode::encode_to(&7u32, buf)?;
encoding::Encode::encode_to(id, buf)?;
}
ManifestEvent::Compaction { added, removed } => {
encoding::Encode::encode_to(&8u32, buf)?;
encoding::encode_vec(added, buf)?;
encoding::encode_vec(removed, buf)?;
}
}
Ok(())
}
}
impl encoding::Decode for ManifestEvent {
    /// Reads a `u32` variant tag and then the payload that tag implies.
    ///
    /// Returns the event and the total number of bytes consumed, or
    /// `EncodingError::InvalidTag` when the tag is not one of 0..=8.
    fn decode_from(buf: &[u8]) -> Result<(Self, usize), EncodingError> {
        let (tag, tag_len) = <u32 as encoding::Decode>::decode_from(buf)?;

        // Most variants carry a single `u64`. The closure slices lazily so an
        // unknown tag is rejected without touching the payload bytes.
        let read_u64 = || <u64 as encoding::Decode>::decode_from(&buf[tag_len..]);

        let (event, payload_len) = match tag {
            0 => {
                let (version, n) = read_u64()?;
                (ManifestEvent::Version { version }, n)
            }
            1 => {
                let (wal, n) = read_u64()?;
                (ManifestEvent::SetActiveWal { wal }, n)
            }
            2 => {
                let (wal, n) = read_u64()?;
                (ManifestEvent::AddFrozenWal { wal }, n)
            }
            3 => {
                let (wal, n) = read_u64()?;
                (ManifestEvent::RemoveFrozenWal { wal }, n)
            }
            4 => {
                let (entry, n) =
                    <ManifestSstEntry as encoding::Decode>::decode_from(&buf[tag_len..])?;
                (ManifestEvent::AddSst { entry }, n)
            }
            5 => {
                let (id, n) = read_u64()?;
                (ManifestEvent::RemoveSst { id }, n)
            }
            6 => {
                let (last_lsn, n) = read_u64()?;
                (ManifestEvent::UpdateLsn { last_lsn }, n)
            }
            7 => {
                let (id, n) = read_u64()?;
                (ManifestEvent::AllocateSstId { id }, n)
            }
            8 => {
                let (added, added_len) =
                    encoding::decode_vec::<ManifestSstEntry>(&buf[tag_len..])?;
                let (removed, removed_len) =
                    encoding::decode_vec::<u64>(&buf[tag_len + added_len..])?;
                (
                    ManifestEvent::Compaction { added, removed },
                    added_len + removed_len,
                )
            }
            _ => {
                return Err(EncodingError::InvalidTag {
                    tag,
                    type_name: "ManifestEvent",
                })
            }
        };

        Ok((event, tag_len + payload_len))
    }
}
impl encoding::Encode for ManifestSnapshot {
    /// Appends the snapshot to `buf`: version, snapshot LSN, manifest data and
    /// finally the checksum field.
    fn encode_to(&self, buf: &mut Vec<u8>) -> Result<(), EncodingError> {
        let Self {
            version,
            snapshot_lsn,
            manifest_data,
            checksum,
        } = self;

        encoding::Encode::encode_to(version, buf)?;
        encoding::Encode::encode_to(snapshot_lsn, buf)?;
        encoding::Encode::encode_to(manifest_data, buf)?;
        encoding::Encode::encode_to(checksum, buf)?;
        Ok(())
    }
}
impl encoding::Decode for ManifestSnapshot {
fn decode_from(buf: &[u8]) -> Result<(Self, usize), EncodingError> {
let mut offset = 0;
let (version, n) = u64::decode_from(&buf[offset..])?;
offset += n;
let (snapshot_lsn, n) = u64::decode_from(&buf[offset..])?;
offset += n;
let (manifest_data, n) = ManifestData::decode_from(&buf[offset..])?;
offset += n;
let (checksum, n) = u32::decode_from(&buf[offset..])?;
offset += n;
Ok((
Self {
version,
snapshot_lsn,
manifest_data,
checksum,
},
offset,
))
}
}
impl Default for ManifestData {
fn default() -> Self {
Self {
version: 1,
last_lsn: 0,
active_wal: 0,
frozen_wals: Vec::new(),
sstables: Vec::new(),
next_sst_id: 1,
dirty: false,
}
}
}
/// A single change to the manifest, as recorded in the manifest WAL.
///
/// On the wire each variant is a `u32` tag (0 through 8, in declaration order)
/// followed by its payload.
#[derive(Debug)]
pub enum ManifestEvent {
/// Overwrites the manifest version.
Version { version: u64 },
/// Makes `wal` the active WAL and drops it from the frozen list.
SetActiveWal { wal: u64 },
/// Adds `wal` to the frozen list unless it is already present.
AddFrozenWal { wal: u64 },
/// Removes `wal` from the frozen list.
RemoveFrozenWal { wal: u64 },
/// Registers an SSTable unless an entry with the same id already exists.
AddSst { entry: ManifestSstEntry },
/// Removes the SSTable with the given id.
RemoveSst { id: u64 },
/// Raises the last LSN; lower values leave it unchanged.
UpdateLsn { last_lsn: u64 },
/// Records that `id` has been handed out, moving the next free id past it.
AllocateSstId { id: u64 },
/// Removes the `removed` ids and registers the `added` entries in one event,
/// also moving the next free id past every added id.
Compaction {
added: Vec<ManifestSstEntry>,
removed: Vec<u64>,
},
}
/// On-disk snapshot of the manifest state written by `checkpoint`.
#[derive(Debug)]
struct ManifestSnapshot {
// Copy of the manifest data's `version` at snapshot time.
version: u64,
// Copy of the manifest data's `last_lsn` at snapshot time.
snapshot_lsn: u64,
// Full manifest state captured by the snapshot.
manifest_data: ManifestData,
// CRC32 of the snapshot's encoding with this field set to zero.
checksum: u32,
}
/// Manifest backed by a snapshot file plus a WAL of `ManifestEvent`s.
#[derive(Debug)]
pub struct Manifest {
// Directory that holds the snapshot file and the manifest WAL.
path: PathBuf,
// Log of events; every mutation is appended here before it is applied in memory.
wal: Wal<ManifestEvent>,
// Current in-memory state, guarded so `&self` methods can mutate it.
data: Mutex<ManifestData>,
}
impl Manifest {
/// Opens (creating if necessary) the manifest stored in directory `path`.
///
/// Loads the snapshot when one exists and is readable, then replays the
/// manifest WAL on top of it. An unreadable snapshot is logged and ignored,
/// so recovery then starts from the default state.
///
/// # Errors
/// Returns an error if the directory cannot be created, the WAL cannot be
/// opened, or replay fails.
pub fn open(path: impl AsRef<Path>) -> Result<Self, ManifestError> {
    let dir = path.as_ref().to_path_buf();
    fs::create_dir_all(&dir)?;

    let snapshot_path = dir.join(SNAPSHOT_FILENAME);
    let (data, snapshot_lsn) = if snapshot_path.exists() {
        match Self::read_snapshot(&snapshot_path) {
            Ok(loaded) => {
                info!("Loaded manifest snapshot from {:?}", snapshot_path);
                loaded
            }
            Err(e) => {
                warn!(
                    "Failed to read manifest snapshot {:?}: {}; \
                     falling back to full WAL replay",
                    snapshot_path, e
                );
                (ManifestData::default(), 0)
            }
        }
    } else {
        (ManifestData::default(), 0)
    };

    let wal_path = dir.join(WAL_FILENAME);
    let wal = Wal::<ManifestEvent>::open(&wal_path, None)?;

    let mut manifest = Manifest {
        path: dir,
        wal,
        data: Mutex::new(data),
    };
    manifest.replay_wal(snapshot_lsn)?;
    Ok(manifest)
}
/// Locks the in-memory state, turning a poisoned mutex into
/// `ManifestError::Internal` (and logging it) instead of panicking.
fn lock_data(&self) -> Result<std::sync::MutexGuard<'_, ManifestData>, ManifestError> {
    match self.data.lock() {
        Ok(guard) => Ok(guard),
        Err(_) => {
            error!("Mutex poisoned");
            Err(ManifestError::Internal("Mutex poisoned".into()))
        }
    }
}
/// Returns the id of the currently active WAL.
pub fn get_active_wal(&self) -> Result<u64, ManifestError> {
    self.lock_data().map(|data| data.active_wal)
}
/// Returns a copy of the frozen WAL ids.
pub fn get_frozen_wals(&self) -> Result<Vec<u64>, ManifestError> {
    self.lock_data().map(|data| data.frozen_wals.clone())
}
/// Returns a copy of the registered SSTable entries.
pub fn get_sstables(&self) -> Result<Vec<ManifestSstEntry>, ManifestError> {
    self.lock_data().map(|data| data.sstables.clone())
}
/// Returns the highest LSN recorded in the manifest.
pub fn get_last_lsn(&self) -> Result<u64, ManifestError> {
    self.lock_data().map(|data| data.last_lsn)
}
/// Reports whether any event has been applied since the state was loaded or
/// last checkpointed.
pub fn is_dirty(&self) -> Result<bool, ManifestError> {
    self.lock_data().map(|data| data.dirty)
}
/// Logs and applies a switch of the active WAL to `wal_id`.
///
/// The event is appended to the manifest WAL before the in-memory state changes.
pub fn set_active_wal(&self, wal_id: u64) -> Result<(), ManifestError> {
    let event = ManifestEvent::SetActiveWal { wal: wal_id };
    self.wal.append(&event)?;
    self.apply_record(&event)
}
/// Logs and applies the addition of `wal_id` to the frozen WAL list.
///
/// The event is appended to the manifest WAL before the in-memory state changes.
pub fn add_frozen_wal(&self, wal_id: u64) -> Result<(), ManifestError> {
    let event = ManifestEvent::AddFrozenWal { wal: wal_id };
    self.wal.append(&event)?;
    self.apply_record(&event)
}
/// Logs and applies the removal of `wal_id` from the frozen WAL list.
///
/// The event is appended to the manifest WAL before the in-memory state changes.
pub fn remove_frozen_wal(&self, wal_id: u64) -> Result<(), ManifestError> {
    let event = ManifestEvent::RemoveFrozenWal { wal: wal_id };
    self.wal.append(&event)?;
    self.apply_record(&event)
}
/// Logs and applies the registration of an SSTable.
///
/// The event is appended to the manifest WAL before the in-memory state
/// changes. `entry` is already owned, so it is moved into the event rather
/// than cloned.
pub fn add_sstable(&self, entry: ManifestSstEntry) -> Result<(), ManifestError> {
    let rec = ManifestEvent::AddSst { entry };
    self.wal.append(&rec)?;
    self.apply_record(&rec)?;
    Ok(())
}
/// Logs and applies the removal of the SSTable with id `sst_id`.
///
/// The event is appended to the manifest WAL before the in-memory state changes.
pub fn remove_sstable(&self, sst_id: u64) -> Result<(), ManifestError> {
    let event = ManifestEvent::RemoveSst { id: sst_id };
    self.wal.append(&event)?;
    self.apply_record(&event)
}
pub fn allocate_sst_id(&self) -> Result<u64, ManifestError> {
let mut data = self.lock_data()?;
let id = data.next_sst_id;
let rec = ManifestEvent::AllocateSstId { id };
self.wal.append(&rec)?;
data.next_sst_id = id + 1;
data.dirty = true;
Ok(id)
}
/// Returns the id the next allocation would hand out, without reserving it.
pub fn peek_next_sst_id(&self) -> Result<u64, ManifestError> {
    self.lock_data().map(|data| data.next_sst_id)
}
/// Logs and applies a compaction result as a single event: the `removed`
/// ids are dropped and the `added` entries are registered.
///
/// The event is appended to the manifest WAL before the in-memory state changes.
pub fn apply_compaction(
    &self,
    added: Vec<ManifestSstEntry>,
    removed: Vec<u64>,
) -> Result<(), ManifestError> {
    let event = ManifestEvent::Compaction { added, removed };
    self.wal.append(&event)?;
    self.apply_record(&event)
}
/// Logs and applies a new last-LSN value.
///
/// The event is always appended to the manifest WAL; the in-memory LSN only
/// moves when `last_lsn` is higher than the current value.
pub fn update_lsn(&self, last_lsn: u64) -> Result<(), ManifestError> {
    let event = ManifestEvent::UpdateLsn { last_lsn };
    self.wal.append(&event)?;
    self.apply_record(&event)
}
/// Writes the current state as a snapshot and then truncates the manifest WAL.
///
/// Steps:
/// 1. Clone the state and encode it with a zero checksum.
/// 2. CRC32 those bytes, store the result in the snapshot and encode again.
/// 3. Write and sync a temporary file, rename it over the snapshot, and sync
///    the directory.
/// 4. Truncate the WAL and clear the dirty flag.
///
/// The checksum is stored by going through the encoder instead of patching
/// the trailing bytes by hand. The output therefore does not depend on how
/// the encoder lays out a `u32`, and it matches how `read_snapshot`
/// recomputes the checksum.
///
/// # Errors
/// Propagates lock, encoding, I/O and WAL errors. The WAL is only truncated
/// after the snapshot has been renamed into place and the directory synced.
pub fn checkpoint(&mut self) -> Result<(), ManifestError> {
    let mut snapshot = {
        let data = self.lock_data()?.clone();
        ManifestSnapshot {
            version: data.version,
            snapshot_lsn: data.last_lsn,
            manifest_data: data,
            checksum: 0,
        }
    };

    // The checksum covers the encoding with the checksum field zeroed.
    let unsigned_bytes = encoding::encode_to_vec(&snapshot)?;
    let mut hasher = Crc32::new();
    hasher.update(&unsigned_bytes);
    snapshot.checksum = hasher.finalize();
    let snapshot_bytes = encoding::encode_to_vec(&snapshot)?;

    let tmp_name = format!("{}{}", SNAPSHOT_FILENAME, SNAPSHOT_TMP_SUFFIX);
    let tmp_path = self.path.join(&tmp_name);
    {
        let mut f = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&tmp_path)?;
        f.write_all(&snapshot_bytes)?;
        // Sync the contents before the rename makes them visible.
        f.sync_all()?;
    }

    let final_path = self.path.join(SNAPSHOT_FILENAME);
    fs::rename(&tmp_path, &final_path)?;
    Self::fsync_dir(&self.path)?;
    info!("Manifest snapshot written to {:?}", final_path);

    self.wal.truncate()?;
    self.lock_data()?.dirty = false;
    Ok(())
}
/// Opens `dir` and calls `sync_all` on the resulting handle.
fn fsync_dir(dir: &Path) -> Result<(), ManifestError> {
    File::open(dir)?.sync_all()?;
    Ok(())
}
/// Reads and verifies the snapshot at `p`.
///
/// The stored checksum is compared with a CRC32 of the snapshot re-encoded
/// with its checksum field zeroed. The field is zeroed in place on the
/// decoded value, so the manifest data is not cloned for the check.
///
/// Returns the manifest data and the snapshot LSN.
///
/// # Errors
/// Returns `ManifestError::SnapshotChecksumMismatch` when the checksums
/// differ, and propagates I/O and encoding errors.
fn read_snapshot(p: &Path) -> Result<(ManifestData, u64), ManifestError> {
    let mut f = File::open(p)?;
    let mut buf = Vec::new();
    f.read_to_end(&mut buf)?;

    let (mut snap, _) = encoding::decode_from_slice::<ManifestSnapshot>(buf.as_slice())?;

    // Remember what was stored, then zero the field to reproduce the bytes
    // that were hashed when the snapshot was written.
    let stored_checksum = snap.checksum;
    snap.checksum = 0;
    let verify_bytes = encoding::encode_to_vec(&snap)?;

    let mut hasher = Crc32::new();
    hasher.update(&verify_bytes);
    let computed_checksum = hasher.finalize();
    if stored_checksum != computed_checksum {
        return Err(ManifestError::SnapshotChecksumMismatch);
    }
    Ok((snap.manifest_data, snap.snapshot_lsn))
}
fn replay_wal(&mut self, snapshot_lsn: u64) -> Result<(), ManifestError> {
let iter = match self.wal.replay_iter() {
Ok(i) => i,
Err(e) => {
return Err(ManifestError::Wal(e));
}
};
let mut count: u64 = 0;
for item in iter {
match item {
Ok(rec) => {
self.apply_record(&rec)?;
count += 1;
}
Err(e) => {
warn!("Manifest WAL replay stopped due to WAL error: {}", e);
break;
}
}
}
let current_lsn = self.lock_data()?.last_lsn;
if snapshot_lsn > 0 && current_lsn < snapshot_lsn {
warn!(
"Manifest LSN after WAL replay ({}) is less than snapshot LSN ({}); \
possible WAL truncation or data loss",
current_lsn, snapshot_lsn
);
}
info!(
"Manifest WAL replay: {} entries applied (snapshot_lsn={})",
count, snapshot_lsn
);
Ok(())
}
/// Applies one event to the in-memory state and marks the state dirty.
///
/// Used both for live mutations (after the event has been appended to the
/// WAL) and for WAL replay. The "add" events skip ids that are already
/// present, so applying the same event twice leaves the lists unchanged.
///
/// Changes from the previous implementation:
/// - Advancing `next_sst_id` past an id saturates at `u64::MAX`. A replayed
///   event carrying `u64::MAX` used to overflow (panic in debug builds, wrap
///   to 0 in release builds).
/// - Compaction removals are done in a single pass over the table list.
/// - The dirty flag is set once after the match; every variant marks the
///   state dirty.
///
/// # Errors
/// Fails only if the state lock is poisoned.
fn apply_record(&self, rec: &ManifestEvent) -> Result<(), ManifestError> {
    let mut data = self.lock_data()?;
    match rec {
        ManifestEvent::Version { version } => {
            data.version = *version;
        }
        ManifestEvent::SetActiveWal { wal } => {
            data.active_wal = *wal;
            // A WAL cannot be both active and frozen.
            data.frozen_wals.retain(|w| w != wal);
        }
        ManifestEvent::AddFrozenWal { wal } => {
            if !data.frozen_wals.contains(wal) {
                data.frozen_wals.push(*wal);
            }
        }
        ManifestEvent::RemoveFrozenWal { wal } => {
            data.frozen_wals.retain(|w| w != wal);
        }
        ManifestEvent::AddSst { entry } => {
            if !data.sstables.iter().any(|e| e.id == entry.id) {
                data.sstables.push(entry.clone());
            }
        }
        ManifestEvent::RemoveSst { id } => {
            data.sstables.retain(|e| e.id != *id);
        }
        ManifestEvent::UpdateLsn { last_lsn } => {
            // The LSN never moves backwards.
            if *last_lsn > data.last_lsn {
                data.last_lsn = *last_lsn;
            }
        }
        ManifestEvent::AllocateSstId { id } => {
            if *id >= data.next_sst_id {
                data.next_sst_id = id.saturating_add(1);
            }
        }
        ManifestEvent::Compaction { added, removed } => {
            data.sstables.retain(|e| !removed.contains(&e.id));
            for entry in added {
                if !data.sstables.iter().any(|e| e.id == entry.id) {
                    data.sstables.push(entry.clone());
                }
                // Keep the allocator ahead of every id that is in use.
                if entry.id >= data.next_sst_id {
                    data.next_sst_id = entry.id.saturating_add(1);
                }
            }
        }
    }
    data.dirty = true;
    Ok(())
}
}