use noxu_dbi::EnvironmentImpl;
use noxu_log::MAX_ITEM_SIZE;
use noxu_log::entry_header::{MAX_HEADER_SIZE, MIN_HEADER_SIZE};
use noxu_log::file_header::LOG_VERSION as LOG_FILE_VERSION;
use noxu_log::file_header::on_disk_size as file_header_on_disk_size;
use noxu_log::file_manager::FileManager;
use noxu_sync::Mutex;
use noxu_util::lsn::{Lsn, NULL_LSN};
use std::sync::Arc;
use std::time::{Duration, Instant};
use crate::error::{RepError, Result};
use crate::net::channel::Channel;
use crc32fast;
/// Source of log entries that a feeder streams to a replica.
///
/// Implementors must be `Send` (supertrait bound).
pub trait LogScanner: Send {
/// Returns the next entry as `(vlsn, entry_type, payload)`, or `None` when
/// no entry is available right now.
///
/// `from_vlsn` is the lowest VLSN the caller is interested in; the
/// implementations in this file only hand back entries whose VLSN is
/// `>= from_vlsn`. `FeederRunner::run` calls this again after a `None`.
fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)>;
}
/// `LogScanner` that walks the on-disk log files of an environment through a
/// `FileManager`, remembering its position between calls.
pub struct EnvironmentLogScanner {
// File manager used for every read, length query and file listing.
file_manager: Arc<FileManager>,
// Number of the log file the next read will target.
cursor_file: u32,
// Byte offset inside `cursor_file` of the next entry to read.
cursor_offset: u64,
// Highest VLSN handed out so far (0 before the first entry); entries with
// a VLSN that is not strictly greater are skipped by `next_entry`.
last_returned_vlsn: u64,
}
impl EnvironmentLogScanner {
    /// Builds a scanner over the log files found in `env`'s home directory.
    ///
    /// Scanning begins at `start_lsn` when it is `Some` and different from
    /// `NULL_LSN`; otherwise it begins immediately after the file header of
    /// file 0 (falling back to the header size of the current log version if
    /// the real header size cannot be determined).
    ///
    /// Returns `None` when the `FileManager` cannot be created.
    pub fn new(env: &EnvironmentImpl, start_lsn: Option<Lsn>) -> Option<Self> {
        let home = env.get_env_home().to_path_buf();
        // NOTE(review): the `true`, 256 MiB and `32` arguments are forwarded
        // as-is; their meaning is defined by `FileManager::new` — confirm
        // there before changing them.
        let manager =
            FileManager::new(&home, true, 256 * 1024 * 1024, 32).ok()?;
        let file_manager = Arc::new(manager);

        let explicit_start = start_lsn.filter(|lsn| *lsn != NULL_LSN);
        let (cursor_file, cursor_offset) = if let Some(lsn) = explicit_start {
            (lsn.file_number(), lsn.file_offset() as u64)
        } else {
            let header_len = match file_manager.file_header_size_for(0) {
                Ok(len) => len,
                Err(_) => file_header_on_disk_size(LOG_FILE_VERSION),
            };
            (0, header_len as u64)
        };

        Some(Self {
            file_manager,
            cursor_file,
            cursor_offset,
            last_returned_vlsn: 0,
        })
    }

