use noxu_log::{LogEntryType, Provisional};
use noxu_sync::Mutex;
use std::sync::Arc;
use std::time::Duration;
use crate::error::{RepError, Result};
use crate::net::channel::Channel;
use crc32fast::hash as crc32_hash;
/// Destination for log entries received from the replication stream.
///
/// `ReplicaReceiver::run_until` hands every frame that passes its validation
/// to an implementation of this trait. Implementors must be `Send`.
pub trait LogWriter: Send {
/// Writes a single entry.
///
/// * `vlsn` - VLSN carried by the frame; the code in this file special-cases `0`.
/// * `entry_type` - raw entry-type byte taken from the frame header.
/// * `payload` - the entry body bytes.
///
/// # Errors
/// Returns an error when the entry cannot be written.
/// `ReplicaReceiver::run_until` propagates that error and stops its loop.
fn write_entry(
&mut self,
vlsn: u64,
entry_type: u8,
payload: &[u8],
) -> Result<()>;
}
/// `LogWriter` that appends entries to a `noxu_log::LogManager`, records the
/// resulting position in a `VlsnIndex`, and optionally forwards each entry to
/// a `noxu_dbi::ReplicaReplay`.
pub struct EnvironmentLogWriter {
// Log that every received entry is appended to.
log_manager: Arc<noxu_log::LogManager>,
// Index updated with (vlsn, file number, file offset, entry type) for entries
// whose VLSN is non-zero.
vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
// Optional replay handler; `None` when built with `new`, `Some` when built
// with `with_replay`.
replay: Option<noxu_dbi::ReplicaReplay>,
}
impl EnvironmentLogWriter {
    /// Builds a writer that logs and indexes entries but has no replay
    /// handler attached.
    pub fn new(
        log_manager: Arc<noxu_log::LogManager>,
        vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
    ) -> Self {
        Self {
            replay: None,
            vlsn_index,
            log_manager,
        }
    }

    /// Builds a writer that, in addition to logging and indexing, passes
    /// every written entry to `replay`.
    pub fn with_replay(
        log_manager: Arc<noxu_log::LogManager>,
        vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
        replay: noxu_dbi::ReplicaReplay,
    ) -> Self {
        let mut writer = Self::new(log_manager, vlsn_index);
        writer.replay = Some(replay);
        writer
    }

    /// Returns the replay handler's last-applied-VLSN handle, or `None` when
    /// this writer was constructed without a replay handler.
    pub fn last_applied_vlsn_handle(
        &self,
    ) -> Option<Arc<std::sync::atomic::AtomicU64>> {
        let replay = self.replay.as_ref()?;
        Some(replay.last_applied_vlsn_handle())
    }
}
impl LogWriter for EnvironmentLogWriter {
fn write_entry(
&mut self,
vlsn: u64,
entry_type: u8,
payload: &[u8],
) -> crate::error::Result<()> {
let log_entry_type = LogEntryType::from_type_num(entry_type)
.ok_or_else(|| {
crate::error::RepError::ProtocolError(format!(
"replica: unknown entry_type byte {}",
entry_type
))
})?;
let lsn = if vlsn > 0 {
self.log_manager.log_with_vlsn(
log_entry_type,
payload,
vlsn,
true,
false,
)
} else {
self.log_manager.log(
log_entry_type,
payload,
Provisional::No,
true,
false,
)
}
.map_err(|e| {
crate::error::RepError::DatabaseError(format!(
"replica log write failed: {}",
e
))
})?;
if vlsn > 0 {
self.vlsn_index.put_with_type(
vlsn,
lsn.file_number(),
lsn.file_offset(),
log_entry_type,
);
}
if let Some(replay) = self.replay.as_mut() {
replay.apply_entry(vlsn, entry_type, payload, lsn);
}
log::trace!(
"replica: wrote entry vlsn={} type={} lsn=({},{})",
vlsn,
log_entry_type,
lsn.file_number(),
lsn.file_offset(),
);
Ok(())
}
}
/// Size in bytes of the fixed frame header read by `ReplicaReceiver::run_until`:
/// 8-byte VLSN + 1-byte entry type + 4-byte payload length + 4-byte CRC-32 of
/// the payload. The multi-byte fields are decoded as little-endian.
const FRAME_HEADER_LEN: usize = 8 + 1 + 4 + 4;
/// Replica-side loop driver: reads frames from a channel, validates them,
/// passes them to a `LogWriter`, and sends an acknowledgement for each one
/// written.
pub struct ReplicaReceiver {
// Channel used both to receive frames and to send acknowledgements back.
channel: Arc<dyn Channel>,
}
impl ReplicaReceiver {
    /// Creates a receiver that reads frames from, and sends acknowledgements
    /// on, `channel`.
    pub fn new(channel: Arc<dyn Channel>) -> Self {
        Self { channel }
    }

