tempest-kv 0.0.2

Key-Value storage layer for TempestDB
Documentation
use std::{collections::HashMap, marker::PhantomData, mem::ManuallyDrop, path::PathBuf};

use derive_more::{Display, Error, From};
use serde::{Deserialize, Serialize};
use tempest_core::journal::{Journal, JournalError, JournalHandle, Replayable};
use tempest_io::Io;
use tempest_rt::JoinHandle;

use crate::config::ManifestConfig;

/// Metadata the manifest keeps for a single SST file (format version 1).
///
/// Values of this type are embedded in manifest journal edits, so adding, removing or
/// reordering fields presumably changes the persisted format — NOTE(review): confirm against
/// the serializer the journal uses before touching the layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct SstMetadataV1 {
    /// File number of the SST; the manifest stores each entry keyed by this value.
    pub(crate) filenum: u64,
    /// Level the SST lives in (flushes in this file's tests use level 0, compaction output level 1).
    pub(crate) level: u32,
}

/// Alias for the current SST metadata version; the rest of this file uses this name.
pub(crate) type SstMetadata = SstMetadataV1;

/// Version-1 payload of a manifest journal edit.
///
/// Variant order is part of the serialized form; append new variants instead of reordering.
#[derive(Serialize, Deserialize)]
enum SstManifestEditV1 {
    /// An SST joined the live set (written by flushes and used inside snapshots).
    AddFile(SstMetadataV1),
    /// A compaction produced `added` and retired the SSTs whose filenums are listed in `removed`.
    Compaction {
        added: SstMetadataV1,
        removed: Vec<u64>,
    },
    /// A list of edits that, replayed in sequence, rebuilds the full manifest state.
    Snapshot(Vec<ManifestEdit>),
}

/// Versioned envelope around every edit appended to the manifest journal.
///
/// Presumably future schema versions become additional variants here.
/// NOTE(review): `#[repr(u8)]` fixes the in-memory discriminant only; serde's derived encoding
/// of the variant tag does not depend on it.
#[repr(u8)]
#[derive(Serialize, Deserialize)]
enum ManifestEdit {
    /// Edit encoded with the version-1 schema.
    V1(SstManifestEditV1),
}
impl ManifestEdit {
    fn add_file(metadata: SstMetadata) -> Self {
        Self::V1(SstManifestEditV1::AddFile(metadata))
    }

    fn compaction(added: SstMetadata, removed: Vec<u64>) -> Self {
        Self::V1(SstManifestEditV1::Compaction { added, removed })
    }

    fn snapshot(edits: Vec<ManifestEdit>) -> ManifestEdit {
        Self::V1(SstManifestEditV1::Snapshot(edits))
    }
}

/// In-memory manifest state, rebuilt by replaying journal edits.
#[derive(Default, Clone)]
struct ManifestData {
    /// Live SSTs, keyed by filenum.
    ssts: HashMap<u64, SstMetadata>,
    /// The filenum the next call to `alloc_filenum` will hand out.
    next_filenum: u64,
}

impl ManifestData {
    /// Adds `metadata` to the live SST set and moves the filenum high-water mark past it.
    ///
    /// # Panics
    /// Panics if an entry with the same filenum is already present.
    fn apply_add_file(&mut self, metadata: SstMetadata) {
        // WARN: requires that the newest sst (as per filenum) stays alive
        // See also: WalState::apply match arm for WalStateEditV1::AddFile
        //
        // `next_filenum` must end up strictly greater than every filenum seen so far, regardless
        // of the order edits are replayed in or of gaps left by allocated-but-unrecorded numbers.
        // (The previous `(next + 1).max(filenum)` could leave `next == filenum` and hand out the
        // number of a live SST.)
        self.next_filenum = self.next_filenum.max(metadata.filenum + 1);
        let old = self.ssts.insert(metadata.filenum, metadata);
        assert!(old.is_none(), "must not overwrite old manifest entry");
    }