    /// Reads one raw log entry located at `offset` in file `file_num`.
    ///
    /// On success returns `(entry_size, vlsn, entry_type, payload)` where
    /// `entry_size` is header plus item bytes, `vlsn` is `Some` only when the
    /// header carries a strictly positive VLSN, and `payload` is everything
    /// after the header.
    ///
    /// Returns `None` when a read fails or comes back short, when the entry
    /// type byte is zero, or when the declared item size exceeds
    /// `MAX_ITEM_SIZE`.
    fn read_raw_entry(
        &self,
        file_num: u32,
        offset: u64,
    ) -> Option<(usize, Option<u64>, u8, Vec<u8>)> {
        // First pass: only the minimal header, enough to learn the entry
        // type, the flags and the item size.
        let mut prefix = [0u8; MIN_HEADER_SIZE];
        let got = self
            .file_manager
            .read_from_file(file_num, offset, &mut prefix)
            .ok()?;
        if got < MIN_HEADER_SIZE {
            return None;
        }

        let entry_type_byte = prefix[4];
        if entry_type_byte == 0 {
            return None;
        }
        let flags = prefix[5];
        let item_size =
            u32::from_le_bytes([prefix[10], prefix[11], prefix[12], prefix[13]])
                as usize;
        if item_size > MAX_ITEM_SIZE {
            return None;
        }

        // Either of these two flag bits means the header is the long form
        // that carries a VLSN after the minimal header.
        let vlsn_present = (flags & (0x08 | 0x20)) != 0;
        let header_size = if vlsn_present {
            MAX_HEADER_SIZE
        } else {
            MIN_HEADER_SIZE
        };

        // Second pass: re-read from the same offset, this time the whole
        // entry (header + item).
        let entry_size = header_size + item_size;
        let mut full = vec![0u8; entry_size];
        let got = self
            .file_manager
            .read_from_file(file_num, offset, &mut full)
            .ok()?;
        if got < entry_size {
            return None;
        }

        let mut vlsn_opt = None;
        if vlsn_present && full.len() >= MAX_HEADER_SIZE {
            let vlsn_bytes: [u8; 8] =
                full[MIN_HEADER_SIZE..MAX_HEADER_SIZE].try_into().ok()?;
            let raw = i64::from_le_bytes(vlsn_bytes);
            if raw > 0 {
                vlsn_opt = Some(raw as u64);
            } else {
                // Zero or negative values are not usable as a VLSN; the entry
                // is still returned, just without one.
                log::warn!(
                    "EnvironmentLogScanner: implausible VLSN value {} at \
                     file {:08x} offset {:#x}; treating as no-VLSN",
                    raw,
                    file_num,
                    offset,
                );
            }
        }

        let payload = full[header_size..].to_vec();
        Some((entry_size, vlsn_opt, entry_type_byte, payload))
    }
}
impl LogScanner for EnvironmentLogScanner {
fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
let file_nums = self.file_manager.list_file_numbers().ok()?;
if file_nums.is_empty() {
return None;
}
loop {
if !file_nums.contains(&self.cursor_file) {
let next =
file_nums.iter().find(|&&n| n > self.cursor_file).copied();
match next {
Some(n) => {
self.cursor_file = n;
self.cursor_offset =
self.file_manager
.file_header_size_for(n)
.unwrap_or_else(|_| {
file_header_on_disk_size(LOG_FILE_VERSION)
}) as u64;
}
None => return None, }
}
let file_len =
self.file_manager.get_file_length(self.cursor_file).ok()?;
if self.cursor_offset >= file_len {
let next =
file_nums.iter().find(|&&n| n > self.cursor_file).copied();
match next {
Some(n) => {
self.cursor_file = n;
self.cursor_offset =
self.file_manager
.file_header_size_for(n)
.unwrap_or_else(|_| {
file_header_on_disk_size(LOG_FILE_VERSION)
}) as u64;
continue;
}
None => return None, }
}
match self.read_raw_entry(self.cursor_file, self.cursor_offset) {
None => {
let next = file_nums
.iter()
.find(|&&n| n > self.cursor_file)
.copied();
match next {
Some(n) => {
self.cursor_file = n;
self.cursor_offset = self
.file_manager
.file_header_size_for(n)
.unwrap_or_else(|_| {
file_header_on_disk_size(LOG_FILE_VERSION)
})
as u64;
continue;
}
None => return None,
}
}
Some((entry_size, vlsn_opt, entry_type_byte, payload)) => {
self.cursor_offset += entry_size as u64;
if let Some(vlsn) = vlsn_opt
&& vlsn >= from_vlsn
&& vlsn > self.last_returned_vlsn
{
self.last_returned_vlsn = vlsn;
return Some((vlsn, entry_type_byte, payload));
}
}
}
}
}
}
/// Size in bytes of the fixed prefix that `FeederRunner::send_entry` writes
/// before the payload: 8-byte VLSN + 1-byte entry type + 4-byte payload
/// length + 4-byte CRC32 of the payload.
const FRAME_HEADER_LEN: usize = 8 + 1 + 4 + 4;
/// Drives one replica connection: pulls entries from a `LogScanner`, sends
/// them as frames over a `Channel`, and records the acks that come back.
pub struct FeederRunner {
// Channel used both to send frames and to receive acks.
channel: Arc<dyn Channel>,
// First VLSN requested from the scanner when `run` starts.
vlsn_start: u64,
// Highest VLSN seen in an ack so far; starts at 0 and never decreases.
known_replica_vlsn: Mutex<u64>,
}
impl FeederRunner {
    /// Creates a runner that will stream over `channel`, asking the scanner
    /// for entries starting at `vlsn_start`. No ack has been seen yet, so the
    /// known replica VLSN starts at 0.
    pub fn new(channel: Arc<dyn Channel>, vlsn_start: u64) -> Self {
        let known_replica_vlsn = Mutex::new(0);
        Self { channel, vlsn_start, known_replica_vlsn }
    }

