rollblock 0.4.1

A super-fast, block-oriented and rollbackable key-value store.
Documentation
use std::sync::Arc;

use rayon::ThreadPool;

use crate::error::{StoreError, StoreResult};
use crate::state::shard::StateShard;
use crate::types::{BlockDelta, BlockId, BlockUndo, ShardDelta, ShardUndo, StateStats};

/// Applies every shard delta of a block and gathers the data needed to undo it.
///
/// With `Some(pool)` the shard deltas are processed through rayon's `par_iter`
/// inside `pool.install`; with `None` they are processed one after another on
/// the calling thread.
///
/// On success the returned [`StateStats`] holds the summed operation count and
/// the summed number of undo entries across all shards, and the returned
/// [`BlockUndo`] holds one `ShardUndo` for each shard that produced at least
/// one undo entry.
///
/// # Errors
///
/// * [`StoreError::BlockDeltaMismatch`] when `delta.block_height` differs from
///   `block_height`.
/// * [`StoreError::InvalidShardIndex`] when a shard delta names an index that
///   is not present in `shards`.
///
/// Both conditions are checked before any shard is modified, so an error from
/// this function never leaves a subset of the shards already updated.
pub(crate) fn commit_block(
    shards: &[Arc<dyn StateShard>],
    thread_pool: Option<&Arc<ThreadPool>>,
    block_height: BlockId,
    delta: BlockDelta,
) -> StoreResult<(StateStats, BlockUndo)> {
    if delta.block_height != block_height {
        return Err(StoreError::BlockDeltaMismatch {
            expected: block_height,
            found: delta.block_height,
        });
    }

    // Reject a bad shard index up front. Without this, the valid shards would
    // be mutated first and the error would then discard their undo data,
    // leaving the store partially committed with no way to roll back.
    if let Some(invalid) = delta
        .shards
        .iter()
        .find(|shard_delta| shards.get(shard_delta.shard_index).is_none())
    {
        return Err(StoreError::InvalidShardIndex {
            shard_index: invalid.shard_index,
            shard_count: shards.len(),
        });
    }

    // Results are collected in the same order as `delta.shards` in both modes.
    let results: Vec<_> = match thread_pool {
        Some(pool) => pool.install(|| {
            use rayon::prelude::*;
            delta
                .shards
                .par_iter()
                .map(|shard_delta| process_shard_commit(shards, shard_delta))
                .collect()
        }),
        None => delta
            .shards
            .iter()
            .map(|shard_delta| process_shard_commit(shards, shard_delta))
            .collect(),
    };

    let mut total_ops = 0usize;
    let mut modified_keys = 0usize;
    let mut block_undo = BlockUndo {
        block_height,
        shard_undos: Vec::new(),
    };

    for result in results {
        let (ops, keys, shard_undo_opt) = result?;
        total_ops += ops;
        modified_keys += keys;
        // Shards that recorded no undo entries contribute nothing to the undo log.
        if let Some(shard_undo) = shard_undo_opt {
            block_undo.shard_undos.push(shard_undo);
        }
    }

    Ok((
        StateStats {
            operation_count: total_ops,
            modified_keys,
        },
        block_undo,
    ))
}

/// Rolls a block back by replaying each of its shard undo records.
///
/// With `Some(pool)` the shard undos are processed through rayon's `par_iter`
/// inside `pool.install`; with `None` they are processed one after another on
/// the calling thread.
///
/// # Errors
///
/// * [`StoreError::BlockDeltaMismatch`] when `undo.block_height` differs from
///   `block_height`.
/// * [`StoreError::InvalidShardIndex`] when a shard undo names an index that is
///   not present in `shards`.
///
/// Both conditions are checked before any shard is modified, so an error from
/// this function never leaves the block half reverted.
pub(crate) fn revert_block(
    shards: &[Arc<dyn StateShard>],
    thread_pool: Option<&Arc<ThreadPool>>,
    block_height: BlockId,
    undo: BlockUndo,
) -> StoreResult<()> {
    if undo.block_height != block_height {
        return Err(StoreError::BlockDeltaMismatch {
            expected: block_height,
            found: undo.block_height,
        });
    }

    // Reject a bad shard index up front. Without this, the valid shards would
    // be reverted first and the caller would then see a failure for a block
    // that is neither fully applied nor fully rolled back.
    if let Some(invalid) = undo
        .shard_undos
        .iter()
        .find(|shard_undo| shards.get(shard_undo.shard_index).is_none())
    {
        return Err(StoreError::InvalidShardIndex {
            shard_index: invalid.shard_index,
            shard_count: shards.len(),
        });
    }

    let results: Vec<_> = match thread_pool {
        Some(pool) => pool.install(|| {
            use rayon::prelude::*;
            undo.shard_undos
                .par_iter()
                .map(|shard_undo| process_shard_revert(shards, shard_undo))
                .collect()
        }),
        None => undo
            .shard_undos
            .iter()
            .map(|shard_undo| process_shard_revert(shards, shard_undo))
            .collect(),
    };

    // Surface the first failure in shard-undo order, if any.
    for result in results {
        result?;
    }

    Ok(())
}

/// Applies one shard's operations and reports what happened.
///
/// Returns `(operation count, undo entry count, undo record)`. The undo record
/// is `None` when applying the operations produced no undo entries; otherwise
/// it is the shard's undo tagged with the delta's shard index.
///
/// # Errors
///
/// [`StoreError::InvalidShardIndex`] when `shard_delta.shard_index` is not
/// present in `shards`.
fn process_shard_commit(
    shards: &[Arc<dyn StateShard>],
    shard_delta: &ShardDelta,
) -> StoreResult<(usize, usize, Option<ShardUndo>)> {
    let shard = match shards.get(shard_delta.shard_index) {
        Some(found) => found,
        None => {
            return Err(StoreError::InvalidShardIndex {
                shard_index: shard_delta.shard_index,
                shard_count: shards.len(),
            })
        }
    };

    let mut undo = shard.apply(&shard_delta.operations);
    let applied_ops = shard_delta.operations.len();
    let undo_len = undo.entries.len();

    // Debug builds check that the shard produced as many undo entries as the
    // delta carries in `undo_entries`.
    debug_assert_eq!(
        undo_len,
        shard_delta.undo_entries.len(),
        "planned undo entries diverged from shard undo entries",
    );

    if undo_len == 0 {
        return Ok((applied_ops, 0, None));
    }

    // Make sure the undo record carries the index of the shard it was applied to.
    undo.shard_index = shard_delta.shard_index;
    Ok((applied_ops, undo_len, Some(undo)))
}

/// Hands one shard undo record to the shard it names.
///
/// # Errors
///
/// [`StoreError::InvalidShardIndex`] when `shard_undo.shard_index` is not
/// present in `shards`.
fn process_shard_revert(shards: &[Arc<dyn StateShard>], shard_undo: &ShardUndo) -> StoreResult<()> {
    match shards.get(shard_undo.shard_index) {
        Some(target) => {
            target.revert(shard_undo);
            Ok(())
        }
        None => Err(StoreError::InvalidShardIndex {
            shard_index: shard_undo.shard_index,
            shard_count: shards.len(),
        }),
    }
}