block-db 0.2.0

Local, multi-threaded, durable byte DB.
Documentation
// Authors: Robert Lopez

use super::{data_block::DataBlock, util::bytes::BytesChunkIter, DataFile};
use crate::error::Error;
use std::collections::{HashSet, VecDeque};

impl DataFile {
    pub async fn attempt_write(
        &mut self,
        bytes: &[u8],
        offset: &mut usize,
        new_free_bytes: &mut usize,
        new_used_bytes: &mut usize,
        used_free_chunks: &mut HashSet<usize>,
        new_free_chunk_offsets: &mut VecDeque<usize>,
    ) -> Result<(String, DataBlock), Error> {
        let id = DataBlock::generate_id();
        let mut data_block = DataBlock::default();
        let chunk_size = self.options_store.chunk_size();
        let mut bytes_chunk_iter = BytesChunkIter::new(bytes, chunk_size);

        macro_rules! write_chunk {
            ($chunk:expr, $padding:expr, $offset:expr) => {
                if let Some(padding) = $padding {
                    data_block.padding = padding;

                    self.write_chunk($offset, $chunk).await?;
                    self.write_chunk($offset + $chunk.len(), &vec![0; padding])
                        .await?;
                } else {
                    self.write_chunk($offset, $chunk).await?;
                }
            };
        }

        while let Some((chunk, padding)) = bytes_chunk_iter.next() {
            if let Some(free_chunk_offset) = new_free_chunk_offsets.pop_front() {
                used_free_chunks.insert(free_chunk_offset);
                data_block.chunk_offsets.push_back(free_chunk_offset);

                write_chunk!(chunk, padding, free_chunk_offset);

                *new_free_bytes -= chunk_size;
            } else {
                data_block.chunk_offsets.push_back(*offset);

                write_chunk!(chunk, padding, *offset);

                *offset += chunk_size;
            }

            *new_used_bytes += chunk_size;
        }

        self.flush().await?;

        Ok((id, data_block))
    }

    pub async fn write<Bytes: AsRef<[u8]>>(&mut self, bytes: Bytes) -> Result<String, Error> {
        self.wal_checkpoint_handler().await?;

        let mut new_size = self.size;
        let mut new_free_bytes = self.free_bytes;
        let mut new_used_bytes = self.used_bytes;
        let mut used_free_chunks = HashSet::new();
        let mut new_free_chunk_offsets = self.free_chunk_offsets.clone();

        match self
            .attempt_write(
                bytes.as_ref(),
                &mut new_size,
                &mut new_free_bytes,
                &mut new_used_bytes,
                &mut used_free_chunks,
                &mut new_free_chunk_offsets,
            )
            .await
        {
            Ok((data_block_id, data_block)) => {
                if let Err(err) = self
                    .wal
                    .log(&super::wal::DataFileLog::Write {
                        new_block: data_block.clone(),
                        data_block_id: data_block_id.clone(),
                        used_free_chunks,
                    })
                    .await
                {
                    self.recover().await?;

                    return Err(Error::Walr(err));
                }

                self.size = new_size;
                self.free_bytes = new_free_bytes;
                self.used_bytes = new_used_bytes;
                self.free_chunk_offsets = new_free_chunk_offsets;
                self.data_blocks.insert(data_block_id.clone(), data_block);

                Ok(data_block_id)
            }
            Err(err) => {
                self.recover().await?;

                Err(err)
            }
        }
    }
}