//! armdb 0.1.13
//!
//! Sharded bitcask key-value storage optimized for NVMe.
mod apply;
pub mod client;
mod cursor;
mod log_reader;
pub mod protocol;
pub mod server;

pub use apply::ReplicationTarget;
pub use client::ReplicationClient;
pub use cursor::ReplicationCursor;
pub use log_reader::{RawEntry, ShardLogReader};
pub use server::ReplicationServer;

use std::collections::HashMap;

use crate::entry::EntryHeader;
use crate::error::{DbError, DbResult};
use crate::shard::Shard;

/// Entry captured from the write path via SPSC channel.
/// Entry captured from the write path via SPSC channel.
///
/// Carries one fully serialized log entry plus enough routing metadata
/// (`key_len`) for the follower to dispatch it without re-parsing the key.
pub struct ReplicationEntry {
    /// Raw serialized entry bytes (header + key + value + padding).
    /// Layout as consumed by `ReplicationRegistry::apply_streaming`:
    /// a 16-byte header, then `key_len` key bytes, then the value.
    pub data: Vec<u8>,
    /// Key length of the tree that produced this entry.
    /// Routes the entry to targets registered with the same key length.
    pub key_len: u16,
}

/// Registry of replication targets (trees) on a follower.
/// Routes incoming entries to the correct tree's index.
/// Registry of replication targets (trees) on a follower.
/// Routes incoming entries to the correct tree's index.
///
/// Targets are stored in registration order; the `by_key_len` map is a
/// secondary index so streaming-mode routing avoids a linear scan.
#[derive(Default)]
pub struct ReplicationRegistry {
    /// All registered targets, in registration order. Indices into this
    /// vector are stable because targets are only ever appended.
    targets: Vec<Box<dyn ReplicationTarget>>,
    /// key_len → indices into `targets` for O(1) routing in streaming mode.
    by_key_len: HashMap<u16, Vec<usize>>,
}

impl ReplicationRegistry {
    /// Add a target and index it under its key length so streaming-mode
    /// entries can be routed without scanning every target.
    pub fn register(&mut self, target: Box<dyn ReplicationTarget>) {
        let key_len = target.key_len() as u16;
        // The target's slot is its position at append time; slots are stable
        // because `targets` is append-only.
        let slot = self.targets.len();
        self.by_key_len.entry(key_len).or_default().push(slot);
        self.targets.push(target);
    }

    /// Apply a replicated entry in streaming mode (key_len known from SPSC).
    /// Writes raw bytes to the follower's shard, then updates the tree index.
    pub fn apply_streaming(&self, shard: &Shard, entry: &ReplicationEntry) -> DbResult<()> {
        // Persist the raw bytes first; hold the shard lock only for the append.
        let entry_offset = {
            let mut inner = shard.lock();
            inner.append_raw_entry(shard.id, &entry.data)?
        };

        let header = parse_header(&entry.data)?;
        // Keep the follower's global sequence number at least one past this
        // entry's sequence; fetch_max tolerates out-of-order arrivals.
        shard
            .gsn()
            .fetch_max(header.sequence() + 1, std::sync::atomic::Ordering::Relaxed);

        let key_len = entry.key_len as usize;
        let payload_end = 16 + key_len + header.value_len as usize;
        // Truncated entry: the shard append and GSN advance stand, but there
        // is nothing well-formed to hand to the index.
        if entry.data.len() < payload_end {
            return Ok(());
        }

        let file_id = shard.active_file_id();
        // Payload is key immediately followed by value after the 16-byte header.
        let (key, value) = entry.data[16..payload_end].split_at(key_len);

        // Route to every target registered for this key length.
        match self.by_key_len.get(&entry.key_len) {
            Some(slots) => {
                for &slot in slots {
                    self.targets[slot].apply_entry(
                        shard.id,
                        file_id,
                        entry_offset,
                        &header,
                        key,
                        value,
                    )?;
                }
            }
            None => {}
        }

        Ok(())
    }

    /// Apply a raw entry in catch-up mode (key_len unknown, CRC-based routing).
    pub fn apply_catchup(&self, shard: &Shard, raw: &RawEntry) -> DbResult<()> {
        // Same persist-then-index shape as streaming mode; lock scope is
        // limited to the raw append.
        let entry_offset = {
            let mut inner = shard.lock();
            inner.append_raw_entry(shard.id, &raw.data)?
        };

        let header = parse_header(&raw.data)?;
        shard
            .gsn()
            .fetch_max(header.sequence() + 1, std::sync::atomic::Ordering::Relaxed);

        let file_id = shard.active_file_id();
        // key_len is unknown here, so pass everything after the header and
        // let each target decide whether the entry is its own.
        let after_header = &raw.data[16..];

        // First target that claims the entry (returns true) ends routing.
        for target in self.targets.iter() {
            let claimed =
                target.try_apply_entry(shard.id, file_id, entry_offset, &header, after_header)?;
            if claimed {
                break;
            }
        }

        Ok(())
    }
}

/// Parse the fixed 16-byte entry header from the front of `data`.
///
/// Returns `DbError::Replication` both when `data` is shorter than a header
/// and when the header bytes fail zerocopy validation. The length is checked
/// explicitly: the previous `&data[..16]` slice would panic on short input,
/// and replicated bytes arrive from the network, so a malformed frame must
/// surface as an error, not a follower crash.
fn parse_header(data: &[u8]) -> DbResult<EntryHeader> {
    use zerocopy::FromBytes;
    let header_bytes = data
        .get(..16)
        .ok_or_else(|| DbError::Replication("entry shorter than header".into()))?;
    EntryHeader::read_from_bytes(header_bytes)
        .map_err(|_| DbError::Replication("invalid entry header".into()))
}