armdb 0.2.0

Sharded Bitcask key-value storage optimized for NVMe.
Documentation
mod apply;
pub mod client;
mod cursor;
mod log_reader;
pub mod protocol;
pub mod server;

pub use apply::{ApplyOutcome, ReplicationTarget};
pub use client::{ReplicationClient, ReplicationClientOptions};
pub use cursor::ReplicationCursor;
pub use log_reader::{RawEntry, ShardLogReader};
pub use server::{ReplicationServer, ReplicationServerOptions};

use std::sync::atomic::{AtomicU64, Ordering};

use crate::entry::{EntryHeader, entry_size};
use crate::error::{DbError, DbResult};
use crate::shard::Shard;

/// A single write captured on the leader's write path and delivered to the
/// replication stream through an SPSC channel.
pub struct ReplicationEntry {
    /// The entry exactly as serialized on disk: header, then key, then value,
    /// then any trailing padding.
    pub data: Vec<u8>,
    /// Key length used by the tree that emitted this entry; needed to split
    /// `data` into key and value on the follower.
    pub key_len: u16,
}

/// Follower-side holder for the one replication target (a tree) that incoming
/// replicated entries are applied to.
///
/// An `Engine` is owned by exactly one `Tree`, which is why this holds a single
/// target rather than a collection of them.
pub struct ReplicationRegistry {
    target: Box<dyn ReplicationTarget>,
}

impl ReplicationRegistry {
    pub fn new(target: Box<dyn ReplicationTarget>) -> Self {
        Self { target }
    }

    /// Apply a replicated entry in streaming mode (key_len known from SPSC).
    /// Writes raw bytes to the follower's shard, then updates the tree index.
    ///
    /// `last_applied_gsn` tracks the highest GSN successfully applied by the
    /// client. Entries with `gsn <= last_applied_gsn` are silently skipped
    /// (C12 defensive guard against stale resends from a buggy/old leader).
    pub fn apply_streaming(
        &self,
        shard: &Shard,
        entry: &ReplicationEntry,
        last_applied_gsn: &AtomicU64,
    ) -> DbResult<()> {
        // Pre-flight: header + key + value must fit in data. (C8)
        if entry.data.len() < HEADER_SIZE {
            return Err(DbError::Replication("entry header truncated".into()));
        }
        let header = parse_header(&entry.data)?;

        // C12 defensive: reject stale GSN before acquiring the shard lock.
        let cursor_gsn = last_applied_gsn.load(Ordering::Relaxed);
        if header.sequence() <= cursor_gsn {
            tracing::warn!(
                gsn = header.sequence(),
                cursor_gsn,
                "stale entry rejected (streaming)"
            );
            return Ok(());
        }

        let k = entry.key_len as usize;
        let required = HEADER_SIZE + k + header.value_len as usize;
        if entry.data.len() < required {
            return Err(DbError::Replication("entry data truncated".into()));
        }

        let seq = {
            let mut inner = shard.lock();
            let (file_id, entry_offset) =
                inner.append_raw_entry(shard.id, entry.key_len, &entry.data)?;

            let key = &entry.data[HEADER_SIZE..HEADER_SIZE + k];
            let value = &entry.data[HEADER_SIZE + k..HEADER_SIZE + k + header.value_len as usize];

            let outcome = self.target.apply_entry(
                &mut inner,
                shard.id,
                file_id,
                entry_offset,
                &header,
                key,
                value,
            )?;

            match &outcome {
                ApplyOutcome::Replaced(old) | ApplyOutcome::TombstoneRemoved(old) => {
                    let dead = entry_size(self.target.key_len(), old.len);
                    inner.add_dead_bytes(old.file_id, dead);
                }
                _ => {}
            }

            let seq = header.sequence();
            shard
                .gsn()
                .fetch_max(seq + 1, std::sync::atomic::Ordering::Relaxed);
            seq
        };

        // Advance after successful apply.
        last_applied_gsn.fetch_max(seq, Ordering::Relaxed);
        Ok(())
    }

