mod apply;
pub mod client;
mod cursor;
mod log_reader;
pub mod protocol;
pub mod server;
pub use apply::{ApplyOutcome, ReplicationTarget};
pub use client::{ReplicationClient, ReplicationClientOptions};
pub use cursor::ReplicationCursor;
pub use log_reader::{RawEntry, ShardLogReader};
pub use server::{ReplicationServer, ReplicationServerOptions};
use std::sync::atomic::{AtomicU64, Ordering};
use crate::entry::{EntryHeader, entry_size};
use crate::error::{DbError, DbResult};
use crate::shard::Shard;
/// One raw entry handed to [`ReplicationRegistry::apply_streaming`].
pub struct ReplicationEntry {
    /// Raw entry bytes, read by `apply_streaming` as a fixed-size header
    /// followed by the key and then the value.
    pub data: Vec<u8>,
    /// Number of key bytes that follow the header inside `data`.
    pub key_len: u16,
}
/// Applies replicated entries to a shard on behalf of a single
/// [`ReplicationTarget`].
pub struct ReplicationRegistry {
    /// Target that every entry is forwarded to once it has been validated.
    target: Box<dyn ReplicationTarget>,
}
impl ReplicationRegistry {
    /// Creates a registry that forwards every replicated entry to `target`.
    pub fn new(target: Box<dyn ReplicationTarget>) -> Self {
        Self { target }
    }

    /// Computes where the key and the value end inside a raw entry buffer.
    ///
    /// Returns `(key_end, value_end)` as byte offsets from the start of the
    /// buffer; `value_end` is also the minimum buffer length required.
    /// Both lengths come from the peer, so the sums are checked instead of
    /// being allowed to wrap (which would defeat the truncation check on
    /// targets with a narrow `usize`).
    fn checked_payload_bounds(key_len: usize, value_len: usize) -> DbResult<(usize, usize)> {
        let key_end = HEADER_SIZE
            .checked_add(key_len)
            .ok_or_else(|| DbError::Replication("entry length overflow".into()))?;
        let value_end = key_end
            .checked_add(value_len)
            .ok_or_else(|| DbError::Replication("entry length overflow".into()))?;
        Ok((key_end, value_end))
    }

    /// Returns `seq + 1`, the value the shard's GSN counter is raised to
    /// after `seq` has been applied.
    ///
    /// A sequence of `u64::MAX` has no successor. Accepting it would also pin
    /// the caller's cursor at `u64::MAX`, after which every later entry would
    /// be dropped as stale, so it is rejected instead.
    fn checked_next_gsn(seq: u64) -> DbResult<u64> {
        seq.checked_add(1)
            .ok_or_else(|| DbError::Replication("entry sequence number out of range".into()))
    }

    /// Applies one entry received on the streaming path.
    ///
    /// Steps, in order:
    /// 1. Decode the header from the front of `entry.data`.
    /// 2. If the header's sequence is not greater than `last_applied_gsn`,
    ///    log a warning and return `Ok(())` without touching the shard.
    /// 3. Check that `entry.data` covers header + key + value.
    /// 4. While holding the shard lock: pass the raw bytes to
    ///    `append_raw_entry`, hand key and value to the target, add the size
    ///    of any replaced/removed record to the dead-byte count of the file
    ///    that held it, and raise the shard's GSN counter to at least
    ///    `sequence + 1`.
    /// 5. Raise `last_applied_gsn` to at least the entry's sequence.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::Replication`] when the header is truncated or
    /// invalid, when `entry.data` is shorter than the header declares, when
    /// the declared lengths overflow `usize`, or when the sequence is
    /// `u64::MAX`. Errors from `append_raw_entry` and from the target are
    /// propagated unchanged.
    ///
    /// NOTE(review): the stale check runs before the shard lock is taken, so
    /// two concurrent callers could both accept the same sequence — confirm
    /// that callers serialize application per shard.
    pub fn apply_streaming(
        &self,
        shard: &Shard,
        entry: &ReplicationEntry,
        last_applied_gsn: &AtomicU64,
    ) -> DbResult<()> {
        // `parse_header` itself rejects buffers shorter than the header.
        let header = parse_header(&entry.data)?;
        let seq = header.sequence();

        let cursor_gsn = last_applied_gsn.load(Ordering::Relaxed);
        if seq <= cursor_gsn {
            tracing::warn!(
                gsn = seq,
                cursor_gsn,
                "stale entry rejected (streaming)"
            );
            return Ok(());
        }

        let (key_end, value_end) =
            Self::checked_payload_bounds(entry.key_len as usize, header.value_len as usize)?;
        if entry.data.len() < value_end {
            return Err(DbError::Replication("entry data truncated".into()));
        }
        // Validate the sequence before anything is written to the shard.
        let next_gsn = Self::checked_next_gsn(seq)?;

        let key = &entry.data[HEADER_SIZE..key_end];
        let value = &entry.data[key_end..value_end];

        {
            let mut inner = shard.lock();
            let (file_id, entry_offset) =
                inner.append_raw_entry(shard.id, entry.key_len, &entry.data)?;
            let outcome = self.target.apply_entry(
                &mut inner,
                shard.id,
                file_id,
                entry_offset,
                &header,
                key,
                value,
            )?;
            // The superseded record is now garbage in the file that holds it.
            if let ApplyOutcome::Replaced(old) | ApplyOutcome::TombstoneRemoved(old) = &outcome {
                let dead = entry_size(self.target.key_len(), old.len);
                inner.add_dead_bytes(old.file_id, dead);
            }
            shard.gsn().fetch_max(next_gsn, Ordering::Relaxed);
        }

        last_applied_gsn.fetch_max(seq, Ordering::Relaxed);
        Ok(())
    }