    /// Removes the SST with `filenum` from the live set.
    ///
    /// # Panics
    /// Panics if no entry with that filenum exists.
    fn apply_remove_file(&mut self, filenum: u64) {
        let val = self.ssts.remove(&filenum);
        assert!(
            val.is_some(),
            "cannot remove manifest entry that does not exist"
        );
    }

    /// Hands out a unique file number for creating an SST. The file number change is volatile,
    /// until you persist it through a manifest edit, like [`record_flush`].
    ///
    /// [`record_flush`]: Manifest::record_flush
    pub(crate) fn alloc_filenum(&mut self) -> u64 {
        let filenum = self.next_filenum;
        self.next_filenum += 1;
        filenum
    }
}

impl Replayable for ManifestData {
    type Edit = ManifestEdit;

    fn apply(&mut self, edit: Self::Edit) {
        match edit {
            ManifestEdit::V1(edit) => match edit {
                SstManifestEditV1::AddFile(metadata) => self.apply_add_file(metadata),
                SstManifestEditV1::Compaction { added, removed } => {
                    self.apply_add_file(added);
                    for file in removed {
                        self.apply_remove_file(file);
                    }
                }
                SstManifestEditV1::Snapshot(edits) => {
                    for edit in edits {
                        self.apply(edit)
                    }
                }
            },
        }
    }

    fn snapshot(&self) -> Self::Edit {
        let num_edits = self.ssts.len();
        let mut edits = Vec::with_capacity(num_edits);
        edits.extend(
            self.ssts
                .values()
                .map(|v| ManifestEdit::add_file(v.clone())),
        );
        ManifestEdit::snapshot(edits)
    }

    fn filename_prefix() -> &'static str {
        "manifest"
    }

    fn initial() -> Self {
        Self::default()
    }
}

/// Errors returned by [`Manifest`] operations.
#[derive(Debug, Display, From, Error)]
pub enum ManifestError {
    /// An error reported by the underlying manifest journal.
    Journal(JournalError),
}

/// Tracks the set of live SST files: every change is appended to a journal first and then
/// mirrored into an in-memory copy.
///
/// Shut it down with `close()`; dropping it instead logs a warning.
pub(crate) struct Manifest<I: Io> {
    /// In-memory view of the manifest, updated after each successful journal append.
    data: ManifestData,
    /// Handle used to append edits. `ManuallyDrop` so that `close` and `Drop` decide
    /// explicitly when it is released.
    journal: ManuallyDrop<JournalHandle<ManifestData>>,
    /// Join handle returned by `Journal::new` (presumably the journal's background task);
    /// awaited by `close` after the journal handle has been dropped.
    journal_handle: ManuallyDrop<JoinHandle<()>>,
    /// Ties the manifest to its I/O backend type without storing a value of it.
    _marker: PhantomData<I>,
}

impl<I: Io> Manifest<I> {
    /// Opens the manifest journal in `dir` (replaying whatever it already contains) and returns
    /// a manifest whose in-memory state is a copy of the journal's recovered data.
    ///
    /// # Errors
    /// Propagates any [`JournalError`] from `Journal::new` as [`ManifestError::Journal`].
    pub(crate) async fn init(
        dir: PathBuf,
        config: ManifestConfig,
    ) -> Result<Manifest<I>, ManifestError> {
        let (journal, journal_handle) =
            Journal::<ManifestData, I>::new(dir, config.journal.clone()).await?;
        let data = journal.data().clone();
        debug!("manifest now initialized");
        Ok(Manifest {
            data,
            journal: ManuallyDrop::new(journal),
            journal_handle: ManuallyDrop::new(journal_handle),
            _marker: PhantomData,
        })
    }