    /// Apply a raw entry in catch-up mode (key_len from SyncRequest, CRC verified by log reader).
    ///
    /// `last_applied_gsn` tracks the highest GSN successfully applied by the
    /// client. Entries with `gsn <= last_applied_gsn` are silently skipped
    /// (C12 defensive guard against stale resends from a buggy/old leader).
    pub fn apply_catchup(
        &self,
        shard: &Shard,
        raw: &RawEntry,
        last_applied_gsn: &AtomicU64,
    ) -> DbResult<()> {
        // Pre-flight: header + key + value must fit in data. (C8)
        if raw.data.len() < HEADER_SIZE {
            return Err(DbError::Replication("entry header truncated".into()));
        }
        let header = parse_header(&raw.data)?;

        // C12 defensive: reject stale GSN before acquiring the shard lock.
        let cursor_gsn = last_applied_gsn.load(Ordering::Relaxed);
        if header.sequence() <= cursor_gsn {
            tracing::warn!(
                gsn = header.sequence(),
                cursor_gsn,
                "stale entry rejected (catch-up)"
            );
            return Ok(());
        }

        let k = raw.key_len as usize;
        let required = HEADER_SIZE + k + header.value_len as usize;
        if raw.data.len() < required {
            return Err(DbError::Replication("entry data truncated".into()));
        }

        let seq = {
            let mut inner = shard.lock();
            let (file_id, entry_offset) =
                inner.append_raw_entry(shard.id, raw.key_len, &raw.data)?;

            let after_header = &raw.data[HEADER_SIZE..];

            let outcome = self.target.try_apply_entry(
                &mut inner,
                shard.id,
                file_id,
                entry_offset,
                &header,
                after_header,
            )?;

            if matches!(outcome, ApplyOutcome::NotMatched) {
                // Entry was written to disk but CRC didn't match this target's key_len.
                // In the single-target world every on-disk entry should belong to this target.
                // Log a warning and continue — the entry is on disk and will be replayed on recovery.
                tracing::warn!(
                    file_id,
                    entry_offset,
                    "catch-up: entry CRC did not match target key_len — possible stale multi-target log"
                );
            } else {
                match &outcome {
                    ApplyOutcome::Replaced(old) | ApplyOutcome::TombstoneRemoved(old) => {
                        let dead = entry_size(self.target.key_len(), old.len);
                        inner.add_dead_bytes(old.file_id, dead);
                    }
                    _ => {}
                }
            }

            let seq = header.sequence();
            shard
                .gsn()
                .fetch_max(seq + 1, std::sync::atomic::Ordering::Relaxed);
            seq
        };

        // Advance after successful apply.
        last_applied_gsn.fetch_max(seq, Ordering::Relaxed);
        Ok(())
    }
}

/// Length in bytes of the fixed entry header that precedes the key and value.
///
/// `parse_header` decodes the header with zerocopy's `read_from_bytes` on a slice
/// of exactly this length, so it must equal the size of `EntryHeader`.
const HEADER_SIZE: usize = 16;

/// Decodes the `EntryHeader` from the first `HEADER_SIZE` bytes of `data`.
///
/// Any bytes after the header are ignored.
///
/// # Errors
///
/// Returns `DbError::Replication("entry header truncated")` if `data` is shorter
/// than `HEADER_SIZE`. Returns `DbError::Replication("invalid entry header")` if
/// zerocopy rejects the header slice.
fn parse_header(data: &[u8]) -> DbResult<EntryHeader> {
    use zerocopy::FromBytes;

    let header_bytes = data
        .get(..HEADER_SIZE)
        .ok_or_else(|| DbError::Replication("entry header truncated".into()))?;

    EntryHeader::read_from_bytes(header_bytes)
        .map_err(|_| DbError::Replication("invalid entry header".into()))
}

/// Unit tests for the header/length validation used by the apply paths.
#[cfg(test)]
mod apply_validation_tests {
    use super::*;

    #[test]
    fn parse_header_too_short_does_not_panic() {
        let short = vec![0u8; 5];
        let err = parse_header(&short).unwrap_err();
        assert!(matches!(err, DbError::Replication(_)));
    }

    #[test]
    fn parse_header_rejects_empty_and_one_byte_short() {
        for &len in &[0, HEADER_SIZE - 1] {
            let data = vec![0u8; len];
            let err = parse_header(&data).unwrap_err();
            assert!(matches!(err, DbError::Replication(_)), "len = {}", len);
        }
    }

    #[test]
    fn header_size_matches_entry_header_layout() {
        // `read_from_bytes` requires an exact-size slice; if these diverge every
        // replicated entry would be rejected as "invalid entry header".
        assert_eq!(HEADER_SIZE, std::mem::size_of::<EntryHeader>());
    }

    #[test]
    fn parse_header_accepts_exact_and_longer_input() {
        // Trailing key/value bytes after the header must be ignored.
        assert!(parse_header(&[0u8; HEADER_SIZE]).is_ok());
        assert!(parse_header(&[0u8; HEADER_SIZE + 8]).is_ok());
    }
}