    /// Runs the receive loop without a shutdown flag.
    ///
    /// Equivalent to `run_until(log_writer, None)`.
    pub fn run(&self, log_writer: &mut dyn LogWriter) -> Result<()> {
        self.run_until(log_writer, None)
    }

    /// Receives frames until the channel closes, the shutdown flag is set, or
    /// an error occurs.
    ///
    /// Each frame is laid out as: 8-byte LE VLSN, 1-byte entry type, 4-byte LE
    /// payload length, 4-byte LE CRC-32 of the payload, then the payload.
    /// For every frame that passes validation the entry is handed to
    /// `log_writer` and the VLSN is sent back as an 8-byte LE acknowledgement.
    ///
    /// * `log_writer` - sink for validated entries.
    /// * `shutdown` - optional flag checked at the top of every iteration.
    ///   When it is supplied the receive timeout is 1 second so the flag is
    ///   re-checked regularly; otherwise the timeout is 30 seconds.
    ///
    /// Returns `Ok(())` when the shutdown flag is observed set, or when the
    /// channel reports `ChannelClosed` on receive or on sending an ack.
    ///
    /// # Errors
    /// * `RepError::ProtocolError` - frame shorter than the header, payload
    ///   shorter than the declared length, or a non-zero VLSN that does not
    ///   exceed the highest VLSN written so far.
    /// * `RepError::FrameCorrupted` - payload CRC does not match the header.
    /// * Any error returned by `log_writer.write_entry`.
    /// * Any other channel error from `receive` or `send`.
    pub fn run_until(
        &self,
        log_writer: &mut dyn LogWriter,
        shutdown: Option<&std::sync::atomic::AtomicBool>,
    ) -> Result<()> {
        use std::sync::atomic::Ordering;

        let recv_timeout = if shutdown.is_some() {
            Duration::from_secs(1)
        } else {
            Duration::from_secs(30)
        };
        // Highest non-zero VLSN that has been handed to the writer so far.
        let mut received_vlsn_high_water: u64 = 0;

        loop {
            if let Some(flag) = shutdown
                && flag.load(Ordering::SeqCst)
            {
                return Ok(());
            }

            let frame = match self.channel.receive(recv_timeout) {
                Ok(Some(f)) => f,
                // No frame this round; loop so the shutdown flag is re-checked.
                Ok(None) => continue,
                // A closed channel is the normal end of the stream.
                Err(RepError::ChannelClosed(_)) => return Ok(()),
                Err(e) => return Err(e),
            };

            if frame.len() < FRAME_HEADER_LEN {
                return Err(RepError::ProtocolError(format!(
                    "replica: short frame: {} bytes",
                    frame.len()
                )));
            }

            // The length check above guarantees each fixed-width header slice
            // exists, so the `try_into` conversions cannot fail.
            let vlsn = u64::from_le_bytes(frame[0..8].try_into().unwrap());
            let entry_type = frame[8];
            let payload_len =
                u32::from_le_bytes(frame[9..13].try_into().unwrap()) as usize;
            let expected_crc =
                u32::from_le_bytes(frame[13..17].try_into().unwrap());

            // Compare against the bytes actually present after the header
            // rather than computing `FRAME_HEADER_LEN + payload_len` first:
            // `payload_len` comes off the wire and that sum can overflow
            // `usize` on 32-bit targets.
            let available = frame.len() - FRAME_HEADER_LEN;
            if payload_len > available {
                return Err(RepError::ProtocolError(format!(
                    "replica: frame payload truncated: expected {} bytes, got {}",
                    payload_len, available,
                )));
            }
            // Cannot overflow: `payload_len <= frame.len() - FRAME_HEADER_LEN`.
            let payload_end = FRAME_HEADER_LEN + payload_len;
            let payload = &frame[FRAME_HEADER_LEN..payload_end];

            let actual_crc = crc32_hash(payload);
            if actual_crc != expected_crc {
                return Err(RepError::FrameCorrupted {
                    vlsn,
                    expected: expected_crc,
                    actual: actual_crc,
                });
            }

            // Non-zero VLSNs must be strictly increasing; gaps are allowed.
            // VLSN 0 is exempt from the ordering check.
            if vlsn != 0 && vlsn <= received_vlsn_high_water {
                return Err(RepError::ProtocolError(format!(
                    "replica: VLSN ordering violation: incoming vlsn={vlsn} \
                     <= received high-water {received_vlsn_high_water}; \
                     possible replay attack or master clock-skew"
                )));
            }

            // An unknown type byte is logged and the frame dropped: it is not
            // written, not acknowledged, and does not advance the high-water
            // mark.
            if LogEntryType::from_type_num(entry_type).is_none() {
                log::error!(
                    "replica: unknown entry_type byte {entry_type} on frame \
                     vlsn={vlsn}; skipping (LOG-10)"
                );
                continue;
            }

            log_writer.write_entry(vlsn, entry_type, payload)?;
            if vlsn != 0 {
                received_vlsn_high_water = vlsn;
            }

            // Acknowledge only after the writer accepted the entry.
            let ack = vlsn.to_le_bytes();
            match self.channel.send(&ack) {
                Ok(()) => {}
                Err(RepError::ChannelClosed(_)) => return Ok(()),
                Err(e) => return Err(e),
            }
        }
    }
}
/// State value stored in a `ReplicaStream`.
///
/// Nothing in this file enforces an ordering between states:
/// `ReplicaStream::set_state` accepts any variant at any time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicaStreamState {
/// State of a freshly constructed `ReplicaStream`.
Idle,
// NOTE(review): the variants below are only stored and read back in this
// file; their exact meaning is set by callers elsewhere — confirm there.
Connecting,
Streaming,
CatchingUp,
Shutdown,
}
/// Bookkeeping for one replica's view of the replication stream: which
/// master it follows, its state, VLSN counters, and entries received but not
/// yet drained.
///
/// Every field sits behind its own `Mutex`, so all methods take `&self`.
/// Methods that read several fields lock them one after another, so such a
/// read is not a single atomic snapshot.
pub struct ReplicaStream {
// Name of the current master; `None` until `set_master` is called.
master_name: Mutex<Option<String>>,
// Current state; starts as `ReplicaStreamState::Idle`.
state: Mutex<ReplicaStreamState>,
// Highest VLSN passed to `mark_applied`.
applied_vlsn: Mutex<u64>,
// Highest VLSN passed to `receive_entry`.
received_vlsn: Mutex<u64>,
// Highest VLSN passed to `update_master_vlsn`.
master_vlsn: Mutex<u64>,
// Entries queued by `receive_entry` as (vlsn, entry_type, data) until
// `drain_pending` removes them.
pending_entries: Mutex<Vec<(u64, u8, Vec<u8>)>>,
}
/// `Default` yields the same value as `ReplicaStream::new()`.
impl Default for ReplicaStream {
// Delegates to `new` so both construction paths stay identical.
fn default() -> Self {
Self::new()
}
}
impl ReplicaStream {
pub fn new() -> Self {
ReplicaStream {
master_name: Mutex::new(None),
state: Mutex::new(ReplicaStreamState::Idle),
applied_vlsn: Mutex::new(0),
received_vlsn: Mutex::new(0),
master_vlsn: Mutex::new(0),
pending_entries: Mutex::new(Vec::new()),
}
}
pub fn get_state(&self) -> ReplicaStreamState {
*self.state.lock()
}
pub fn set_state(&self, state: ReplicaStreamState) {
*self.state.lock() = state;
}
pub fn get_applied_vlsn(&self) -> u64 {
*self.applied_vlsn.lock()
}
pub fn get_received_vlsn(&self) -> u64 {
*self.received_vlsn.lock()
}
pub fn get_master_vlsn(&self) -> u64 {
*self.master_vlsn.lock()
}
pub fn set_master(&self, name: &str) {
*self.master_name.lock() = Some(name.to_string());
}
pub fn get_master(&self) -> Option<String> {
self.master_name.lock().clone()
}
pub fn receive_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
self.pending_entries.lock().push((vlsn, entry_type, data));
let mut received = self.received_vlsn.lock();
if vlsn > *received {
*received = vlsn;
}
}
pub fn mark_applied(&self, vlsn: u64) {
let mut applied = self.applied_vlsn.lock();
if vlsn > *applied {
*applied = vlsn;
}
}
pub fn update_master_vlsn(&self, vlsn: u64) {
let mut master = self.master_vlsn.lock();
if vlsn > *master {
*master = vlsn;
}
}
pub fn get_lag(&self) -> u64 {
let master = *self.master_vlsn.lock();
let applied = *self.applied_vlsn.lock();
master.saturating_sub(applied)
}
pub fn drain_pending(&self) -> Vec<(u64, u8, Vec<u8>)> {
let mut pending = self.pending_entries.lock();
std::mem::take(&mut *pending)
}
pub fn is_caught_up(&self) -> bool {
let applied = *self.applied_vlsn.lock();
let master = *self.master_vlsn.lock();
let pending_empty = self.pending_entries.lock().is_empty();
applied >= master && master > 0 && pending_empty
}
}
impl std::fmt::Debug for ReplicaStream {
    /// Formats the stream as a `ReplicaStream { .. }` struct showing the
    /// master name, state, the three VLSN counters, and the computed lag.
    /// The pending queue is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Snapshot each value up front; every getter takes and releases its
        // own lock.
        let master = self.get_master();
        let state = self.get_state();
        let applied_vlsn = self.get_applied_vlsn();
        let received_vlsn = self.get_received_vlsn();
        let master_vlsn = self.get_master_vlsn();
        let lag = self.get_lag();

