//! jammdb 0.9.0
//!
//! An embedded single-file database for Rust.
//! Documentation: see the crate-level docs.
use std::{
    cell::RefCell,
    collections::HashSet,
    fs::File,
    io::{Seek, SeekFrom, Write},
    marker::PhantomData,
    rc::Rc,
    sync::{MutexGuard, RwLockReadGuard},
};

use crate::{
    bucket::{Bucket, BucketMeta, InnerBucket},
    bytes::ToBytes,
    cursor::ToBuckets,
    db::{DB, MIN_ALLOC_SIZE},
    errors::{Error, Result},
    freelist::TxFreelist,
    meta::Meta,
    node::Node,
    page::{Page, PageID, Pages},
    BucketName,
};

/// Guard held for the lifetime of a transaction, determining its access mode.
pub(crate) enum TxLock<'tx> {
    /// Read-write: exclusive lock on the database file. Only one writable
    /// transaction can exist at a time.
    Rw(MutexGuard<'tx, File>),
    /// Read-only: shared guard on the database's mmap lock (presumably held
    /// to prevent remapping while reads are in flight — TODO confirm).
    Ro(RwLockReadGuard<'tx, ()>),
}

impl<'tx> TxLock<'tx> {
    /// Reports whether this guard belongs to a writable (read-write) transaction.
    fn writable(&self) -> bool {
        matches!(self, Self::Rw(_))
    }
}

/// An isolated view of the database
///
/// Transactions are how you can interact with the database.
/// They are created from a [`DB`](struct.DB.html),
/// and can be read-only or writable<sup>1</sup> depending on the parameter you pass into the [`tx`](struct.DB.html#method.tx) method.
/// Transactions are completely isolated from each other, so a read-only transaction can expect the data to stay exactly the same for the life
/// of the transaction, regardless of how many changes are made in other transactions<sup>2</sup>.
///
/// There are four important methods. Check out their documentation for more details:
/// 1. [`get_bucket`](#method.get_bucket) retrieves buckets from the root level. Available in read-only or writable transactions.
/// 2. [`create_bucket`](#method.create_bucket) makes new buckets at the root level. Available in writable transactions only.
/// 3. [`delete_bucket`](#method.delete_bucket) deletes a bucket (including all nested buckets) from the database. Available in writable transactions only.
/// 4. [`commit`](#method.commit) saves a writable transaction. Available in writable transactions.
///
/// Trying to use the methods that require writable transactions from a read-only transaction will result in an error. If you make edits in a writable transaction,
/// and you want to save them, you must call the [`commit`](#method.commit) method, otherwise when the transaction is dropped all your changes will be lost.
///
/// # Examples
///
/// ```no_run
/// use jammdb::{DB, Data};
/// # use jammdb::Error;
///
/// # fn main() -> Result<(), Error> {
/// # let db = DB::open("my.db")?;
/// // create a read-only transaction
/// let mut tx1 = db.tx(false)?;
/// // create a writable transaction
/// let mut tx2 = db.tx(true)?;
///
/// // create a new bucket in the writable transaction
/// tx2.create_bucket("new-bucket")?;
///
/// // the read-only transaction will not be able to see the new bucket
/// assert!(tx1.get_bucket("new-bucket").is_err());
///
/// // get a view of an existing bucket from both transactions
/// let mut b1 = tx1.get_bucket("existing-bucket")?;
/// let mut b2 = tx2.get_bucket("existing-bucket")?;
///
/// // make an edit to the bucket
/// b2.put("new-key", "new-value")?;
///
/// // the read-only transaction will not have this new key
/// assert_eq!(b1.get("new-key"), None);
/// // but it will be able to see data that already existed!
/// assert!(b1.get("existing-key").is_some());
///
/// # Ok(())
/// # }
/// ```
///
///
/// <sup>1</sup> There can only be a single writable transaction at a time, so trying to open
/// two writable transactions on the same thread will deadlock.
///
/// <sup>2</sup> Keep in mind that long running read-only transactions will prevent the database from
/// reclaiming old pages and your database may increase in disk size quickly if you're writing lots of data,
/// so it's a good idea to keep transactions short.
pub struct Tx<'tx> {
    // Interior mutability so `&self` methods can hand out buckets and commit.
    pub(crate) inner: RefCell<TxInner<'tx>>,
}

