rollblock 0.4.1

A super-fast, block-oriented and rollbackable key-value store.
Documentation
use std::collections::HashMap;
use std::sync::Arc;

use crate::error::{StoreError, StoreResult};
use crate::state::shard::StateShard;
use crate::types::{
    BlockDelta, BlockId, Operation, ShardDelta, ShardOp, StoreKey as Key, UndoEntry, UndoOp, Value,
    ValueBuf,
};

pub(crate) fn plan_block_delta(
    shards: &[Arc<dyn StateShard>],
    block_height: BlockId,
    ops: &[Operation],
) -> StoreResult<BlockDelta> {
    if ops.is_empty() {
        return Ok(BlockDelta {
            block_height,
            shards: Vec::new(),
        });
    }

    let shard_count = shards.len();

    let mut deltas: Vec<ShardDelta> = (0..shard_count)
        .map(|index| ShardDelta {
            shard_index: index,
            operations: Vec::new(),
            undo_entries: Vec::new(),
        })
        .collect();
    let mut planned_states: Vec<HashMap<Key, Option<ValueBuf>>> =
        (0..shard_count).map(|_| HashMap::new()).collect();

    for op in ops {
        let shard_index = shard_for_key(shards, &op.key)?;
        let shard = shards
            .get(shard_index)
            .ok_or(StoreError::NoShardsConfigured)?;

        let shard_delta = &mut deltas[shard_index];
        shard_delta.operations.push(ShardOp {
            key: op.key,
            value: op.value.clone(),
        });

        let plan = &mut planned_states[shard_index];
        let previous_state = if let Some(state) = plan.get(&op.key) {
            state.clone()
        } else {
            shard.get(&op.key)
        };

        if op.is_delete() {
            if let Some(ref previous) = previous_state {
                shard_delta.undo_entries.push(UndoEntry {
                    key: op.key,
                    previous: Some(Value::from(previous.clone())),
                    op: UndoOp::Deleted,
                });
            }
            plan.insert(op.key, None);
        } else {
            let undo_op = if previous_state.is_some() {
                UndoOp::Updated
            } else {
                UndoOp::Inserted
            };

            shard_delta.undo_entries.push(UndoEntry {
                key: op.key,
                previous: previous_state.as_ref().map(|buf| Value::from(buf.clone())),
                op: undo_op,
            });

            plan.insert(op.key, Some(ValueBuf::from(&op.value)));
        }
    }

    deltas.retain(|delta| !delta.operations.is_empty());

    Ok(BlockDelta {
        block_height,
        shards: deltas,
    })
}

fn shard_for_key(shards: &[Arc<dyn StateShard>], key: &Key) -> StoreResult<usize> {
    if shards.is_empty() {
        return Err(StoreError::NoShardsConfigured);
    }

    let shard_count = shards.len();
    let hash = super::hashing::shard_hash(key);
    Ok((hash % shard_count as u64) as usize)
}