    /// Highest VLSN acknowledged by the replica so far (0 if none).
    pub fn known_replica_vlsn(&self) -> u64 {
        let guard = self.known_replica_vlsn.lock();
        *guard
    }

    /// Streams entries and processes acks until the channel is closed.
    ///
    /// Each round first sends every entry the scanner currently offers, then
    /// waits up to 1 ms for one ack. An ack is the first 8 bytes of a received
    /// message read as a little-endian `u64`; shorter messages are ignored.
    /// When no message arrives the thread sleeps 5 ms before the next round.
    ///
    /// # Errors
    ///
    /// Returns any error from sending a frame, and any receive error other
    /// than `RepError::ChannelClosed`, which ends the loop with `Ok(())`.
    pub fn run(&self, log_scanner: &mut dyn LogScanner) -> Result<()> {
        let poll_interval = Duration::from_millis(5);
        let ack_timeout = Duration::from_millis(1);
        let mut next_vlsn = self.vlsn_start;

        loop {
            // Drain everything the scanner has available right now.
            loop {
                let Some((vlsn, entry_type, payload)) =
                    log_scanner.next_entry(next_vlsn)
                else {
                    break;
                };
                self.send_entry(vlsn, entry_type, &payload)?;
                next_vlsn = vlsn + 1;
            }

            // Then look for a single ack before scanning again.
            let ack_bytes = match self.channel.receive(ack_timeout) {
                Ok(Some(bytes)) => bytes,
                Ok(None) => {
                    std::thread::sleep(poll_interval);
                    continue;
                }
                Err(RepError::ChannelClosed(_)) => return Ok(()),
                Err(e) => return Err(e),
            };

            if ack_bytes.len() < 8 {
                // Too short to hold a VLSN; drop it.
                continue;
            }
            let mut raw = [0u8; 8];
            raw.copy_from_slice(&ack_bytes[..8]);
            let acked = u64::from_le_bytes(raw);

            // Acks may arrive out of order; only ever move forward.
            let mut guard = self.known_replica_vlsn.lock();
            if acked > *guard {
                *guard = acked;
            }
        }
    }

    /// Encodes one entry as a frame and sends it on the channel.
    ///
    /// Frame layout (all integers little-endian): 8-byte VLSN, 1-byte entry
    /// type, 4-byte payload length, 4-byte CRC32 of the payload, then the
    /// payload itself.
    fn send_entry(
        &self,
        vlsn: u64,
        entry_type: u8,
        payload: &[u8],
    ) -> Result<()> {
        let vlsn_field = vlsn.to_le_bytes();
        let type_field = [entry_type];
        let len_field = (payload.len() as u32).to_le_bytes();
        let crc_field = crc32fast::hash(payload).to_le_bytes();

        let mut frame = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
        for part in [
            &vlsn_field[..],
            &type_field[..],
            &len_field[..],
            &crc_field[..],
            payload,
        ] {
            frame.extend_from_slice(part);
        }
        self.channel.send(&frame)
    }
}
/// Lifecycle state recorded for a `Feeder`.
///
/// `Feeder::new` starts in `Idle`. Nothing in this file enforces an order
/// between states: `Feeder::set_state` stores whatever value it is given.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeederState {
// NOTE(review): apart from `Idle`, the variant descriptions below are
// inferred from the names only — confirm against the code that drives them.
// Initial state assigned by `Feeder::new`.
Idle,
// Presumably: negotiating with the replica before streaming starts.
Handshaking,
// Presumably: log entries are being sent to the replica.
Streaming,
// Presumably: the feeder has been stopped.
Shutdown,
}
/// Per-replica bookkeeping: state, VLSN progress, last activity time and a
/// queue of encoded messages waiting to be drained. Every mutable field sits
/// behind its own `Mutex`, so all methods take `&self`.
pub struct Feeder {
// Name of the replica this feeder serves.
replica_name: String,
// Current lifecycle state; starts as `FeederState::Idle`.
state: Mutex<FeederState>,
// One past the highest VLSN queued so far (0 until something is queued).
current_vlsn: Mutex<u64>,
// Highest VLSN recorded through `record_ack` (0 until the first ack).
acked_vlsn: Mutex<u64>,
// Time of construction or of the latest `queue_entry` / `record_ack` /
// `touch` call, whichever is most recent.
last_activity: Mutex<Instant>,
// Encoded messages appended by `queue_entry` and removed by `drain_queue`.
output_queue: Mutex<Vec<Vec<u8>>>,
}
impl Feeder {
    /// Creates an idle feeder for `replica_name` with zeroed VLSN counters,
    /// an empty output queue and the activity clock set to now.
    pub fn new(replica_name: String) -> Self {
        let created_at = Instant::now();
        Self {
            replica_name,
            state: Mutex::new(FeederState::Idle),
            current_vlsn: Mutex::new(0),
            acked_vlsn: Mutex::new(0),
            last_activity: Mutex::new(created_at),
            output_queue: Mutex::new(Vec::new()),
        }
    }

