mod apply;
pub mod client;
mod cursor;
mod log_reader;
pub mod protocol;
pub mod server;

pub use apply::ReplicationTarget;
pub use client::ReplicationClient;
pub use cursor::ReplicationCursor;
pub use log_reader::{RawEntry, ShardLogReader};
pub use server::ReplicationServer;

use std::collections::HashMap;

use crate::entry::EntryHeader;
use crate::error::{DbError, DbResult};
use crate::shard::Shard;
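
/// A raw log entry received over the replication stream.
///
/// `data` holds the serialized 16-byte header followed by the key and the
/// value; `key_len` records the key's length so the payload can be split.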
pub struct ReplicationEntry {
pub data: Vec<u8>,
pub key_len: u16,
}
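
/// Fans replicated entries out to the registered [`ReplicationTarget`]s,
/// indexed by the key length each target expects.
///
/// Hypothetical usage sketch; `MyIndexTarget`, `shard`, and `entry` are
/// placeholders supplied by the caller, not items defined in this crate:
///
/// ```ignore
/// let mut registry = ReplicationRegistry::default();
/// registry.register(Box::new(MyIndexTarget::new()));
/// registry.apply_streaming(&shard, &entry)?;
/// ```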
#[derive(Default)]
pub struct ReplicationRegistry {
targets: Vec<Box<dyn ReplicationTarget>>,
by_key_len: HashMap<u16, Vec<usize>>,
}
impl ReplicationRegistry {
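    /// Registers a target and records its index under the target's key length
    /// so streaming applies only reach targets whose key size matches.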
pub fn register(&mut self, target: Box<dyn ReplicationTarget>) {
let kl = target.key_len() as u16;
let idx = self.targets.len();
self.targets.push(target);
self.by_key_len.entry(kl).or_default().push(idx);
}
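
    /// Applies a streamed entry: appends its raw bytes to the shard's log,
    /// advances the shard's GSN past the entry's sequence number, and hands
    /// the decoded key and value to every target registered for
    /// `entry.key_len`.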
pub fn apply_streaming(&self, shard: &Shard, entry: &ReplicationEntry) -> DbResult<()> {
        // Append the raw bytes to the shard's log under the shard lock and
        // remember the offset they were written at.
        let entry_offset = {
            let mut inner = shard.lock();
            inner.append_raw_entry(shard.id, &entry.data)?
        };
        let header = parse_header(&entry.data)?;
        let seq = header.sequence();
        // Advance the shard's GSN past this entry's sequence number.
        shard
            .gsn()
            .fetch_max(seq + 1, std::sync::atomic::Ordering::Relaxed);
        let k = entry.key_len as usize;
        // Too short to hold the declared key and value: the log append above
        // stands, but there is nothing to hand to the targets.
        if entry.data.len() < 16 + k + header.value_len as usize {
            return Ok(());
        }
        let file_id = shard.active_file_id();
        // The first 16 bytes are the serialized header; key and value follow.
        let key = &entry.data[16..16 + k];
        let value = &entry.data[16 + k..16 + k + header.value_len as usize];
        // Dispatch only to targets registered for this key length.
        if let Some(indices) = self.by_key_len.get(&entry.key_len) {
for &idx in indices {
self.targets[idx].apply_entry(
shard.id,
file_id,
entry_offset,
&header,
key,
value,
)?;
}
}
Ok(())
}
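
    /// Applies an entry read during catch-up (a [`RawEntry`] from the shard
    /// log): appends its raw bytes, advances the shard's GSN, and offers the
    /// undecoded payload to each target in turn until one accepts it.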
pub fn apply_catchup(&self, shard: &Shard, raw: &RawEntry) -> DbResult<()> {
        // Append the raw bytes to the shard's log under the shard lock and
        // remember the offset they were written at.
        let entry_offset = {
            let mut inner = shard.lock();
            inner.append_raw_entry(shard.id, &raw.data)?
        };
        let header = parse_header(&raw.data)?;
        let seq = header.sequence();
        // Advance the shard's GSN past this entry's sequence number.
        shard
            .gsn()
            .fetch_max(seq + 1, std::sync::atomic::Ordering::Relaxed);
        let file_id = shard.active_file_id();
        // Catch-up entries carry no separate key length, so give each target
        // the payload after the 16-byte header until one accepts it.
        let after_header = &raw.data[16..];
        for target in &self.targets {
if target.try_apply_entry(shard.id, file_id, entry_offset, &header, after_header)? {
break;
}
}
Ok(())
}
}
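
/// Reads the 16-byte [`EntryHeader`] from the front of a raw entry.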
fn parse_header(data: &[u8]) -> DbResult<EntryHeader> {
    use zerocopy::FromBytes;
    // Reject short buffers so a truncated entry surfaces as an error, not a panic.
    if data.len() < 16 {
        return Err(DbError::Replication("entry shorter than header".into()));
    }
    EntryHeader::read_from_bytes(&data[..16])
        .map_err(|_| DbError::Replication("invalid entry header".into()))
}