liteboxfs 0.1.0

A modern POSIX filesystem in a SQLite database
Documentation
use std::{
    mem,
    ops::{Bound, RangeBounds},
};

use crate::chunker::ChunkerGuard;

use super::{
    buffer::{FullSliceBuf, PartialSliceBuf, SliceBuf, SliceReadMode},
    loaded::{LoadedBlock, LoadedSlice},
    store::{BlockStore, BlockStoreInput},
    types::{Block, BlockId, BlockList, BlockListEditor, BlockSignature, FileId, IndexedBlock},
};

/// The owned state carried by a [`WriteOperation`], independent of the borrowed block store.
#[derive(Debug)]
pub struct WriteOperationState {
    /// The file whose blocks are being modified; passed to every block store call.
    file: FileId,
    /// Mirrors each change written to the block store so the updated block list can be returned
    /// without fetching it again.
    editor: BlockListEditor,
    /// Scratch buffer that block reads are written into.
    block_buf: Vec<u8>,
}

impl WriteOperationState {
    /// Create the state for a write operation on `file`, starting from its current `block_list`.
    ///
    /// The read buffer starts out empty and unallocated.
    pub fn new(file: FileId, block_list: BlockList) -> Self {
        let editor = BlockListEditor::new(block_list);
        let block_buf = Vec::new();

        Self {
            file,
            editor,
            block_buf,
        }
    }
}

/// A write operation for modifying blocks within a file in the block store.
///
/// This abstraction has two main responsibilities:
///
/// 1. Track changes to the block list as it writes those changes to the block store so that a
///    separate round trip to fetch the updated block list is not necessary.
/// 2. Reduce unnecessary allocations by reusing a byte buffer for read operations.
#[derive(Debug)]
pub struct WriteOperation<'a, Store> {
    /// The block store that every read and write is issued against.
    store: &'a mut Store,
    /// The target file, the tracked block list, and the reusable read buffer.
    state: WriteOperationState,
}