    /// Returns a copy of the replica name.
    pub fn get_replica_name(&self) -> String {
        self.replica_name.to_owned()
    }

    /// Returns the current lifecycle state.
    pub fn get_state(&self) -> FeederState {
        let guard = self.state.lock();
        *guard
    }

    /// Overwrites the lifecycle state; no transition checks are performed.
    pub fn set_state(&self, state: FeederState) {
        let mut guard = self.state.lock();
        *guard = state;
    }

    /// Returns one past the highest VLSN queued so far (0 if none).
    pub fn get_current_vlsn(&self) -> u64 {
        let guard = self.current_vlsn.lock();
        *guard
    }

    /// Returns the highest VLSN acknowledged so far (0 if none).
    pub fn get_acked_vlsn(&self) -> u64 {
        let guard = self.acked_vlsn.lock();
        *guard
    }

    /// Encodes an entry as `vlsn` (8 bytes, little-endian) + `entry_type`
    /// (1 byte) + `data`, appends it to the output queue, advances the
    /// current VLSN to `vlsn + 1` unless it is already higher, and refreshes
    /// the activity timestamp.
    pub fn queue_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
        let mut msg = Vec::with_capacity(9 + data.len());
        msg.extend(vlsn.to_le_bytes());
        msg.push(entry_type);
        msg.extend(data);
        self.output_queue.lock().push(msg);

        // The VLSN guard stays held while the timestamp is refreshed.
        let mut current = self.current_vlsn.lock();
        if vlsn >= *current {
            *current = vlsn + 1;
        }
        self.touch();
    }

    /// Records an ack for `vlsn`. The acked VLSN only moves forward; a lower
    /// or equal value leaves it unchanged. Always refreshes the activity
    /// timestamp.
    pub fn record_ack(&self, vlsn: u64) {
        // The ack guard stays held while the timestamp is refreshed.
        let mut acked = self.acked_vlsn.lock();
        if vlsn > *acked {
            *acked = vlsn;
        }
        self.touch();
    }

    /// Returns current VLSN minus acked VLSN, clamped at zero.
    pub fn get_lag(&self) -> u64 {
        self.get_current_vlsn().saturating_sub(self.get_acked_vlsn())
    }

    /// Removes and returns every queued message, leaving the queue empty.
    pub fn drain_queue(&self) -> Vec<Vec<u8>> {
        std::mem::take(&mut *self.output_queue.lock())
    }

    /// Returns `true` when more than `timeout` has passed since the last
    /// recorded activity.
    pub fn is_timed_out(&self, timeout: Duration) -> bool {
        let idle_for = self.last_activity.lock().elapsed();
        idle_for > timeout
    }

    /// Marks the feeder as active right now.
    pub fn touch(&self) {
        let now = Instant::now();
        *self.last_activity.lock() = now;
    }
}
impl std::fmt::Debug for Feeder {
    /// Formats the replica name, state, current and acked VLSN, and lag.
    /// The output queue and the activity timestamp are not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Snapshot each value through its getter; every getter takes and
        // releases its own lock.
        let state = self.get_state();
        let current_vlsn = self.get_current_vlsn();
        let acked_vlsn = self.get_acked_vlsn();
        let lag = self.get_lag();