    /// Shuts the manifest down: drops the journal handle, then awaits the journal's join handle.
    /// Consumes `self` without running `Drop`, so no "dropped without close()" warning is logged.
    ///
    /// Cancellation-safe: all fields are moved out of `self` before the first `.await`. If this
    /// future is dropped mid-await, only the owned locals are dropped (each exactly once); the
    /// `Drop` impl never runs on already-released fields. The previous version could double-drop
    /// the journal and the join handle on cancellation, and it leaked `data` via `mem::forget`.
    pub(crate) async fn close(self) -> Result<(), ManifestError> {
        // Disarm `Drop for Manifest` before anything can go wrong.
        let mut this = ManuallyDrop::new(self);
        // SAFETY: `this` is wrapped in `ManuallyDrop`, so neither `Manifest::drop` nor the
        // destructors of its fields will run for it. Each field is moved out exactly once here,
        // and `this` is never accessed again afterwards. `_marker` is a zero-sized PhantomData
        // with no destructor.
        let (journal, journal_handle, data) = unsafe {
            (
                ManuallyDrop::take(&mut this.journal),
                ManuallyDrop::take(&mut this.journal_handle),
                std::ptr::read(&this.data),
            )
        };
        drop(data);
        // Keep the original shutdown order: release the journal handle before awaiting the task.
        drop(journal);
        journal_handle.await;
        Ok(())
    }

    /// Hands out a fresh SST file number. It is not persisted until an edit referencing it is
    /// recorded (e.g. via [`Manifest::record_flush`]).
    pub(crate) fn alloc_filenum(&mut self) -> u64 {
        self.data.alloc_filenum()
    }

    /// Iterates over all live SSTs, in unspecified order.
    pub(crate) fn ssts(&self) -> impl Iterator<Item = &SstMetadata> {
        self.data.ssts.values()
    }

    /// Iterates over the live SSTs on `level`, in unspecified order.
    pub(crate) fn ssts_at_level(&self, level: u32) -> impl Iterator<Item = &SstMetadata> {
        self.data.ssts.values().filter(move |m| m.level == level)
    }

    /// Appends an `AddFile` edit for `metadata` to the journal. Once the append succeeds, it
    /// inserts `metadata` into the in-memory view.
    ///
    /// # Errors
    /// Returns [`ManifestError::Journal`] if the append fails; the in-memory view is then left
    /// unchanged.
    pub(crate) async fn record_flush(
        &mut self,
        metadata: SstMetadata,
    ) -> Result<(), ManifestError> {
        debug!(?metadata, "recording sst flush in manifest");
        self.journal
            .append(ManifestEdit::add_file(metadata.clone()))
            .await?;
        let old = self.data.ssts.insert(metadata.filenum, metadata);
        debug_assert!(old.is_none(), "must not overwrite existing manifest entry");
        Ok(())
    }

    /// Appends a `Compaction` edit to the journal. Once the append succeeds, it inserts `added`
    /// into the in-memory view and removes every filenum in `removed`.
    ///
    /// # Errors
    /// Returns [`ManifestError::Journal`] if the append fails; the in-memory view is then left
    /// unchanged.
    pub(crate) async fn record_compaction(
        &mut self,
        added: SstMetadata,
        removed: Vec<u64>,
    ) -> Result<(), ManifestError> {
        debug!(?added, ?removed, "recording sst compaction in manifest");
        self.journal
            .append(ManifestEdit::compaction(added.clone(), removed.clone()))
            .await?;
        let old = self.data.ssts.insert(added.filenum, added);
        debug_assert!(old.is_none(), "must not overwrite existing manifest entry");
        for filenum in removed {
            let val = self.data.ssts.remove(&filenum);
            debug_assert!(
                val.is_some(),
                "cannot remove manifest entry that does not exist"
            );
        }
        Ok(())
    }
}

