use std::{collections::HashMap, marker::PhantomData, mem::ManuallyDrop, path::PathBuf};
use derive_more::{Display, Error, From};
use serde::{Deserialize, Serialize};
use tempest_core::journal::{Journal, JournalError, JournalHandle, Replayable};
use tempest_io::Io;
use tempest_rt::JoinHandle;
use crate::config::ManifestConfig;
/// Version 1 of the per-SST record stored in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct SstMetadataV1 {
/// Number identifying the SST file; used as the key of the manifest's file table.
pub(crate) filenum: u64,
/// Level the file is assigned to.
pub(crate) level: u32,
}
/// Metadata version used by the rest of this module.
pub(crate) type SstMetadata = SstMetadataV1;
/// Version 1 of the edits that can be recorded in the manifest journal.
#[derive(Serialize, Deserialize)]
enum SstManifestEditV1 {
/// Registers one new SST file.
AddFile(SstMetadataV1),
/// Registers the file produced by a compaction and unregisters its inputs.
Compaction {
/// Metadata of the file that was added.
added: SstMetadataV1,
/// File numbers of the entries that were removed.
removed: Vec<u64>,
},
/// A list of edits that is applied in order when replayed.
Snapshot(Vec<ManifestEdit>),
}
/// Versioned wrapper around the concrete edit types; only `V1` exists so far.
#[repr(u8)]
#[derive(Serialize, Deserialize)]
enum ManifestEdit {
/// An edit in the version 1 format.
V1(SstManifestEditV1),
}
impl ManifestEdit {
fn add_file(metadata: SstMetadata) -> Self {
Self::V1(SstManifestEditV1::AddFile(metadata))
}
fn compaction(added: SstMetadata, removed: Vec<u64>) -> Self {
Self::V1(SstManifestEditV1::Compaction { added, removed })
}
fn snapshot(edits: Vec<ManifestEdit>) -> ManifestEdit {
Self::V1(SstManifestEditV1::Snapshot(edits))
}
}
/// In-memory manifest state: the table of known SST files plus the file-number allocator.
#[derive(Default, Clone)]
struct ManifestData {
/// Known SST files, keyed by file number.
ssts: HashMap<u64, SstMetadata>,
/// Value the next call to `alloc_filenum` returns.
next_filenum: u64,
}
impl ManifestData {
    /// Registers `metadata` in the file table and moves the allocator past its file number.
    ///
    /// # Panics
    ///
    /// Panics if an entry with the same file number is already registered.
    fn apply_add_file(&mut self, metadata: SstMetadata) {
        // The next free number has to be strictly greater than every number that was ever
        // added. Bumping the counter by one and taking the max with `filenum` itself could
        // leave `next_filenum == filenum` (for example when the first replayed entry has a
        // non-zero number), so a later allocation would hand out a number that is in use.
        self.next_filenum = self
            .next_filenum
            .max(metadata.filenum.saturating_add(1));
        let old = self.ssts.insert(metadata.filenum, metadata);
        assert!(old.is_none(), "must not overwrite old manifest entry");
    }

    /// Removes the entry for `filenum` from the file table.
    ///
    /// # Panics
    ///
    /// Panics if no entry with that file number is registered.
    fn apply_remove_file(&mut self, filenum: u64) {
        let val = self.ssts.remove(&filenum);
        assert!(
            val.is_some(),
            "cannot remove manifest entry that does not exist"
        )
    }

    /// Returns the next unused file number and advances the allocator by one.
    pub(crate) fn alloc_filenum(&mut self) -> u64 {
        let filenum = self.next_filenum;
        self.next_filenum += 1;
        filenum
    }
}
impl Replayable for ManifestData {
    type Edit = ManifestEdit;

    /// Applies a single journal edit to the in-memory state.
    ///
    /// Snapshot edits are unpacked and their contained edits applied in order.
    fn apply(&mut self, edit: Self::Edit) {
        match edit {
            ManifestEdit::V1(SstManifestEditV1::AddFile(metadata)) => {
                self.apply_add_file(metadata);
            }
            ManifestEdit::V1(SstManifestEditV1::Compaction { added, removed }) => {
                // Register the output first, then unregister each input.
                self.apply_add_file(added);
                removed
                    .into_iter()
                    .for_each(|filenum| self.apply_remove_file(filenum));
            }
            ManifestEdit::V1(SstManifestEditV1::Snapshot(edits)) => {
                edits.into_iter().for_each(|inner| self.apply(inner));
            }
        }
    }

    /// Captures the current file table as one snapshot edit holding an add-file edit per entry.
    fn snapshot(&self) -> Self::Edit {
        let edits: Vec<ManifestEdit> = self
            .ssts
            .values()
            .cloned()
            .map(ManifestEdit::add_file)
            .collect();
        ManifestEdit::snapshot(edits)
    }