    /// Applies one entry read back during catch-up.
    ///
    /// Follows the same sequence of checks and updates as
    /// [`Self::apply_streaming`], with two differences: everything after the
    /// header is handed to the target's `try_apply_entry` in one slice, and
    /// an `ApplyOutcome::NotMatched` result is only logged as a warning.
    ///
    /// # Errors
    ///
    /// Returns [`DbError::Replication`] when the header is truncated or
    /// invalid, when `raw.data` is shorter than the header declares, when the
    /// declared lengths overflow `usize`, or when the sequence is `u64::MAX`.
    /// Errors from `append_raw_entry` and from the target are propagated
    /// unchanged.
    ///
    /// NOTE(review): on `NotMatched` the raw bytes have already been passed
    /// to `append_raw_entry` and both GSN counters still advance — confirm
    /// this is intended.
    pub fn apply_catchup(
        &self,
        shard: &Shard,
        raw: &RawEntry,
        last_applied_gsn: &AtomicU64,
    ) -> DbResult<()> {
        // `parse_header` itself rejects buffers shorter than the header.
        let header = parse_header(&raw.data)?;
        let seq = header.sequence();

        let cursor_gsn = last_applied_gsn.load(Ordering::Relaxed);
        if seq <= cursor_gsn {
            tracing::warn!(
                gsn = seq,
                cursor_gsn,
                "stale entry rejected (catch-up)"
            );
            return Ok(());
        }

        let (_key_end, required) =
            Self::checked_payload_bounds(raw.key_len as usize, header.value_len as usize)?;
        if raw.data.len() < required {
            return Err(DbError::Replication("entry data truncated".into()));
        }
        // Validate the sequence before anything is written to the shard.
        let next_gsn = Self::checked_next_gsn(seq)?;

        let after_header = &raw.data[HEADER_SIZE..];

        {
            let mut inner = shard.lock();
            let (file_id, entry_offset) =
                inner.append_raw_entry(shard.id, raw.key_len, &raw.data)?;
            let outcome = self.target.try_apply_entry(
                &mut inner,
                shard.id,
                file_id,
                entry_offset,
                &header,
                after_header,
            )?;
            match &outcome {
                ApplyOutcome::NotMatched => {
                    tracing::warn!(
                        file_id,
                        entry_offset,
                        "catch-up: entry CRC did not match target key_len — possible stale multi-target log"
                    );
                }
                // The superseded record is now garbage in the file that holds it.
                ApplyOutcome::Replaced(old) | ApplyOutcome::TombstoneRemoved(old) => {
                    let dead = entry_size(self.target.key_len(), old.len);
                    inner.add_dead_bytes(old.file_id, dead);
                }
                _ => {}
            }
            shard.gsn().fetch_max(next_gsn, Ordering::Relaxed);
        }

        last_applied_gsn.fetch_max(seq, Ordering::Relaxed);
        Ok(())
    }
}
/// Number of bytes taken by the fixed header at the start of every raw entry;
/// the key begins at this offset.
///
/// NOTE(review): presumably this must equal `size_of::<EntryHeader>()`, since
/// `parse_header` decodes exactly this many bytes as an `EntryHeader` —
/// confirm against the header definition.
const HEADER_SIZE: usize = 16;
/// Decodes the fixed-size [`EntryHeader`] found at the front of `data`.
///
/// Only the first `HEADER_SIZE` bytes are looked at; any bytes after them are
/// ignored.
///
/// # Errors
///
/// Returns [`DbError::Replication`] when `data` holds fewer than
/// `HEADER_SIZE` bytes, or when those bytes cannot be decoded as an
/// `EntryHeader`.
fn parse_header(data: &[u8]) -> DbResult<EntryHeader> {
    use zerocopy::FromBytes;

    match data.get(..HEADER_SIZE) {
        // `get` yields `None` exactly when the buffer is shorter than the header.
        None => Err(DbError::Replication("entry header truncated".into())),
        Some(prefix) => EntryHeader::read_from_bytes(prefix)
            .map_err(|_| DbError::Replication("invalid entry header".into())),
    }
}
#[cfg(test)]
mod apply_validation_tests {
    use super::*;

    /// A buffer shorter than the fixed header must come back as a replication
    /// error rather than panicking on an out-of-range slice.
    #[test]
    fn parse_header_too_short_does_not_panic() {
        let undersized = [0u8; 5];
        let outcome = parse_header(&undersized);
        assert!(matches!(outcome, Err(DbError::Replication(_))));
    }
}