        let mut out = f.debug_struct("ReplicaStream");
        out.field("master", &master);
        out.field("state", &state);
        out.field("applied_vlsn", &applied_vlsn);
        out.field("received_vlsn", &received_vlsn);
        out.field("master_vlsn", &master_vlsn);
        out.field("lag", &lag);
        out.finish()
    }
}
// Unit tests: the first group drives `ReplicaReceiver` over an in-process
// channel pair; the second group exercises `ReplicaStream` bookkeeping.
#[cfg(test)]
mod tests {
use super::*;
use crate::net::channel::LocalChannelPair;
/// `LogWriter` test double that records every entry it is asked to write.
struct RecordingWriter {
entries: Vec<(u64, u8, Vec<u8>)>,
}
impl RecordingWriter {
fn new() -> Self {
Self { entries: Vec::new() }
}
}
impl LogWriter for RecordingWriter {
// Stores a copy of the entry and always succeeds.
fn write_entry(
&mut self,
vlsn: u64,
entry_type: u8,
payload: &[u8],
) -> Result<()> {
self.entries.push((vlsn, entry_type, payload.to_vec()));
Ok(())
}
}
/// Builds a wire frame: LE VLSN, entry-type byte, LE payload length,
/// LE CRC-32 of the payload, then the payload itself.
fn make_frame(vlsn: u64, entry_type: u8, payload: &[u8]) -> Vec<u8> {
let crc = crc32_hash(payload);
let mut f = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
f.extend_from_slice(&vlsn.to_le_bytes());
f.push(entry_type);
f.extend_from_slice(&(payload.len() as u32).to_le_bytes());
f.extend_from_slice(&crc.to_le_bytes());
f.extend_from_slice(payload);
f
}
/// Three frames in, three entries written, three acks out in VLSN order.
#[test]
fn test_replica_receiver_receives_and_acks() {
let pair = LocalChannelPair::new();
let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
let frames = vec![
make_frame(1, 10, &[0xAA]),
make_frame(2, 20, &[0xBB, 0xCC]),
make_frame(3, 30, &[]),
];
let master_clone = Arc::clone(&master_side);
// Master side: send every frame, collect exactly three acks, then close
// the channel so the receiver loop ends.
let send_handle = std::thread::spawn(move || {
for f in &frames {
master_clone.send(f).unwrap();
}
let mut acked = Vec::new();
let timeout = Duration::from_secs(5);
for _ in 0..3 {
let ack = master_clone.receive(timeout).unwrap().unwrap();
let vlsn = u64::from_le_bytes(ack[..8].try_into().unwrap());
acked.push(vlsn);
}
master_clone.close().unwrap();
acked
});
let receiver = ReplicaReceiver::new(Arc::clone(&replica_side));
let mut writer = RecordingWriter::new();
receiver.run(&mut writer).unwrap();
let acked = send_handle.join().unwrap();
assert_eq!(acked, vec![1, 2, 3]);
assert_eq!(writer.entries.len(), 3);
assert_eq!(writer.entries[0], (1, 10, vec![0xAA]));
assert_eq!(writer.entries[1], (2, 20, vec![0xBB, 0xCC]));
assert_eq!(writer.entries[2], (3, 30, vec![]));
}
/// Running against an already-closed channel must not surface any error
/// other than `ChannelClosed`.
#[test]
fn test_replica_receiver_stops_on_channel_close() {
let pair = LocalChannelPair::new();
let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
master_side.close().unwrap();
let receiver = ReplicaReceiver::new(replica_side);
let mut writer = RecordingWriter::new();
let res = receiver.run(&mut writer);
assert!(res.is_ok() || matches!(res, Err(RepError::ChannelClosed(_))));
}
/// End-to-end: a `FeederRunner` on one side and a `ReplicaReceiver` on the
/// other exchange five entries and their acknowledgements.
#[test]
fn test_feeder_to_replica_round_trip() {
use crate::stream::feeder::{FeederRunner, LogScanner};
use std::collections::VecDeque;
// Scanner that hands out queued items in order, but only when the front
// item's VLSN is at or beyond the requested position.
struct SimpleScanner {
items: VecDeque<(u64, u8, Vec<u8>)>,
}
impl LogScanner for SimpleScanner {
fn next_entry(
&mut self,
from_vlsn: u64,
) -> Option<(u64, u8, Vec<u8>)> {
if let Some(&(v, _, _)) = self.items.front()
&& v >= from_vlsn
{
return self.items.pop_front();
}
None
}
}
let pair = LocalChannelPair::new();
let master_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);
// Entry-type bytes used for the five entries; the assertions below expect
// all five to be written, so the receiver must not skip any of them.
let valid_types: [u8; 5] = [1, 2, 3, 4, 10];
// Entry i has VLSN i and a payload of i bytes, each equal to i.
let entries: Vec<(u64, u8, Vec<u8>)> = (1..=5)
.map(|i| {
let etype = valid_types[(i - 1) as usize];
(i, etype, vec![i as u8; i as usize])
})
.collect();
let replica_ch_clone = Arc::clone(&replica_ch);
let replica_handle = std::thread::spawn(move || {
let receiver = ReplicaReceiver::new(replica_ch_clone);
let mut writer = RecordingWriter::new();
receiver.run(&mut writer).unwrap();
writer.entries
});
let master_ch_clone = Arc::clone(&master_ch);
let feeder_handle = std::thread::spawn(move || {
let runner = FeederRunner::new(Arc::clone(&master_ch_clone), 1);
let mut scanner =
SimpleScanner { items: entries.into_iter().collect() };
runner.run(&mut scanner).unwrap();
runner.known_replica_vlsn()
});
// NOTE(review): timing-based wait for both threads to finish exchanging
// entries before the channels are closed; may be flaky on a slow machine.
std::thread::sleep(Duration::from_millis(200));
master_ch.close().unwrap();
replica_ch.close().unwrap();
let last_acked = feeder_handle.join().unwrap();
let written = replica_handle.join().unwrap();
assert_eq!(written.len(), 5);
for (i, (vlsn, etype, payload)) in written.iter().enumerate() {
let expected_vlsn = (i + 1) as u64;
assert_eq!(*vlsn, expected_vlsn);
assert_eq!(*etype, valid_types[i]);
assert_eq!(payload.len(), expected_vlsn as usize);
}
assert_eq!(last_acked, 5);
}
/// A frame whose VLSN is lower than one already written is rejected with
/// the VLSN-ordering `ProtocolError`; only the first entry is written.
#[test]
fn test_replica_rejects_replayed_vlsn() {
let pair = LocalChannelPair::new();
let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
let frames =
vec![make_frame(5, 10, b"first"), make_frame(3, 10, b"replay")];
let master_clone = Arc::clone(&master_side);
// Sender errors are ignored: the receiver is expected to bail out early.
let _send_handle = std::thread::spawn(move || {
for f in &frames {
let _ = master_clone.send(f);
}
let _ = master_clone.receive(Duration::from_secs(2));
});
let receiver = ReplicaReceiver::new(replica_side);
let mut writer = RecordingWriter::new();
let res = receiver.run(&mut writer);
match res {
Err(RepError::ProtocolError(msg)) => {
assert!(
msg.contains("VLSN ordering violation"),
"expected VLSN-ordering protocol error, got: {msg}"
);
}
other => {
panic!("expected ProtocolError on replay, got {other:?}")
}
}
assert_eq!(writer.entries.len(), 1);
assert_eq!(writer.entries[0].0, 5);
}
/// A second frame carrying the same VLSN as the first is rejected with a
/// `ProtocolError`; only the first entry is written.
#[test]
fn test_replica_rejects_duplicate_vlsn() {
let pair = LocalChannelPair::new();
let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
let frames = vec![make_frame(7, 10, b"a"), make_frame(7, 10, b"b")];
let master_clone = Arc::clone(&master_side);
let _send_handle = std::thread::spawn(move || {
for f in &frames {
let _ = master_clone.send(f);
}
let _ = master_clone.receive(Duration::from_secs(2));
});
let receiver = ReplicaReceiver::new(replica_side);
let mut writer = RecordingWriter::new();
let res = receiver.run(&mut writer);
assert!(
matches!(res, Err(RepError::ProtocolError(_))),
"expected ProtocolError on duplicate VLSN, got {res:?}"
);
assert_eq!(writer.entries.len(), 1);
}
/// Increasing VLSNs with gaps between them (1, 5, 100) are all accepted.
#[test]
fn test_replica_allows_vlsn_gap() {
let pair = LocalChannelPair::new();
let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
let frames = vec![
make_frame(1, 10, b"a"),
make_frame(5, 10, b"b"),
make_frame(100, 10, b"c"),
];
let master_clone = Arc::clone(&master_side);
let send_handle = std::thread::spawn(move || {
for f in &frames {
master_clone.send(f).unwrap();
}
// Consume the three acks (contents unchecked) before closing.
for _ in 0..3 {
let _ = master_clone.receive(Duration::from_secs(2));
}
master_clone.close().unwrap();
});
let receiver = ReplicaReceiver::new(replica_side);
let mut writer = RecordingWriter::new();
receiver.run(&mut writer).unwrap();
send_handle.join().unwrap();
assert_eq!(writer.entries.len(), 3);
assert_eq!(writer.entries[0].0, 1);
assert_eq!(writer.entries[1].0, 5);
assert_eq!(writer.entries[2].0, 100);
}
/// A frame with entry-type byte 200 is skipped without being written or
/// acknowledged; the following valid frame is written and acked.
#[test]
fn test_replica_skips_unknown_entry_type() {
let pair = LocalChannelPair::new();
let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
let frames =
vec![make_frame(1, 200, b"bogus"), make_frame(2, 10, b"good")];
let master_clone = Arc::clone(&master_side);
let send_handle = std::thread::spawn(move || {
for f in &frames {
master_clone.send(f).unwrap();
}
// Only one ack is expected because the bogus frame is never acked.
let ack = master_clone.receive(Duration::from_secs(2)).unwrap();
master_clone.close().unwrap();
ack
});
let receiver = ReplicaReceiver::new(replica_side);
let mut writer = RecordingWriter::new();
receiver.run(&mut writer).unwrap();
let ack = send_handle.join().unwrap();
let acked_vlsn =
u64::from_le_bytes(ack.unwrap()[..8].try_into().unwrap());
assert_eq!(writer.entries.len(), 1, "bogus frame must be skipped");
assert_eq!(writer.entries[0].0, 2);
assert_eq!(writer.entries[0].1, 10);
assert_eq!(acked_vlsn, 2);
}
/// A new stream is `Idle` with zeroed counters, no master, and zero lag.
#[test]
fn test_new_replica_stream() {
let stream = ReplicaStream::new();
assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
assert_eq!(stream.get_applied_vlsn(), 0);
assert_eq!(stream.get_received_vlsn(), 0);
assert_eq!(stream.get_master_vlsn(), 0);
assert!(stream.get_master().is_none());
assert_eq!(stream.get_lag(), 0);
}
/// `Default` produces an `Idle` stream.
#[test]
fn test_default() {
let stream = ReplicaStream::default();
assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
}
/// `set_state` followed by `get_state` round-trips every variant.
#[test]
fn test_state_transitions() {
let stream = ReplicaStream::new();
assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
stream.set_state(ReplicaStreamState::Connecting);
assert_eq!(stream.get_state(), ReplicaStreamState::Connecting);
stream.set_state(ReplicaStreamState::Streaming);
assert_eq!(stream.get_state(), ReplicaStreamState::Streaming);
stream.set_state(ReplicaStreamState::CatchingUp);
assert_eq!(stream.get_state(), ReplicaStreamState::CatchingUp);
stream.set_state(ReplicaStreamState::Shutdown);
assert_eq!(stream.get_state(), ReplicaStreamState::Shutdown);
}
/// The master name starts unset and is overwritten by each `set_master`.
#[test]
fn test_master_name() {
let stream = ReplicaStream::new();
assert!(stream.get_master().is_none());
stream.set_master("master-node-1");
assert_eq!(stream.get_master(), Some("master-node-1".to_string()));
stream.set_master("master-node-2");
assert_eq!(stream.get_master(), Some("master-node-2".to_string()));
}
/// Received entries are returned in arrival order by `drain_pending`, and a
/// second drain returns nothing.
#[test]
fn test_receive_and_drain() {
let stream = ReplicaStream::new();
stream.receive_entry(1, 10, vec![0xAA]);
stream.receive_entry(2, 20, vec![0xBB, 0xCC]);
stream.receive_entry(3, 30, vec![]);
assert_eq!(stream.get_received_vlsn(), 3);
let entries = stream.drain_pending();
assert_eq!(entries.len(), 3);
assert_eq!(entries[0], (1, 10, vec![0xAA]));
assert_eq!(entries[1], (2, 20, vec![0xBB, 0xCC]));
assert_eq!(entries[2], (3, 30, vec![]));
let entries2 = stream.drain_pending();
assert!(entries2.is_empty());
}
/// The received-VLSN counter never moves backwards.
#[test]
fn test_received_vlsn_monotonic() {
let stream = ReplicaStream::new();
stream.receive_entry(5, 1, vec![]);
assert_eq!(stream.get_received_vlsn(), 5);
stream.receive_entry(3, 1, vec![]);
assert_eq!(stream.get_received_vlsn(), 5);
stream.receive_entry(7, 1, vec![]);
assert_eq!(stream.get_received_vlsn(), 7);
}
/// The applied-VLSN counter never moves backwards.
#[test]
fn test_mark_applied() {
let stream = ReplicaStream::new();
stream.mark_applied(5);
assert_eq!(stream.get_applied_vlsn(), 5);
stream.mark_applied(10);
assert_eq!(stream.get_applied_vlsn(), 10);
stream.mark_applied(7);
assert_eq!(stream.get_applied_vlsn(), 10);
}
/// The master-VLSN counter never moves backwards.
#[test]
fn test_update_master_vlsn() {
let stream = ReplicaStream::new();
stream.update_master_vlsn(100);
assert_eq!(stream.get_master_vlsn(), 100);
stream.update_master_vlsn(150);
assert_eq!(stream.get_master_vlsn(), 150);
stream.update_master_vlsn(120);
assert_eq!(stream.get_master_vlsn(), 150);
}
/// Lag is master minus applied, clamped at zero when applied runs ahead.
#[test]
fn test_lag_calculation() {
let stream = ReplicaStream::new();
stream.update_master_vlsn(100);
assert_eq!(stream.get_lag(), 100);
stream.mark_applied(50);
assert_eq!(stream.get_lag(), 50);
stream.mark_applied(100);
assert_eq!(stream.get_lag(), 0);
stream.mark_applied(110);
assert_eq!(stream.get_lag(), 0);
}
/// Caught-up requires a non-zero master VLSN, applied >= master, and an
/// empty pending queue.
#[test]
fn test_is_caught_up() {
let stream = ReplicaStream::new();
// Master VLSN still zero: never caught up.
assert!(!stream.is_caught_up());
stream.update_master_vlsn(10);
assert!(!stream.is_caught_up());
stream.mark_applied(10);
assert!(stream.is_caught_up());
// A queued entry plus a newer master VLSN breaks the caught-up condition.
stream.receive_entry(11, 1, vec![]);
stream.update_master_vlsn(11);
assert!(!stream.is_caught_up());
stream.drain_pending();
stream.mark_applied(11);
assert!(stream.is_caught_up());
}
/// An applied VLSN ahead of the master VLSN still counts as caught up.
#[test]
fn test_caught_up_with_excess_applied() {
let stream = ReplicaStream::new();
stream.update_master_vlsn(5);
stream.mark_applied(10);
assert!(stream.is_caught_up());
}
/// Full cycle: receive five entries, drain them, mark each applied, and end
/// caught up with zero lag.
#[test]
fn test_receive_apply_cycle() {
let stream = ReplicaStream::new();
stream.set_master("master1");
stream.set_state(ReplicaStreamState::Streaming);
stream.update_master_vlsn(5);
for i in 1..=5 {
stream.receive_entry(i, 1, vec![i as u8]);
}
assert_eq!(stream.get_received_vlsn(), 5);
assert_eq!(stream.get_lag(), 5);
let entries = stream.drain_pending();
assert_eq!(entries.len(), 5);
for (vlsn, _, _) in &entries {
stream.mark_applied(*vlsn);
}
assert_eq!(stream.get_applied_vlsn(), 5);
assert_eq!(stream.get_lag(), 0);
assert!(stream.is_caught_up());
}
/// The `Debug` output includes the master name and the state variant name.
#[test]
fn test_debug_format() {
let stream = ReplicaStream::new();
stream.set_master("test-master");
stream.set_state(ReplicaStreamState::Streaming);
let debug = format!("{:?}", stream);
assert!(debug.contains("test-master"));
assert!(debug.contains("Streaming"));
}
}