    /// Prefix handed to the journal for naming this state's files.
    fn filename_prefix() -> &'static str {
        "manifest"
    }

    /// State used when there is nothing to replay: no files and the allocator at zero.
    fn initial() -> Self {
        Default::default()
    }
}
/// Errors returned by manifest operations.
#[derive(Debug, Display, From, Error)]
pub enum ManifestError {
/// The underlying journal reported an error.
Journal(JournalError),
}
/// Journal-backed manifest of SST files.
///
/// Keeps an in-memory copy of the manifest state next to the journal handle that
/// edits are appended to.
pub(crate) struct Manifest<I: Io> {
/// In-memory copy of the manifest state, updated after each successful journal append.
data: ManifestData,
/// Handle used to append edits; wrapped in `ManuallyDrop` so `close` and `Drop` decide when it is released.
journal: ManuallyDrop<JournalHandle<ManifestData>>,
/// Join handle returned alongside the journal; awaited by `close`.
journal_handle: ManuallyDrop<JoinHandle<()>>,
/// Ties the manifest to the `Io` implementation without storing a value of it.
_marker: PhantomData<I>,
}
impl<I: Io> Manifest<I> {
    /// Opens the manifest journal in `dir` and loads the state it yields.
    ///
    /// # Errors
    ///
    /// Returns [`ManifestError::Journal`] if the journal cannot be created.
    pub(crate) async fn init(
        dir: PathBuf,
        config: ManifestConfig,
    ) -> Result<Manifest<I>, ManifestError> {
        let (journal, journal_handle) =
            Journal::<ManifestData, I>::new(dir, config.journal.clone()).await?;
        let data = journal.data().clone();
        debug!("manifest now initialized");
        Ok(Manifest {
            data,
            journal: ManuallyDrop::new(journal),
            journal_handle: ManuallyDrop::new(journal_handle),
            _marker: PhantomData,
        })
    }

    /// Releases the journal handle and waits for the journal's join handle to complete.
    ///
    /// Consumes the manifest without running its `Drop` implementation, so the
    /// "dropped without close()" warning is not emitted.
    pub(crate) async fn close(self) -> Result<(), ManifestError> {
        // Move everything out of `self` *before* the first suspension point. If the fields
        // were dropped in place and `self` only forgotten after the await, cancelling this
        // future mid-await would run `Drop` and release both fields a second time.
        let (journal, journal_handle) = {
            let mut this = ManuallyDrop::new(self);
            // `this` is never dropped, so release the in-memory table explicitly
            // instead of leaking it.
            drop(std::mem::take(&mut this.data));
            // SAFETY: `this` is wrapped in `ManuallyDrop`, so `Manifest::drop` never runs
            // for it, and each field is taken exactly once and not accessed afterwards.
            unsafe {
                (
                    ManuallyDrop::take(&mut this.journal),
                    ManuallyDrop::take(&mut this.journal_handle),
                )
            }
        };
        // The journal handle is released before waiting on the join handle.
        // NOTE(review): presumably dropping it lets the journal task finish — confirm.
        drop(journal);
        journal_handle.await;
        Ok(())
    }

    /// Returns a fresh file number from the in-memory allocator.
    pub(crate) fn alloc_filenum(&mut self) -> u64 {
        self.data.alloc_filenum()
    }

    /// Iterates over the metadata of every SST file in the manifest, in no particular order.
    pub(crate) fn ssts(&self) -> impl Iterator<Item = &SstMetadata> {
        self.data.ssts.values()
    }

    /// Iterates over the metadata of the SST files whose level equals `level`.
    pub(crate) fn ssts_at_level(&self, level: u32) -> impl Iterator<Item = &SstMetadata> {
        self.data.ssts.values().filter(move |m| m.level == level)
    }

    /// Appends an add-file edit for `metadata` to the journal, then registers it in memory.
    ///
    /// The in-memory table is only touched once the append has succeeded.
    ///
    /// # Errors
    ///
    /// Returns [`ManifestError::Journal`] if the append fails.
    pub(crate) async fn record_flush(
        &mut self,
        metadata: SstMetadata,
    ) -> Result<(), ManifestError> {
        debug!(?metadata, "recording sst flush in manifest");
        self.journal
            .append(ManifestEdit::add_file(metadata.clone()))
            .await?;
        let old = self.data.ssts.insert(metadata.filenum, metadata);
        debug_assert!(old.is_none(), "must not overwrite existing manifest entry");
        Ok(())
    }