        let mut out = f.debug_struct("Feeder");
        out.field("replica_name", &self.replica_name);
        out.field("state", &state);
        out.field("current_vlsn", &current_vlsn);
        out.field("acked_vlsn", &acked_vlsn);
        out.field("lag", &lag);
        out.finish()
    }
}
// Unit tests for `FeederRunner`, `Feeder` and `EnvironmentLogScanner`.
// Several tests coordinate threads with sleeps and timeouts, so statement
// order and the chosen durations matter.
#[cfg(test)]
mod tests {
use super::*;
use crate::net::channel::LocalChannelPair;
use std::collections::VecDeque;
// In-memory `LogScanner` backed by a queue of prepared entries.
struct VecLogScanner {
entries: VecDeque<(u64, u8, Vec<u8>)>,
}
impl VecLogScanner {
// Wraps `entries`, preserving their order.
fn new(entries: Vec<(u64, u8, Vec<u8>)>) -> Self {
Self { entries: entries.into_iter().collect() }
}
}
impl LogScanner for VecLogScanner {
// Pops the front entry only when its VLSN is `>= from_vlsn`; otherwise
// returns `None` and leaves the queue untouched.
fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
if let Some(&(vlsn, _, _)) = self.entries.front()
&& vlsn >= from_vlsn
{
return self.entries.pop_front();
}
None
}
}
// Streams three entries through a local channel pair. The receiving thread
// decodes each frame, checks its CRC and acks it; the test then verifies the
// decoded entries and that the runner recorded the highest ack.
#[test]
fn test_feeder_runner_sends_entries_via_local_channel() {
let entries = vec![
(1u64, 10u8, vec![0xAA]),
(2u64, 20u8, vec![0xBB, 0xCC]),
(3u64, 30u8, vec![]),
];
let pair = LocalChannelPair::new();
let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
let recv_handle = {
let receiver = Arc::clone(&receiver);
std::thread::spawn(move || {
let mut received: Vec<(u64, u8, Vec<u8>)> = Vec::new();
let timeout = Duration::from_secs(5);
for _ in 0..3 {
let frame = receiver.receive(timeout).unwrap().unwrap();
// Frame layout: bytes 0..8 VLSN, byte 8 entry type, bytes 9..13
// payload length, bytes 13..17 CRC32, payload from byte 17.
let vlsn =
u64::from_le_bytes(frame[0..8].try_into().unwrap());
let entry_type = frame[8];
let payload_len =
u32::from_le_bytes(frame[9..13].try_into().unwrap())
as usize;
let expected_crc =
u32::from_le_bytes(frame[13..17].try_into().unwrap());
let payload = frame[17..17 + payload_len].to_vec();
let actual_crc = crc32fast::hash(&payload);
assert_eq!(
actual_crc, expected_crc,
"CRC mismatch for vlsn={vlsn}"
);
received.push((vlsn, entry_type, payload));
// Ack is the VLSN as 8 little-endian bytes.
let mut ack = Vec::with_capacity(8);
ack.extend_from_slice(&vlsn.to_le_bytes());
receiver.send(&ack).unwrap();
}
received
})
};
let mut scanner = VecLogScanner::new(entries);
let runner = FeederRunner::new(Arc::clone(&sender), 1);
let runner_arc = Arc::new(runner);
let runner_ref = Arc::clone(&runner_arc);
let sender_ref = Arc::clone(&sender);
let run_handle =
std::thread::spawn(move || runner_ref.run(&mut scanner));
let received = recv_handle.join().unwrap();
assert_eq!(received.len(), 3);
assert_eq!(received[0], (1, 10, vec![0xAA]));
assert_eq!(received[1], (2, 20, vec![0xBB, 0xCC]));
assert_eq!(received[2], (3, 30, vec![]));
// Give the runner up to 100 ms to consume the three acks.
let deadline = Instant::now() + Duration::from_millis(100);
while runner_arc.known_replica_vlsn() < 3 && Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(2));
}
assert!(
runner_arc.known_replica_vlsn() == 3,
"FeederRunner did not drain all 3 acks within 100 ms; \
known_replica_vlsn() == {}, expected 3 (the receiver thread \
sent 3 acks before exiting; the runner reads them one at a \
time with a 1 ms timeout + 5 ms sleep cycle)",
runner_arc.known_replica_vlsn()
);
// Closing the channel is what makes `run` return.
sender_ref.close().unwrap();
run_handle.join().unwrap().unwrap();
}
// With nothing to send, `run` must return `Ok` once another thread closes
// the channel (after 50 ms here).
#[test]
fn test_feeder_runner_empty_scanner_returns_on_close() {
let pair = LocalChannelPair::new();
let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
let runner = FeederRunner::new(Arc::clone(&sender), 1);
let sender_clone = Arc::clone(&sender);
let close_handle = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(50));
receiver.close().unwrap();
sender_clone.close().unwrap();
});
let mut scanner = VecLogScanner::new(vec![]);
let result = runner.run(&mut scanner);
assert!(
result.is_ok(),
"expected Ok on channel close, got {:?}",
result
);
close_handle.join().unwrap();
}
// A fresh `Feeder` is idle with all counters at zero.
#[test]
fn test_new_feeder() {
let feeder = Feeder::new("replica1".to_string());
assert_eq!(feeder.get_replica_name(), "replica1");
assert_eq!(feeder.get_state(), FeederState::Idle);
assert_eq!(feeder.get_current_vlsn(), 0);
assert_eq!(feeder.get_acked_vlsn(), 0);
assert_eq!(feeder.get_lag(), 0);
}
// `set_state` stores each state and `get_state` reads it back.
#[test]
fn test_state_transitions() {
let feeder = Feeder::new("r1".to_string());
assert_eq!(feeder.get_state(), FeederState::Idle);
feeder.set_state(FeederState::Handshaking);
assert_eq!(feeder.get_state(), FeederState::Handshaking);
feeder.set_state(FeederState::Streaming);
assert_eq!(feeder.get_state(), FeederState::Streaming);
feeder.set_state(FeederState::Shutdown);
assert_eq!(feeder.get_state(), FeederState::Shutdown);
}
// Queued messages are 8-byte VLSN + 1-byte type + data, come out in order,
// and a second drain finds the queue empty.
#[test]
fn test_queue_and_drain() {
let feeder = Feeder::new("r1".to_string());
feeder.queue_entry(1, 10, vec![0xAA, 0xBB]);
feeder.queue_entry(2, 20, vec![0xCC]);
feeder.queue_entry(3, 30, vec![]);
let messages = feeder.drain_queue();
assert_eq!(messages.len(), 3);
assert_eq!(messages[0].len(), 8 + 1 + 2);
assert_eq!(messages[1].len(), 8 + 1 + 1);
assert_eq!(messages[2].len(), (8 + 1));
let vlsn_bytes: [u8; 8] = messages[0][0..8].try_into().unwrap();
assert_eq!(u64::from_le_bytes(vlsn_bytes), 1);
assert_eq!(messages[0][8], 10);
let messages2 = feeder.drain_queue();
assert!(messages2.is_empty());
}
// The current VLSN becomes `vlsn + 1` for a new high and is left alone when
// an older VLSN is queued.
#[test]
fn test_current_vlsn_advances() {
let feeder = Feeder::new("r1".to_string());
feeder.queue_entry(5, 1, vec![]);
assert_eq!(feeder.get_current_vlsn(), 6);
feeder.queue_entry(10, 1, vec![]);
assert_eq!(feeder.get_current_vlsn(), 11);
feeder.queue_entry(3, 1, vec![]);
assert_eq!(feeder.get_current_vlsn(), 11);
}
// The acked VLSN tracks the maximum ack and ignores a later, lower one.
#[test]
fn test_ack_recording() {
let feeder = Feeder::new("r1".to_string());
feeder.queue_entry(1, 1, vec![]);
feeder.queue_entry(2, 1, vec![]);
feeder.queue_entry(3, 1, vec![]);
feeder.record_ack(1);
assert_eq!(feeder.get_acked_vlsn(), 1);
feeder.record_ack(3);
assert_eq!(feeder.get_acked_vlsn(), 3);
feeder.record_ack(2);
assert_eq!(feeder.get_acked_vlsn(), 3);
}
// Lag is current VLSN minus acked VLSN: after queueing 1..=3 the current
// VLSN is 4 with nothing acked, so the lag is 4.
#[test]
fn test_lag_calculation() {
let feeder = Feeder::new("r1".to_string());
assert_eq!(feeder.get_lag(), 0);
feeder.queue_entry(1, 1, vec![]);
feeder.queue_entry(2, 1, vec![]);
feeder.queue_entry(3, 1, vec![]);
assert_eq!(feeder.get_lag(), 4);
feeder.record_ack(2);
assert_eq!(feeder.get_lag(), 2);
feeder.record_ack(4);
assert_eq!(feeder.get_lag(), 0);
}
// A generous timeout is not exceeded right after construction; a zero
// timeout is, which relies on some time having elapsed since `new`.
#[test]
fn test_timeout() {
let feeder = Feeder::new("r1".to_string());
assert!(!feeder.is_timed_out(Duration::from_secs(60)));
assert!(feeder.is_timed_out(Duration::from_nanos(0)));
}
// After `touch`, a 1 s timeout is not exceeded.
#[test]
fn test_touch_resets_activity() {
let feeder = Feeder::new("r1".to_string());
std::thread::sleep(Duration::from_millis(5));
feeder.touch();
assert!(!feeder.is_timed_out(Duration::from_secs(1)));
}
// The `Debug` output contains the replica name and the state name.
#[test]
fn test_debug_format() {
let feeder = Feeder::new("replica_debug".to_string());
feeder.set_state(FeederState::Streaming);
let debug = format!("{:?}", feeder);
assert!(debug.contains("replica_debug"));
assert!(debug.contains("Streaming"));
}
// Builds a local channel pair and a runner bound to its `channel_a` side.
// Returns `(sender, receiver, runner)`.
fn make_runner_with_pair(
vlsn_start: u64,
) -> (Arc<dyn Channel>, Arc<dyn Channel>, FeederRunner) {
let pair = LocalChannelPair::new();
let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
let runner = FeederRunner::new(Arc::clone(&sender), vlsn_start);
(sender, receiver, runner)
}
// A 3-byte message is too short to be an ack: `run` must neither fail nor
// advance the known replica VLSN.
#[test]
fn test_feeder_runner_short_ack_is_ignored_then_close() {
let (sender, receiver, runner) = make_runner_with_pair(1);
let receiver_clone = Arc::clone(&receiver);
let close_handle = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(20));
receiver_clone.send(&[0xAA, 0xBB, 0xCC]).unwrap();
std::thread::sleep(Duration::from_millis(40));
sender.close().unwrap();
receiver_clone.close().unwrap();
});
let mut scanner = VecLogScanner::new(vec![]);
let r = runner.run(&mut scanner);
assert!(r.is_ok(), "short-ack path should not error: {:?}", r);
assert_eq!(
runner.known_replica_vlsn(),
0,
"short ack must NOT advance known_replica_vlsn"
);
close_handle.join().unwrap();
}
// Acks of 42 and then 10 must leave the known replica VLSN at 42.
#[test]
fn test_feeder_runner_ack_advances_known_replica_vlsn() {
let (sender, receiver, runner) = make_runner_with_pair(1);
let receiver_clone = Arc::clone(&receiver);
let close_handle = std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(20));
receiver_clone.send(&42u64.to_le_bytes()).unwrap();
std::thread::sleep(Duration::from_millis(20));
receiver_clone.send(&10u64.to_le_bytes()).unwrap();
std::thread::sleep(Duration::from_millis(40));
sender.close().unwrap();
receiver_clone.close().unwrap();
});
let mut scanner = VecLogScanner::new(vec![]);
let r = runner.run(&mut scanner);
assert!(r.is_ok(), "ack path should not error: {:?}", r);
assert_eq!(
runner.known_replica_vlsn(),
42,
"ack must advance to highest, never regress"
);
close_handle.join().unwrap();
}
// Runs two independent runners, each with its own channel pair and scanner:
// the first starts at VLSN 1 with entries 1..=3, the second at VLSN 4 with
// entries 4 and 5. Only the number of frames received is asserted.
#[test]
fn test_feeder_runner_restart_resumes_from_provided_vlsn() {
let entries: [(u64, u8, Vec<u8>); 5] = [
(1u64, 0u8, b"e1".to_vec()),
(2, 0, b"e2".to_vec()),
(3, 0, b"e3".to_vec()),
(4, 0, b"e4".to_vec()),
(5, 0, b"e5".to_vec()),
];
let (sender_a, receiver_a, runner_a) = make_runner_with_pair(1);
// Closes both ends after 60 ms so that `run` returns.
let close_a = {
let s = Arc::clone(&sender_a);
let r = Arc::clone(&receiver_a);
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(60));
s.close().unwrap();
r.close().unwrap();
})
};
// Collects frames until a receive returns anything but `Ok(Some(_))`.
let received_a = {
let r = Arc::clone(&receiver_a);
std::thread::spawn(move || {
let mut got = Vec::new();
while let Ok(Some(frame)) =
r.receive(Duration::from_millis(100))
{
got.push(frame);
}
got
})
};
let mut scanner_a = VecLogScanner::new(entries[0..3].to_vec());
runner_a.run(&mut scanner_a).unwrap();
close_a.join().unwrap();
let frames_a = received_a.join().unwrap();
assert_eq!(
frames_a.len(),
3,
"first runner must send 3 entries, got {}",
frames_a.len()
);
// Second runner: same arrangement, starting at VLSN 4.
let (sender_b, receiver_b, runner_b) = make_runner_with_pair(4);
let close_b = {
let s = Arc::clone(&sender_b);
let r = Arc::clone(&receiver_b);
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(60));
s.close().unwrap();
r.close().unwrap();
})
};
let received_b = {
let r = Arc::clone(&receiver_b);
std::thread::spawn(move || {
let mut got = Vec::new();
while let Ok(Some(frame)) =
r.receive(Duration::from_millis(100))
{
got.push(frame);
}
got
})
};
let mut scanner_b = VecLogScanner::new(entries[3..].to_vec());
runner_b.run(&mut scanner_b).unwrap();
close_b.join().unwrap();
let frames_b = received_b.join().unwrap();
assert_eq!(
frames_b.len(),
2,
"second runner must send 2 entries (4 and 5), got {}",
frames_b.len()
);
}
// A runner that has not been run reports a known replica VLSN of 0.
#[test]
fn test_feeder_runner_known_replica_vlsn_initial_zero() {
let (_sender, _receiver, runner) = make_runner_with_pair(1);
assert_eq!(runner.known_replica_vlsn(), 0);
}
// A scanner built over a freshly created environment in a temp directory
// constructs successfully and yields no entry.
#[test]
fn test_environment_log_scanner_new_with_empty_env() {
let dir = tempfile::tempdir().expect("tempdir");
let env = EnvironmentImpl::new(dir.path(), false, true)
.expect("EnvironmentImpl::new");
let scanner = EnvironmentLogScanner::new(&env, None);
assert!(scanner.is_some(), "scanner construction should succeed");
let mut scanner = scanner.unwrap();
let r = scanner.next_entry(0);
assert!(
r.is_none(),
"next_entry on empty log must return None, got {:?}",
r
);
}
// Passing `Some(NULL_LSN)` must still construct a scanner.
#[test]
fn test_environment_log_scanner_with_explicit_null_lsn() {
let dir = tempfile::tempdir().expect("tempdir");
let env = EnvironmentImpl::new(dir.path(), false, true)
.expect("EnvironmentImpl::new");
let scanner = EnvironmentLogScanner::new(&env, Some(NULL_LSN));
assert!(scanner.is_some());
}
// Starting from an explicit LSN in a fresh environment yields no entry.
// NOTE(review): `Lsn::new(5, 128)` is presumably (file number, offset) —
// confirm against `Lsn::new`.
#[test]
fn test_environment_log_scanner_with_explicit_start_lsn() {
let dir = tempfile::tempdir().expect("tempdir");
let env = EnvironmentImpl::new(dir.path(), false, true)
.expect("EnvironmentImpl::new");
let lsn = Lsn::new(5, 128);
let scanner = EnvironmentLogScanner::new(&env, Some(lsn));
assert!(scanner.is_some());
let mut scanner = scanner.unwrap();
assert!(scanner.next_entry(0).is_none());
}
}