/// State behind a [`Tx`]: the lock, the metadata snapshot, and the
/// transaction-local view of the database's pages and freelist.
pub(crate) struct TxInner<'tx> {
    /// Handle back to the owning database.
    pub(crate) db: &'tx DB,
    /// Exclusive file lock (writable) or shared mmap guard (read-only).
    pub(crate) lock: TxLock<'tx>,
    /// Root bucket for this transaction's isolated view of the data.
    pub(crate) root: Rc<RefCell<InnerBucket<'tx>>>,
    /// Snapshot of the database metadata taken when the transaction began.
    pub(crate) meta: Meta,
    /// Transaction-local freelist used to allocate and release pages.
    pub(crate) freelist: Rc<RefCell<TxFreelist>>,
    // Read-only accessor over the memory-mapped page data.
    pages: Pages,
    // Page count (1 + overflow) of the current on-disk freelist, so those
    // pages can be freed when a new freelist is written at commit time.
    num_freelist_pages: u64,
}

impl<'tx> Tx<'tx> {
    /// Starts a new transaction over `db`.
    ///
    /// A writable transaction takes the exclusive file lock and increments the
    /// transaction id; a read-only transaction takes a shared guard and records
    /// its id in `open_ro_txs` so pages it can still see are not reused while
    /// it is open.
    pub(crate) fn new(db: &'tx DB, writable: bool) -> Result<Tx<'tx>> {
        let lock = match writable {
            true => TxLock::Rw(db.inner.file.lock()?),
            false => TxLock::Ro(db.inner.mmap_lock.read()?),
        };
        let mut freelist = db.inner.freelist.lock()?.clone();
        let mut meta = db.inner.meta()?;
        debug_assert!(meta.valid());
        {
            let mut open_ro_txs = db.inner.open_ro_txs.lock().unwrap();
            if writable {
                meta.tx_id += 1;
                // Only release pages freed before the oldest open read-only
                // transaction (index 0, since the list is kept sorted);
                // otherwise that transaction could observe reused pages.
                if !open_ro_txs.is_empty() {
                    freelist.release(open_ro_txs[0]);
                } else {
                    freelist.release(meta.tx_id);
                }
            } else {
                open_ro_txs.push(meta.tx_id);
                // Keep the list sorted so index 0 is always the oldest tx id.
                open_ro_txs.sort_unstable();
            }
        }
        let freelist = Rc::new(RefCell::new(TxFreelist::new(meta.clone(), freelist)));

        let data = db.inner.data.lock()?.clone();
        let pages = Pages::new(data, db.inner.pagesize);
        // Remember how many pages (including overflow) the current on-disk
        // freelist occupies so they can be freed at commit time.
        let num_freelist_pages = pages.page(meta.freelist_page).overflow + 1;
        let root = InnerBucket::from_meta(meta.root, pages.clone());
        let root = Rc::new(RefCell::new(root));
        let inner = TxInner {
            db,
            lock,
            root,
            meta,
            freelist,
            num_freelist_pages,
            pages,
        };
        Ok(Tx {
            inner: RefCell::new(inner),
        })
    }

    /// Whether this transaction is allowed to modify data.
    pub(crate) fn writable(&self) -> bool {
        self.inner.borrow().lock.writable()
    }

    /// Returns a reference to the root level bucket with the given name.
    ///
    /// # Errors
    ///
    /// Will return a [`BucketMissing`](enum.Error.html#variant.BucketMissing) error if the bucket does not exist,
    /// or an [`IncompatibleValue`](enum.Error.html#variant.IncompatibleValue) error if the key exists but is not a bucket.
    ///
    /// In a read-only transaction, you will get an error when trying to use any of the bucket's methods that modify data.
    pub fn get_bucket<'b, T: ToBytes<'tx>>(&'b self, name: T) -> Result<Bucket<'b, 'tx>> {
        let tx = self.inner.borrow();
        let mut root = tx.root.borrow_mut();
        let inner = root.get_bucket(name)?;
        Ok(Bucket {
            inner,
            freelist: tx.freelist.clone(),
            writable: tx.lock.writable(),
            _phantom: PhantomData,
        })
    }

    /// Creates a new bucket with the given name and returns a reference to it.
    ///
    /// # Errors
    ///
    /// Will return a [`BucketExists`](enum.Error.html#variant.BucketExists) error if the bucket already exists,
    /// an [`IncompatibleValue`](enum.Error.html#variant.IncompatibleValue) error if the key exists but is not a bucket,
    /// or a [`ReadOnlyTx`](enum.Error.html#variant.ReadOnlyTx) error if this is called on a read-only transaction.
    pub fn create_bucket<'b, T: ToBytes<'tx>>(&'b self, name: T) -> Result<Bucket<'b, 'tx>> {
        let tx = self.inner.borrow();
        if !tx.lock.writable() {
            return Err(Error::ReadOnlyTx);
        }
        let mut root = tx.root.borrow_mut();
        let inner = root.create_bucket(name)?;
        Ok(Bucket {
            inner,
            freelist: tx.freelist.clone(),
            writable: true,
            _phantom: PhantomData,
        })
    }

    /// Creates a root-level bucket with the given name if it does not already exist,
    /// or gets the existing bucket if it does.
    ///
    /// # Errors
    ///
    /// Will return an [`IncompatibleValue`](enum.Error.html#variant.IncompatibleValue) error if the key exists but is not a bucket,
    /// or a [`ReadOnlyTx`](enum.Error.html#variant.ReadOnlyTx) error if this is called on a read-only transaction.
    pub fn get_or_create_bucket<'b, T: ToBytes<'tx>>(&'b self, name: T) -> Result<Bucket<'b, 'tx>> {
        let tx = self.inner.borrow();
        if !tx.lock.writable() {
            return Err(Error::ReadOnlyTx);
        }
        let mut root = tx.root.borrow_mut();
        let inner = root.get_or_create_bucket(name)?;
        Ok(Bucket {
            inner,
            freelist: tx.freelist.clone(),
            writable: true,
            _phantom: PhantomData,
        })
    }

    /// Deletes an existing root-level bucket with the given name
    ///
    /// # Errors
    ///
    /// Will return a [`BucketMissing`](enum.Error.html#variant.BucketMissing) error if the bucket does not exist,
    /// an [`IncompatibleValue`](enum.Error.html#variant.IncompatibleValue) error if the key exists but is not a bucket,
    /// or a [`ReadOnlyTx`](enum.Error.html#variant.ReadOnlyTx) error if this is called on a read-only transaction.
    pub fn delete_bucket<T: ToBytes<'tx>>(&self, key: T) -> Result<()> {
        let tx = self.inner.borrow();
        if !tx.lock.writable() {
            return Err(Error::ReadOnlyTx);
        }
        // Deleted pages are returned to the transaction-local freelist.
        let freelist = tx.freelist.clone();
        let mut freelist = freelist.borrow_mut();
        let mut root = tx.root.borrow_mut();
        root.delete_bucket(key, &mut freelist)
    }