impl<I: Io> Drop for Manifest<I> {
    /// Fallback cleanup for a `Manifest` that was not shut down through `close()`.
    ///
    /// Logs a warning, then drops the journal handle and the join handle without awaiting the
    /// task. NOTE(review): what dropping a `tempest_rt::JoinHandle` does to the running task
    /// (detach vs. cancel) is defined in `tempest_rt` — confirm.
    fn drop(&mut self) {
        warn!("Manifest dropped without close()");
        // SAFETY: both fields are ManuallyDrop, so we have to be explicit here.
        // This relies on `close()` never letting this destructor run after it has already
        // dropped or taken these fields itself; otherwise they would be released twice.
        unsafe { ManuallyDrop::drop(&mut self.journal) };
        unsafe { ManuallyDrop::drop(&mut self.journal_handle) };
    }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use tempest_core::test_utils::setup_tracing;
    use tempest_io::VirtualIo;
    use tempest_rt::block_on;

    use super::*;
    use crate::config::ManifestConfig;

    /// Directory inside the virtual filesystem that every test manifest lives in.
    fn manifest_dir() -> PathBuf {
        PathBuf::from("/manifest")
    }

    /// Opens (creating or recovering) the manifest at `manifest_dir()` with default config.
    async fn open_manifest() -> Manifest<VirtualIo> {
        Manifest::<VirtualIo>::init(manifest_dir(), ManifestConfig::default())
            .await
            .unwrap()
    }

    /// Shorthand for building SST metadata.
    fn sst(filenum: u64, level: u32) -> SstMetadata {
        SstMetadata { filenum, level }
    }

    #[test]
    fn test_manifest_initializes() {
        setup_tracing();
        block_on(VirtualIo::default(), async {
            let manifest = open_manifest().await;
            assert_eq!(manifest.ssts().count(), 0);
            assert_eq!(manifest.data.next_filenum, 0);
            manifest.close().await.unwrap();
        });
    }

    #[test]
    fn test_manifest_alloc_filenum() {
        setup_tracing();
        block_on(VirtualIo::default(), async {
            // NB: nothing is ever recorded, so each reopened manifest restarts from zero
            for _round in 0..3 {
                let mut manifest = open_manifest().await;
                for expected in 0..3 {
                    assert_eq!(manifest.alloc_filenum(), expected);
                }
                manifest.close().await.unwrap();
            }
        });
    }

    #[test]
    fn test_manifest_record_flush() {
        setup_tracing();
        block_on(VirtualIo::default(), async {
            let mut manifest = open_manifest().await;

            let filenum = manifest.alloc_filenum();
            manifest.record_flush(sst(filenum, 0)).await.unwrap();

            assert_eq!(manifest.ssts().count(), 1);
            assert_eq!(manifest.ssts_at_level(0).count(), 1);

            manifest.close().await.unwrap();
        });
    }

    #[test]
    fn test_manifest_recovers() {
        setup_tracing();
        block_on(VirtualIo::default(), async {
            // first session: allocate two filenums and record both as L0 flushes
            let mut manifest = open_manifest().await;
            let a = manifest.alloc_filenum();
            let b = manifest.alloc_filenum();
            for filenum in [a, b] {
                manifest.record_flush(sst(filenum, 0)).await.unwrap();
            }
            manifest.close().await.unwrap();

            // second session: state is rebuilt from the journal
            let mut manifest = open_manifest().await;
            assert_eq!(manifest.ssts().count(), 2);
            // filenum counter picks up where it left off
            assert_eq!(manifest.alloc_filenum(), 2);
            manifest.close().await.unwrap();
        });
    }

    #[test]
    fn test_manifest_record_compaction() {
        setup_tracing();
        block_on(VirtualIo::default(), async {
            let mut manifest = open_manifest().await;

            // flush two L0 files
            let a = manifest.alloc_filenum();
            let b = manifest.alloc_filenum();
            for filenum in [a, b] {
                manifest.record_flush(sst(filenum, 0)).await.unwrap();
            }

            // compact both from L0 into L1
            let c = manifest.alloc_filenum();
            manifest
                .record_compaction(sst(c, 1), vec![a, b])
                .await
                .unwrap();

            assert_eq!(manifest.ssts().count(), 1);
            assert_eq!(manifest.ssts_at_level(0).count(), 0);
            assert_eq!(manifest.ssts_at_level(1).count(), 1);

            manifest.close().await.unwrap();
        })
    }
}