impl<'a, Store> WriteOperation<'a, Store>
where
    Store: BlockStore,
{
    pub fn new(store: &'a mut Store, state: WriteOperationState) -> Self {
        Self { store, state }
    }

    pub fn len(&self) -> usize {
        self.state.editor.len()
    }

    pub fn with_buf(self, buf: Vec<u8>) -> Self {
        Self {
            state: WriteOperationState {
                block_buf: buf,
                ..self.state
            },
            ..self
        }
    }

    /// Load the contents of `block` from the block store.
    ///
    /// The write operation is consumed and handed to the returned [`LoadedBlock`], which
    /// represents the mutations that are possible while that block is held in memory.
    ///
    /// # Errors
    ///
    /// Returns an error if the block store fails to read the block.
    ///
    /// # Panics
    ///
    /// Panics if the signature reported by the block store differs from `block.signature`.
    pub fn read_block(mut self, block: &Block) -> crate::Result<LoadedBlock<'a, Store>> {
        // Recycle the operation's buffer instead of allocating a new one for every read.
        let scratch = &mut self.state.block_buf;
        scratch.clear();
        scratch.reserve(block.len());

        let loaded = self.store.read_block(block.id, &mut self.state.block_buf)?;

        // Just a sanity check: the store must agree with the block list about this block.
        assert!(
            loaded.signature == block.signature,
            "Unexpected block signature mismatch when reading a block."
        );

        // Move the filled buffer out, leaving an empty one behind in the state.
        let contents = mem::take(&mut self.state.block_buf);

        Ok(LoadedBlock::new(self, loaded.signature, contents))
    }

    /// Insert a new block built from `input` at position `index`.
    ///
    /// The block is written to the block store first and then mirrored into the tracked block
    /// list. Returns the newly created block.
    ///
    /// # Errors
    ///
    /// Returns an error if the block store rejects the write; the tracked block list is left
    /// untouched in that case.
    pub fn insert_block(&mut self, index: usize, input: BlockStoreInput) -> crate::Result<Block> {
        let file = self.state.file;
        let block_count = self.state.editor.len();
        let stored_input = input.clone();

        // Inserting at the very end goes through the store's dedicated (optimized) append path.
        let block_id = if index == block_count {
            self.store.append_block(file, block_count, stored_input)?
        } else {
            self.store.insert_block(file, index, stored_input)?
        };

        let inserted = input.into_block(block_id);
        self.state.editor.insert(index, inserted.clone());

        Ok(inserted)
    }

    /// Remove every block whose index falls within `range`.
    ///
    /// # Errors
    ///
    /// Returns an error if the block store fails to remove the blocks; the tracked block list
    /// is left untouched in that case.
    pub fn remove_blocks<R: RangeBounds<usize> + Clone>(&mut self, range: R) -> crate::Result<()> {
        let file = self.state.file;

        // Write to the store first so the tracked block list only changes on success.
        self.store.remove_blocks(file, range.clone())?;
        self.state.editor.drain(range);

        Ok(())
    }

    /// Replace the blocks covered by `range` with one new block built from `input`.
    ///
    /// When `range` covers exactly one block, the store's single-block replace is used.
    /// Otherwise the covered blocks are removed from the store and the new block is inserted at
    /// the start of the range. An empty range (for example `0..0`, or `..` on an empty block
    /// list) removes nothing and therefore acts as a plain insertion, which matches what the
    /// splice on the tracked block list does.
    ///
    /// Returns the ID of the newly written block.
    ///
    /// # Errors
    ///
    /// Returns an error if any block store call fails. The tracked block list is only updated
    /// after all store calls have succeeded.
    pub fn replace_blocks<R: RangeBounds<usize> + Clone>(
        &mut self,
        range: R,
        input: BlockStoreInput,
    ) -> crate::Result<BlockId> {
        // Index of the first block covered by the range.
        let start_index = match range.start_bound() {
            Bound::Included(&idx) => idx,
            Bound::Excluded(&idx) => idx + 1,
            Bound::Unbounded => 0,
        };

        // Index of the last block covered by the range, or `None` when the range ends before
        // index 0 and so cannot cover any block. Using `checked_sub` rather than saturating
        // keeps an empty range at index 0 from being mistaken for the one-block range `0..=0`.
        let last_index = match range.end_bound() {
            Bound::Included(&idx) => Some(idx),
            Bound::Excluded(&idx) => idx.checked_sub(1),
            Bound::Unbounded => self.state.editor.len().checked_sub(1),
        };

        let block_id = if last_index == Some(start_index) {
            // Exactly one block is covered: replace it in place.
            self.store
                .replace_block(self.state.file, start_index, input.clone())?
        } else {
            // Zero or several blocks are covered: drop them, then insert the replacement.
            self.store.remove_blocks(self.state.file, range.clone())?;
            self.store
                .insert_block(self.state.file, start_index, input.clone())?
        };

        let new_block = input.into_block(block_id);

        // Update the editor with a single splice operation
        self.state.editor.splice(range, [new_block]);

        Ok(block_id)
    }

    /// Consume this write operation and return the updated block list.
    pub fn finish(self) -> BlockList {
        self.state.editor.finish()
    }
}

/// A [`WriteOperation`] paired with a chunker, used to load slices of blocks.
///
/// Both parts are handed over to the `LoadedSlice` produced by reading a slice.
pub struct SliceOperation<'a, Store> {
    /// The underlying write operation; reachable via `AsRef`, `AsMut`, and `into_inner`.
    op: WriteOperation<'a, Store>,
    /// The chunker passed along to every loaded slice.
    chunker: &'a mut ChunkerGuard,
}

/// Borrow the write operation wrapped by a slice operation.
impl<'a, Store> AsRef<WriteOperation<'a, Store>> for SliceOperation<'a, Store> {
    fn as_ref(&self) -> &WriteOperation<'a, Store> {
        let Self { op, .. } = self;
        op
    }
}

/// Mutably borrow the write operation wrapped by a slice operation.
impl<'a, Store> AsMut<WriteOperation<'a, Store>> for SliceOperation<'a, Store> {
    fn as_mut(&mut self) -> &mut WriteOperation<'a, Store> {
        let Self { op, .. } = self;
        op
    }
}