    /// Iterator over the root level buckets
    pub fn buckets<'b>(&'b self) -> impl Iterator<Item = (BucketName<'b, 'tx>, Bucket<'b, 'tx>)> {
        let tx = self.inner.borrow();
        let bucket = Bucket {
            inner: tx.root.clone(),
            freelist: tx.freelist.clone(),
            writable: tx.lock.writable(),
            _phantom: PhantomData,
        };
        bucket.cursor().to_buckets()
    }

    /// Writes the changes made in the writable transaction to the underlying file.
    ///
    /// # Errors
    ///
    /// Will return an [`IOError`](enum.Error.html#variant.IOError) error if there are any io errors while writing to disk,
    /// or a [`ReadOnlyTx`](enum.Error.html#variant.ReadOnlyTx) error if this is called on a read-only transaction.
    pub fn commit(self) -> Result<()> {
        if !self.writable() {
            return Err(Error::ReadOnlyTx);
        }
        let mut tx = self.inner.borrow_mut();
        let freelist = tx.freelist.clone();
        let mut freelist = freelist.borrow_mut();
        // Rebalance and spill the dirty nodes, producing the new root metadata.
        let meta = {
            let mut root = tx.root.borrow_mut();
            root.rebalance(&mut freelist)?;
            root.spill(&mut freelist)?
        };
        tx.meta.root = meta;
        tx.write_data(&mut freelist)
    }

    // Runs the database consistency check (used by strict mode and tests).
    pub(crate) fn check(&self) -> Result<()> {
        self.inner.borrow().check()
    }
}