    /// Appends a compaction edit to the journal, then registers `added` and unregisters
    /// every file number in `removed` in memory.
    ///
    /// The in-memory table is only touched once the append has succeeded.
    ///
    /// # Errors
    ///
    /// Returns [`ManifestError::Journal`] if the append fails.
    pub(crate) async fn record_compaction(
        &mut self,
        added: SstMetadata,
        removed: Vec<u64>,
    ) -> Result<(), ManifestError> {
        debug!(?added, ?removed, "recording sst compaction in manifest");
        self.journal
            .append(ManifestEdit::compaction(added.clone(), removed.clone()))
            .await?;
        let old = self.data.ssts.insert(added.filenum, added);
        debug_assert!(old.is_none(), "must not overwrite existing manifest entry");
        for filenum in removed {
            let val = self.data.ssts.remove(&filenum);
            debug_assert!(
                val.is_some(),
                "cannot remove manifest entry that does not exist"
            );
        }
        Ok(())
    }
}
impl<I: Io> Drop for Manifest<I> {
    /// Logs a warning and releases the journal handle followed by the join handle.
    fn drop(&mut self) {
        warn!("Manifest dropped without close()");
        let Self {
            journal,
            journal_handle,
            ..
        } = self;
        // SAFETY: each field is dropped exactly once here and not used afterwards; this is
        // sound as long as no other code path has already dropped or taken the fields
        // before this destructor runs.
        unsafe {
            ManuallyDrop::drop(journal);
            ManuallyDrop::drop(journal_handle);
        }
    }
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use tempest_core::test_utils::setup_tracing;
    use tempest_io::VirtualIo;
    use tempest_rt::block_on;

    use super::*;
    use crate::config::ManifestConfig;

    /// Directory every test opens its manifest in.
    fn manifest_dir() -> PathBuf {
        PathBuf::from("/manifest")
    }

    /// Opens a manifest in `manifest_dir()` with the default configuration.
    async fn open_manifest() -> Manifest<VirtualIo> {
        Manifest::<VirtualIo>::init(manifest_dir(), ManifestConfig::default())
            .await
            .unwrap()
    }

    /// Records a flush of `filenum` at level 0.
    async fn flush_level0(manifest: &mut Manifest<VirtualIo>, filenum: u64) {
        manifest
            .record_flush(SstMetadata { filenum, level: 0 })
            .await
            .unwrap();
    }

    /// A manifest opened on an empty directory has no files and allocates from zero.
    #[test]
    fn test_manifest_initializes() {
        setup_tracing();
        let io = VirtualIo::default();
        block_on(io, async {
            let manifest = open_manifest().await;
            assert_eq!(manifest.ssts().count(), 0);
            assert_eq!(manifest.data.next_filenum, 0);
            manifest.close().await.unwrap();
        });
    }

    /// Allocations that are never recorded start from zero again after every reopen.
    #[test]
    fn test_manifest_alloc_filenum() {
        setup_tracing();
        let io = VirtualIo::default();
        block_on(io, async {
            for _ in 0..3 {
                let mut manifest = open_manifest().await;
                assert_eq!(manifest.alloc_filenum(), 0);
                assert_eq!(manifest.alloc_filenum(), 1);
                assert_eq!(manifest.alloc_filenum(), 2);
                manifest.close().await.unwrap();
            }
        });
    }

    /// A recorded flush is visible through `ssts` and `ssts_at_level`.
    #[test]
    fn test_manifest_record_flush() {
        setup_tracing();
        let io = VirtualIo::default();
        block_on(io, async {
            let mut manifest = open_manifest().await;
            let filenum = manifest.alloc_filenum();
            flush_level0(&mut manifest, filenum).await;
            assert_eq!(manifest.ssts().count(), 1);
            assert_eq!(manifest.ssts_at_level(0).count(), 1);
            manifest.close().await.unwrap();
        });
    }

    /// Recorded flushes and the allocator position survive a close and reopen.
    #[test]
    fn test_manifest_recovers() {
        setup_tracing();
        let io = VirtualIo::default();
        block_on(io, async {
            // First session: record two flushes, then close.
            let mut first = open_manifest().await;
            let a = first.alloc_filenum();
            let b = first.alloc_filenum();
            flush_level0(&mut first, a).await;
            flush_level0(&mut first, b).await;
            first.close().await.unwrap();

            // Second session: both entries are back and allocation continues after them.
            let mut second = open_manifest().await;
            assert_eq!(second.ssts().count(), 2);
            assert_eq!(second.alloc_filenum(), 2);
            second.close().await.unwrap();
        });
    }

    /// A compaction replaces its level-0 inputs with the single level-1 output.
    #[test]
    fn test_manifest_record_compaction() {
        setup_tracing();
        let io = VirtualIo::default();
        block_on(io, async {
            let mut manifest = open_manifest().await;
            let a = manifest.alloc_filenum();
            let b = manifest.alloc_filenum();
            flush_level0(&mut manifest, a).await;
            flush_level0(&mut manifest, b).await;

            let c = manifest.alloc_filenum();
            let output = SstMetadata {
                filenum: c,
                level: 1,
            };
            manifest
                .record_compaction(output, vec![a, b])
                .await
                .unwrap();

            assert_eq!(manifest.ssts().count(), 1);
            assert_eq!(manifest.ssts_at_level(0).count(), 0);
            assert_eq!(manifest.ssts_at_level(1).count(), 1);
            manifest.close().await.unwrap();
        });
    }
}