impl<'a, Store> From<SliceOperation<'a, Store>> for WriteOperation<'a, Store> {
    fn from(slice_op: SliceOperation<'a, Store>) -> Self {
        slice_op.op
    }
}

impl<'a, Store> SliceOperation<'a, Store>
where
    Store: BlockStore,
{
    /// Create a slice operation by wrapping a new [`WriteOperation`] over `store` and `state`
    /// together with the `chunker` that loaded slices will use.
    pub fn new(
        store: &'a mut Store,
        state: WriteOperationState,
        chunker: &'a mut ChunkerGuard,
    ) -> Self {
        let op = WriteOperation::new(store, state);

        Self { op, chunker }
    }

    pub fn into_inner(self) -> WriteOperation<'a, Store> {
        self.op
    }

    fn read_full_slice(
        mut self,
        blocks: Vec<IndexedBlock>,
    ) -> crate::Result<LoadedSlice<'a, Store, FullSliceBuf>> {
        self.op.state.block_buf.clear();
        self.op
            .state
            .block_buf
            .reserve(blocks.iter().map(|b| b.block.len()).sum());

        for block in &blocks {
            let read_block = self
                .op
                .store
                .read_block(block.block.id, &mut self.op.state.block_buf)?;

            // Just a sanity check.
            if read_block.signature != block.block.signature {
                panic!("Unexpected block signature mismatch when reading a block.");
            }
        }

        let buf = mem::take(&mut self.op.state.block_buf);

        Ok(LoadedSlice::new(
            self.op,
            self.chunker,
            blocks,
            FullSliceBuf::new(buf),
        ))
    }

    fn read_partial_slice(
        mut self,
        blocks: Vec<IndexedBlock>,
    ) -> crate::Result<LoadedSlice<'a, Store, PartialSliceBuf>> {
        self.op.state.block_buf.clear();

        let (first_signature, last_signature) = match (blocks.first(), blocks.last()) {
            (Some(first), Some(last)) => {
                self.op
                    .state
                    .block_buf
                    .reserve(first.block.len() + last.block.len());

                // We skip reading hole blocks because the data store treats their content as
                // implicit zeros and does not append any bytes to the buffer. Carrying the block
                // signature instead lets the splice logic preserve the hole's sparsity.
                if matches!(first.block.signature, BlockSignature::Data { .. }) {
                    self.op
                        .store
                        .read_block(first.block.id, &mut self.op.state.block_buf)
                        .map(drop)?;
                }

                if matches!(last.block.signature, BlockSignature::Data { .. }) {
                    self.op
                        .store
                        .read_block(last.block.id, &mut self.op.state.block_buf)
                        .map(drop)?;
                }

                (
                    Some(first.block.signature.clone()),
                    Some(last.block.signature.clone()),
                )
            }
            _ => (None, None),
        };

        let slice_buf = PartialSliceBuf::new(
            mem::take(&mut self.op.state.block_buf),
            first_signature,
            last_signature,
        );

        Ok(LoadedSlice::new(self.op, self.chunker, blocks, slice_buf))
    }

    /// Load a slice of blocks from the block store, fully or partially depending on `read_mode`.
    ///
    /// The slice operation is consumed and handed to the returned [`LoadedSlice`], which
    /// represents the mutations that are possible while those blocks are held in memory.
    ///
    /// # Errors
    ///
    /// Returns an error if reading any of the required blocks fails.
    pub fn read_slice(
        self,
        blocks: Vec<IndexedBlock>,
        read_mode: SliceReadMode,
    ) -> crate::Result<LoadedSlice<'a, Store, SliceBuf>> {
        match read_mode {
            SliceReadMode::Full => self
                .read_full_slice(blocks)
                .map(|loaded| loaded.into_slice_buf()),
            SliceReadMode::Partial => self
                .read_partial_slice(blocks)
                .map(|loaded| loaded.into_slice_buf()),
        }
    }
}