impl<'tx> TxInner<'tx> {
    /// Persists the transaction to disk: writes the new freelist page, grows
    /// the file if needed, writes every dirty page, then writes and syncs the
    /// meta page to make the commit durable.
    ///
    /// Only does work for writable transactions; for a read-only lock this is
    /// a no-op apart from the optional strict-mode check.
    fn write_data(&mut self, freelist: &mut TxFreelist) -> Result<()> {
        if let TxLock::Rw(file) = &mut self.lock {
            // Write the freelist to a new page
            {
                // Free the old freelist's pages, then allocate a fresh page
                // large enough for the current freelist contents.
                freelist.free(self.meta.freelist_page, self.num_freelist_pages);
                let freelist_size = freelist.inner.size();
                let page = freelist.allocate(freelist_size);
                self.meta.freelist_page = page.id;
                let free_page_ids = freelist.inner.pages();
                page.page_type = Page::TYPE_FREELIST;
                page.count = free_page_ids.len() as u64;
                page.freelist_mut()
                    .copy_from_slice(free_page_ids.as_slice());
            }

            // Update our num_pages from the freelist now that we've allocated everything
            self.meta.num_pages = freelist.meta.num_pages;

            // Grow the file, if needed
            let required_size = self.meta.num_pages * self.db.inner.pagesize;
            let current_size = file.metadata()?.len();
            if current_size < required_size {
                // Grow in multiples of MIN_ALLOC_SIZE to amortize resizes.
                let size_diff = required_size - current_size;
                let alloc_size = ((size_diff / MIN_ALLOC_SIZE) + 1) * MIN_ALLOC_SIZE;
                let data = self.db.inner.resize(file, current_size + alloc_size)?;
                self.pages = Pages::new(data, self.db.inner.pagesize);
            }

            // write the data to the file
            {
                // freelist.pages is a BTreeMap so we're writing the pages in order to minmize
                // the random seeks.
                for (page_id, (ptr, size)) in freelist.pages.iter() {
                    // SAFETY: each (ptr, size) pair was recorded by the tx
                    // freelist's allocator and is assumed to describe a live
                    // buffer of `size` bytes kept alive until commit completes
                    // — TODO confirm against TxFreelist::allocate.
                    let buf = unsafe { std::slice::from_raw_parts(ptr.as_ptr(), *size) };
                    file.seek(SeekFrom::Start(self.db.inner.pagesize * page_id))?;
                    file.write_all(buf)?;
                }
            }
        }
        if self.db.inner.flags.strict_mode {
            // Validate the whole page graph before committing the meta page.
            self.check()?;
        }
        if let TxLock::Rw(file) = &mut self.lock {
            // write meta page to file
            {
                let mut buf = vec![0; self.db.inner.pagesize as usize];

                // Alternate between meta pages 0 and 1 each commit so a torn
                // write leaves the previous valid meta page intact.
                #[allow(clippy::cast_ptr_alignment)]
                let mut page = unsafe { &mut *(&mut buf[0] as *mut u8 as *mut Page) };
                let meta_page_id = u64::from(self.meta.meta_page == 0);
                page.id = meta_page_id;
                page.page_type = Page::TYPE_META;
                let m = page.meta_mut();
                m.meta_page = meta_page_id as u32;
                m.magic = self.meta.magic;
                m.version = self.meta.version;
                m.pagesize = self.meta.pagesize;
                m.root = self.meta.root;
                m.num_pages = self.meta.num_pages;
                m.freelist_page = self.meta.freelist_page;
                m.tx_id = self.meta.tx_id;
                m.hash = m.hash_self();

                file.seek(SeekFrom::Start(self.db.inner.pagesize * meta_page_id))?;
                file.write_all(buf.as_slice())?;
            }

            file.flush()?;
            file.sync_all()?;

            // Publish the committed freelist back to the shared database state.
            let mut lock = self.db.inner.freelist.lock()?;
            *lock = freelist.inner.clone();
            Ok(())
        } else {
            // Guarded above: we only reach here for writable transactions.
            unreachable!()
        }
    }

    /// Walks every page reachable from the root bucket and the freelist,
    /// verifying that each page is visited exactly once, keys are sorted,
    /// page types are valid, and no pages are unreachable.
    fn check(&self) -> Result<()> {
        // Pages 0 and 1 are the meta pages; everything else must be accounted for.
        let mut unused_pages: HashSet<PageID> = (2..self.meta.num_pages).collect();
        let mut page_stack = Vec::new();
        page_stack.push(self.meta.root.root_page);
        page_stack.push(self.meta.freelist_page);
        while let Some(page_id) = page_stack.pop() {
            // Make sure this page hasn't already been used
            if !unused_pages.remove(&page_id) {
                return Err(Error::InvalidDB(format!(
                    "Page {} missing from unused_pages",
                    page_id,
                )));
            }
            let page = self.pages.page(page_id);
            // Make sure none of the overflow pages have been used
            for i in 0..page.overflow {
                let page_id = page_id + i + 1;
                if !unused_pages.remove(&page_id) {
                    return Err(Error::InvalidDB(format!(
                        "Overflow Page {} missing from unused_pages",
                        page_id,
                    )));
                }
            }
            // Check the page type and explore all possible pages
            match page.page_type {
                Page::TYPE_BRANCH => {
                    let mut last: Option<&[u8]> = None;
                    for b in page.branch_elements().iter() {
                        // Make sure we visit every branch page
                        page_stack.push(b.page);
                        // and that the keys are in order
                        if let Some(last) = last {
                            if last >= b.key() {
                                return Err(Error::InvalidDB(format!(
                                    "Branch page {} contains unsorted elements",
                                    page_id
                                )));
                            }
                        }
                        last = Some(b.key());
                    }
                }
                Page::TYPE_LEAF => {
                    let mut last: Option<&[u8]> = None;
                    for (i, leaf) in page.leaf_elements().iter().enumerate() {
                        match leaf.node_type {
                            Node::TYPE_BUCKET => {
                                let meta: BucketMeta = leaf.value().into();
                                // Push all nested bucket pages onto the queue for exploration
                                page_stack.push(meta.root_page);
                            }
                            // Ignore data nodes since they don't point to more pages
                            Node::TYPE_DATA => (),
                            // If somehow it isn't a bucket or data, that's really bad...
                            _ => {
                                return Err(Error::InvalidDB(format!(
                                    "Page {} index {} has an invalid leaf node type {}",
                                    page_id, i, leaf.node_type,
                                )))
                            }
                        }
                        // Make sure all leaf elements are in order
                        if let Some(last) = last {
                            if last >= leaf.key() {
                                return Err(Error::InvalidDB(format!(
                                    "Leaf page {} contains unsorted elements",
                                    page_id
                                )));
                            }
                        }
                        last = Some(leaf.key());
                    }
                }
                Page::TYPE_FREELIST => {
                    // Make sure our metadata is pointing at the correct freelist page
                    // and we didn't somehow find our way to another one.
                    if page_id != self.meta.freelist_page {
                        return Err(Error::InvalidDB(format!(
                            "Found Invalid Freelist Page {}",
                            page_id
                        )));
                    }
                    // "visit" all freelist pages (we don't actually care what data is in these pages)
                    for page_id in page.freelist() {
                        if !unused_pages.remove(page_id) {
                            return Err(Error::InvalidDB(format!(
                                "Page {} from freelist missing from unused_pages",
                                page_id,
                            )));
                        }
                    }
                }
                // There are no other valid page types, so getting here is really bad 😅
                _ => {
                    return Err(Error::InvalidDB(format!(
                        "Invalid page type {} for page {}",
                        page.page_type, page_id,
                    )))
                }
            }
        }

        // Once we've explored all of the pages we can reach from the root bucket and freelist,
        // If there are any pages left then we have an invalid database.
        if !unused_pages.is_empty() {
            return Err(Error::InvalidDB(format!(
                "Unreachable pages {:?}",
                unused_pages,
            )));
        }
        Ok(())
    }
}

impl<'tx> Drop for TxInner<'tx> {
    /// Unregisters a read-only transaction's id from the database's open-tx
    /// list so its pages become eligible for reclamation again.
    fn drop(&mut self) {
        // Writable transactions are not tracked in open_ro_txs.
        if self.lock.writable() {
            return;
        }
        let mut open_txs = self.db.inner.open_ro_txs.lock().unwrap();
        // A missing id shouldn't happen, but ignoring it is harmless.
        if let Ok(index) = open_txs.binary_search(&self.meta.tx_id) {
            open_txs.remove(index);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::mem::size_of;

    use super::*;
    use crate::{
        db::{OpenOptions, DB},
        testutil::RandomFile,
    };

    // Verifies that a read-only transaction rejects every mutating operation
    // with Error::ReadOnlyTx while still allowing reads.
    #[test]
    fn test_ro_txs() -> Result<()> {
        let random_file = RandomFile::new();
        let db = DB::open(&random_file)?;

        {
            let tx = db.tx(true)?;
            assert!(tx.create_bucket("abc").is_ok());
            tx.commit()?;
        }

        let tx = db.tx(false)?;
        assert!(tx.create_bucket("def").is_err());
        let b = tx.get_bucket("abc")?;
        assert_eq!(b.put("key", "value"), Err(Error::ReadOnlyTx));
        assert_eq!(b.delete("key"), Err(Error::ReadOnlyTx));
        assert_eq!(b.create_bucket("dev").err(), Some(Error::ReadOnlyTx));
        assert_eq!(tx.commit(), Err(Error::ReadOnlyTx));

        Ok(())
    }

    // Verifies that pages freed while a read-only transaction is open are not
    // reused until it drops, and that the freelist is reusable afterwards.
    #[test]
    fn test_concurrent_txs() -> Result<()> {
        let random_file = RandomFile::new();
        let db = OpenOptions::new()
            .pagesize(1024)
            // make sure we have plenty of pages so we don't have to resize while the read-only tx is open
            .num_pages(10)
            .open(&random_file)?;
        {
            // create a read-only tx
            let tx = db.tx(false)?;
            assert!(!tx.writable());
            let tx = tx.inner.borrow_mut();
            assert_eq!(tx.pages.data.len(), 1024 * 10);
            assert!(!tx.lock.writable());
            {
                let open_ro_txs = tx.db.inner.open_ro_txs.lock().unwrap();
                assert_eq!(open_ro_txs.len(), 1);
                assert_eq!(open_ro_txs[0], tx.meta.tx_id);
            }
            {
                // create a writable transaction while the read-only transaction is still open
                let tx = db.tx(true)?;
                assert!(tx.writable());
                {
                    {
                        let inner = tx.inner.borrow_mut();
                        assert_eq!(inner.meta.tx_id, 1);
                        let freelist = inner.freelist.borrow();
                        assert_eq!(freelist.inner.pages(), vec![]);
                    }
                    let b = tx.create_bucket("abc")?;
                    b.put("123", "456")?;
                }
                tx.commit()?;
            }
            {
                // create a second writable transaction while the read-only transaction is still open
                let tx = db.tx(true)?;
                assert!(tx.writable());
                {
                    {
                        let inner = tx.inner.borrow_mut();
                        let freelist = inner.freelist.borrow();
                        assert_eq!(inner.meta.tx_id, 2);
                        assert_eq!(freelist.inner.pages(), vec![2, 3]);
                    }
                    let b = tx.get_bucket("abc")?;
                    b.put("123", "456")?;
                }
                tx.commit()?;
            }
            // let the read-only tx drop
        }
        {
            // make sure we can reuse the freelist
            let tx = db.tx(true)?;
            assert!(tx.writable());
            let inner = tx.inner.borrow_mut();
            let mut freelist = inner.freelist.borrow_mut();
            assert_eq!(freelist.inner.pages(), vec![2, 3, 4, 5, 6]);
            // allocate some pages from the freelist
            assert_eq!(freelist.meta.num_pages, 10);
            let page = freelist.allocate(size_of::<Page>() as u64);
            assert!(page.id == 2);
            assert!(page.overflow == 0);

            let page = freelist.allocate(size_of::<Page>() as u64);
            assert!(page.id == 3);
            assert!(page.overflow == 0);

            let page = freelist.allocate(size_of::<Page>() as u64);
            assert!(page.id == 4);
            assert!(page.overflow == 0);

            let page = freelist.allocate(size_of::<Page>() as u64);
            assert!(page.id == 5);
            assert!(page.overflow == 0);

            let page = freelist.allocate(size_of::<Page>() as u64);
            assert!(page.id == 6);
            assert!(page.overflow == 0);

            // freelist should be empty so make sure the page is new
            assert_eq!(freelist.meta.num_pages, 10);
            let page = freelist.allocate(size_of::<Page>() as u64);
            assert!(page.id == 10);
            assert!(page.overflow == 0);
            assert_eq!(freelist.meta.num_pages, 11);
            assert_eq!(freelist.inner.pages(), vec![]);
        }
        Ok(())
    }
}
}