use std::collections::VecDeque;
use std::sync::Arc;
use noxu_sync::Mutex;
use crate::error::{RepError, Result};
use crate::net::channel::Channel;
use crate::net::service_dispatcher::ServiceHandler;
use crate::stream::feeder::{EnvironmentLogScanner, FeederRunner, LogScanner};
pub const PEER_FEEDER_SERVICE_NAME: &str = "PEER_FEEDER";
pub const DEFAULT_PEER_SCANNER_MAX_ENTRIES: usize = 16_384;
pub const DEFAULT_PEER_SCANNER_MAX_BYTES: usize = 64 * 1024 * 1024;
pub struct PeerLogScanner {
queue: Mutex<VecDeque<(u64, u8, Vec<u8>)>>,
first_vlsn: Mutex<u64>,
last_vlsn: Mutex<u64>,
max_entries: usize,
max_bytes: usize,
current_bytes: Mutex<usize>,
evicted_count: std::sync::atomic::AtomicU64,
}
impl PeerLogScanner {
pub fn new() -> Self {
Self::with_capacity(
DEFAULT_PEER_SCANNER_MAX_ENTRIES,
DEFAULT_PEER_SCANNER_MAX_BYTES,
)
}
pub fn with_capacity(max_entries: usize, max_bytes: usize) -> Self {
Self {
queue: Mutex::new(VecDeque::new()),
first_vlsn: Mutex::new(0),
last_vlsn: Mutex::new(0),
max_entries: max_entries.max(1),
max_bytes: max_bytes.max(1),
current_bytes: Mutex::new(0),
evicted_count: std::sync::atomic::AtomicU64::new(0),
}
}
pub fn push(&self, vlsn: u64, entry_type: u8, payload: Vec<u8>) {
let payload_len = payload.len();
{
let mut last = self.last_vlsn.lock();
if vlsn > *last {
*last = vlsn;
}
}
let mut q = self.queue.lock();
let mut current_bytes = self.current_bytes.lock();
q.push_back((vlsn, entry_type, payload));
*current_bytes += payload_len;
let mut evicted = 0u64;
while q.len() > self.max_entries || *current_bytes > self.max_bytes {
if let Some((_evicted_vlsn, _ty, evicted_payload)) = q.pop_front() {
*current_bytes =
current_bytes.saturating_sub(evicted_payload.len());
evicted += 1;
} else {
break;
}
}
if evicted > 0 {
self.evicted_count
.fetch_add(evicted, std::sync::atomic::Ordering::Relaxed);
}
let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
drop(current_bytes);
drop(q);
*self.first_vlsn.lock() = new_first;
}
pub fn evicted_count(&self) -> u64 {
self.evicted_count.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn current_bytes(&self) -> usize {
*self.current_bytes.lock()
}
pub fn log_range(&self) -> Option<(u64, u64)> {
let first = *self.first_vlsn.lock();
let last = *self.last_vlsn.lock();
if first == 0 { None } else { Some((first, last)) }
}
pub fn len(&self) -> usize {
self.queue.lock().len()
}
pub fn is_empty(&self) -> bool {
self.queue.lock().is_empty()
}
}
impl Default for PeerLogScanner {
fn default() -> Self {
Self::new()
}
}
impl LogScanner for PeerLogScanner {
fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
let mut q = self.queue.lock();
let mut current_bytes = self.current_bytes.lock();
while let Some(&(vlsn, _, _)) = q.front() {
if vlsn >= from_vlsn {
let entry = q.pop_front();
if let Some((_, _, ref payload)) = entry {
*current_bytes =
current_bytes.saturating_sub(payload.len());
}
let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
drop(current_bytes);
drop(q);
*self.first_vlsn.lock() = new_first;
return entry;
}
if let Some((_, _, evicted_payload)) = q.pop_front() {
*current_bytes =
current_bytes.saturating_sub(evicted_payload.len());
}
}
let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
drop(current_bytes);
drop(q);
*self.first_vlsn.lock() = new_first;
None
}
}
pub struct PeerFeederSource(pub Arc<PeerLogScanner>);
impl PeerFeederSource {
pub fn new() -> Self {
Self(Arc::new(PeerLogScanner::new()))
}
pub fn clone_scanner(&self) -> Arc<PeerLogScanner> {
Arc::clone(&self.0)
}
}
impl Default for PeerFeederSource {
fn default() -> Self {
Self::new()
}
}
pub struct PeerScannerAdapter {
source: Arc<PeerLogScanner>,
cursor_vlsn: u64,
}
impl PeerScannerAdapter {
pub fn new(source: Arc<PeerLogScanner>, start_vlsn: u64) -> Self {
Self { source, cursor_vlsn: start_vlsn }
}
}
impl LogScanner for PeerScannerAdapter {
fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
let effective_from = self.cursor_vlsn.max(from_vlsn);
let entry = {
let mut q = self.source.queue.lock();
let mut result = None;
while let Some(&(vlsn, _, _)) = q.front() {
if vlsn >= effective_from {
result = q.pop_front();
break;
}
q.pop_front(); }
result
};
if let Some((vlsn, _, _)) = &entry {
self.cursor_vlsn = vlsn + 1;
}
entry
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncupResult {
CanServe { start_vlsn: u64 },
NeedsRestore,
}
pub fn negotiate_syncup(
peer_range: Option<(u64, u64)>,
replica_needs: u64,
) -> SyncupResult {
match peer_range {
Some((first, last))
if first <= replica_needs && replica_needs <= last =>
{
SyncupResult::CanServe { start_vlsn: replica_needs }
}
_ => SyncupResult::NeedsRestore,
}
}
pub struct PeerFeederService {
source: Arc<PeerLogScanner>,
wal_source: Option<WalFeederSource>,
wal_feeds_served: Arc<std::sync::atomic::AtomicU64>,
}
#[derive(Clone)]
pub struct WalFeederSource {
env: Arc<noxu_dbi::EnvironmentImpl>,
vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
}
impl WalFeederSource {
pub fn new(
env: Arc<noxu_dbi::EnvironmentImpl>,
vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
) -> Self {
Self { env, vlsn_index }
}
}
impl PeerFeederService {
pub fn new(source: Arc<PeerLogScanner>) -> Self {
Self {
source,
wal_source: None,
wal_feeds_served: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
pub fn with_wal_source(
source: Arc<PeerLogScanner>,
wal_source: WalFeederSource,
) -> Self {
Self {
source,
wal_source: Some(wal_source),
wal_feeds_served: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
pub fn with_wal_source_counted(
source: Arc<PeerLogScanner>,
wal_source: WalFeederSource,
wal_feeds_served: Arc<std::sync::atomic::AtomicU64>,
) -> Self {
Self { source, wal_source: Some(wal_source), wal_feeds_served }
}
pub fn wal_feeds_served(&self) -> u64 {
self.wal_feeds_served.load(std::sync::atomic::Ordering::SeqCst)
}
}
const PEER_FEEDER_CAN_SERVE: u8 = 0;
const PEER_FEEDER_NEEDS_RESTORE: u8 = 1;
impl ServiceHandler for PeerFeederService {
fn service_name(&self) -> &str {
PEER_FEEDER_SERVICE_NAME
}
fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
use std::time::Duration;
let msg =
channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
RepError::NetworkError(
"PEER_FEEDER: no start_vlsn received".into(),
)
})?;
if msg.len() < 8 {
return Err(RepError::NetworkError(format!(
"PEER_FEEDER: short handshake ({} bytes)",
msg.len()
)));
}
let start_vlsn =
u64::from_le_bytes(msg[..8].try_into().expect("slice of 8 bytes"));
if let Some(wal) = &self.wal_source {
let range = wal.vlsn_index.get_range();
let (first, last) = (range.first(), range.last());
let have_data = last > 0 && first > 0;
let effective_start =
if start_vlsn == 0 { first } else { start_vlsn };
let can_serve = have_data
&& effective_start >= first
&& effective_start <= last;
if can_serve {
channel.send(&[PEER_FEEDER_CAN_SERVE])?;
let channel_arc: Arc<dyn Channel> = Arc::from(channel);
let mut scanner =
match EnvironmentLogScanner::new(&wal.env, None) {
Some(s) => s,
None => {
return Err(RepError::NetworkError(
"PEER_FEEDER: WAL scanner unavailable".into(),
));
}
};
self.wal_feeds_served
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let runner = FeederRunner::new(channel_arc, effective_start);
let _ = runner.run(&mut scanner);
return Ok(());
}
}
let range = self.source.log_range();
match negotiate_syncup(range, start_vlsn) {
SyncupResult::CanServe { start_vlsn: sv } => {
channel.send(&[PEER_FEEDER_CAN_SERVE])?;
let channel_arc: Arc<dyn Channel> = Arc::from(channel);
let mut source =
PeerScannerAdapter::new(Arc::clone(&self.source), sv);
let runner = FeederRunner::new(channel_arc, sv);
let _ = runner.run(&mut source);
Ok(())
}
SyncupResult::NeedsRestore => {
channel.send(&[PEER_FEEDER_NEEDS_RESTORE])?;
Err(RepError::NetworkError(format!(
"PEER_FEEDER: cannot serve vlsn={start_vlsn}, \
range={range:?}"
)))
}
}
}
}
pub fn catch_up_from_peer(
peer_addr: std::net::SocketAddr,
start_vlsn: u64,
log_writer: &mut dyn crate::stream::replica_stream::LogWriter,
) -> Result<bool> {
use crate::net::service_dispatcher::connect_to_service;
use crate::stream::replica_stream::ReplicaReceiver;
use std::sync::Arc;
use std::time::Duration;
let channel = connect_to_service(peer_addr, PEER_FEEDER_SERVICE_NAME)?;
channel.send(&start_vlsn.to_le_bytes())?;
let resp = channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
RepError::NetworkError("no response from peer feeder".into())
})?;
if resp.is_empty() {
return Err(RepError::NetworkError(
"empty response from peer feeder".into(),
));
}
match resp[0] {
PEER_FEEDER_CAN_SERVE => {}
PEER_FEEDER_NEEDS_RESTORE => return Ok(false),
other => {
return Err(RepError::ProtocolError(format!(
"peer feeder unknown response byte: {other:#x}"
)));
}
}
let channel_arc: Arc<dyn Channel> = Arc::from(channel);
let receiver = ReplicaReceiver::new(channel_arc);
receiver.run(log_writer)?;
Ok(true)
}
pub fn catch_up_from_peer_until(
peer_addr: std::net::SocketAddr,
start_vlsn: u64,
log_writer: &mut dyn crate::stream::replica_stream::LogWriter,
shutdown: &std::sync::atomic::AtomicBool,
) -> Result<bool> {
use crate::net::service_dispatcher::connect_to_service;
use crate::stream::replica_stream::ReplicaReceiver;
use std::sync::Arc;
use std::time::Duration;
let channel = connect_to_service(peer_addr, PEER_FEEDER_SERVICE_NAME)?;
channel.send(&start_vlsn.to_le_bytes())?;
let resp = channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
RepError::NetworkError("no response from peer feeder".into())
})?;
if resp.is_empty() {
return Err(RepError::NetworkError(
"empty response from peer feeder".into(),
));
}
match resp[0] {
PEER_FEEDER_CAN_SERVE => {}
PEER_FEEDER_NEEDS_RESTORE => return Ok(false),
other => {
return Err(RepError::ProtocolError(format!(
"peer feeder unknown response byte: {other:#x}"
)));
}
}
let channel_arc: Arc<dyn Channel> = Arc::from(channel);
let receiver = ReplicaReceiver::new(channel_arc);
receiver.run_until(log_writer, Some(shutdown))?;
Ok(true)
}
pub struct MultiPeerCatchUp {
peers: Vec<(String, std::net::SocketAddr)>,
start_vlsn: u64,
}
impl MultiPeerCatchUp {
pub fn new(
peers: Vec<(String, std::net::SocketAddr)>,
start_vlsn: u64,
) -> Self {
Self { peers, start_vlsn }
}
pub fn run<F, W>(self, make_writer: F) -> Option<String>
where
F: Fn() -> W + Send + Sync + 'static,
W: crate::stream::replica_stream::LogWriter + Send + 'static,
{
use std::sync::atomic::{AtomicBool, Ordering};
let make_writer = std::sync::Arc::new(make_writer);
let done = std::sync::Arc::new(AtomicBool::new(false));
let winner: std::sync::Arc<noxu_sync::Mutex<Option<String>>> =
std::sync::Arc::new(noxu_sync::Mutex::new(None));
let mut handles = Vec::new();
for (name, addr) in self.peers {
let make_writer = std::sync::Arc::clone(&make_writer);
let done = std::sync::Arc::clone(&done);
let winner = std::sync::Arc::clone(&winner);
let start_vlsn = self.start_vlsn;
let name_clone = name.clone();
let handle = std::thread::Builder::new()
.name(format!("noxu-peer-catchup-{}", name))
.spawn(move || {
if done.load(Ordering::Acquire) {
return; }
let mut writer = make_writer();
match catch_up_from_peer(addr, start_vlsn, &mut writer) {
Ok(true) => {
if !done.swap(true, Ordering::AcqRel) {
*winner.lock() = Some(name_clone);
}
}
Ok(false) => {
log::debug!(
"peer '{}' cannot serve vlsn={start_vlsn}",
name
);
}
Err(e) => {
log::warn!(
"catch-up from peer '{}' failed: {e}",
name
);
}
}
})
.expect("failed to spawn peer catch-up thread");
handles.push(handle);
}
for h in handles {
let _ = h.join();
}
winner.lock().clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::net::channel::LocalChannelPair;
use std::time::Duration;
#[test]
fn test_peer_scanner_push_and_log_range() {
let scanner = PeerLogScanner::new();
assert!(scanner.is_empty());
assert!(scanner.log_range().is_none());
scanner.push(5, 1, b"entry5".to_vec());
scanner.push(6, 1, b"entry6".to_vec());
scanner.push(10, 1, b"entry10".to_vec());
assert_eq!(scanner.len(), 3);
assert_eq!(scanner.log_range(), Some((5, 10)));
}
#[test]
fn test_peer_scanner_next_entry_in_order() {
let mut scanner = PeerLogScanner::new();
for vlsn in [3u64, 4, 5, 6, 7] {
scanner.push(vlsn, 1, vlsn.to_le_bytes().to_vec());
}
let mut results = Vec::new();
while let Some((vlsn, _, _)) = scanner.next_entry(4) {
results.push(vlsn);
}
assert_eq!(results, vec![4, 5, 6, 7]);
}
#[test]
fn test_peer_scanner_skips_stale_entries() {
let mut scanner = PeerLogScanner::new();
for v in [1u64, 2, 3, 10, 11] {
scanner.push(v, 1, vec![v as u8]);
}
let entry = scanner.next_entry(10);
assert_eq!(entry.map(|(v, _, _)| v), Some(10));
}
#[test]
fn test_peer_scanner_empty_returns_none() {
let mut scanner = PeerLogScanner::new();
assert!(scanner.next_entry(1).is_none());
}
#[test]
fn test_peer_scanner_adapter_cursor_advances() {
let source = Arc::new(PeerLogScanner::new());
for v in [1u64, 2, 3, 4, 5] {
source.push(v, 1, vec![v as u8]);
}
let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 1);
let mut seen = Vec::new();
while let Some((v, _, _)) = adapter.next_entry(1) {
seen.push(v);
}
assert_eq!(seen, vec![1, 2, 3, 4, 5]);
}
#[test]
fn test_in_memory_source_streams_to_replica_via_feeder_runner() {
let source = Arc::new(PeerLogScanner::new());
for v in [10u64, 11, 12, 13, 14] {
source.push(v, 1, format!("payload-{v}").into_bytes());
}
let pair = LocalChannelPair::new();
let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
let recv_handle = {
let receiver = Arc::clone(&receiver);
std::thread::spawn(move || {
let mut vlsns = Vec::new();
for _ in 0..5 {
let frame = receiver
.receive(Duration::from_secs(5))
.unwrap()
.unwrap();
let vlsn =
u64::from_le_bytes(frame[0..8].try_into().unwrap());
vlsns.push(vlsn);
let _ = receiver.send(&vlsn.to_le_bytes());
}
vlsns
})
};
let sender_clone = Arc::clone(&sender);
let run_handle = std::thread::spawn(move || {
let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 10);
let runner = FeederRunner::new(sender, 10);
let _ = runner.run(&mut adapter);
});
let vlsns = recv_handle.join().unwrap();
assert_eq!(vlsns, vec![10, 11, 12, 13, 14]);
sender_clone.close().unwrap();
let _ = run_handle.join();
}
#[test]
fn test_negotiate_syncup_can_serve() {
assert_eq!(
negotiate_syncup(Some((5, 20)), 10),
SyncupResult::CanServe { start_vlsn: 10 }
);
assert_eq!(
negotiate_syncup(Some((10, 10)), 10),
SyncupResult::CanServe { start_vlsn: 10 }
);
}
#[test]
fn test_negotiate_syncup_needs_restore_too_early() {
assert_eq!(
negotiate_syncup(Some((5, 20)), 3),
SyncupResult::NeedsRestore
);
}
#[test]
fn test_negotiate_syncup_needs_restore_too_late() {
assert_eq!(
negotiate_syncup(Some((5, 20)), 25),
SyncupResult::NeedsRestore
);
}
#[test]
fn test_negotiate_syncup_no_range() {
assert_eq!(negotiate_syncup(None, 10), SyncupResult::NeedsRestore);
}
#[test]
fn test_group_service_cbvlsn_tracks_minimum() {
use crate::group_service::{GroupService, NodeInfo};
use crate::node_type::NodeType;
use std::time::Instant;
let gs = GroupService::new("test_group".to_string());
for (name, port) in [("n1", 5001u16), ("n2", 5002), ("n3", 5003)] {
gs.add_node(NodeInfo {
name: name.to_string(),
node_type: NodeType::Electable,
host: "localhost".to_string(),
port,
node_id: port as u32,
joined_at: Instant::now(),
last_seen: Instant::now(),
is_active: true,
known_vlsn: 0,
log_range: None,
read_capacity_pct: 100,
write_capacity_pct: 100,
latency_hint_ms: 1,
})
.unwrap();
}
assert_eq!(gs.get_cbvlsn(), 0);
gs.update_node_vlsn("n1", 50);
gs.update_node_vlsn("n2", 40);
assert_eq!(gs.get_cbvlsn(), 0, "n3 still at 0, CBVLSN must be 0");
gs.update_node_vlsn("n3", 30);
assert_eq!(gs.get_cbvlsn(), 30);
gs.update_node_vlsn("n3", 45);
assert_eq!(gs.get_cbvlsn(), 40);
}
#[test]
fn test_group_service_cbvlsn_monotone_nondecreasing() {
use crate::group_service::{GroupService, NodeInfo};
use crate::node_type::NodeType;
use std::time::Instant;
let gs = GroupService::new("cbvlsn_monotone".to_string());
for (name, port) in [("a", 5001u16), ("b", 5002)] {
gs.add_node(NodeInfo {
name: name.to_string(),
node_type: NodeType::Electable,
host: "localhost".to_string(),
port,
node_id: port as u32,
joined_at: Instant::now(),
last_seen: Instant::now(),
is_active: true,
known_vlsn: 0,
log_range: None,
read_capacity_pct: 100,
write_capacity_pct: 100,
latency_hint_ms: 1,
})
.unwrap();
}
let mut prev = 0u64;
for (na, va, nb, vb) in [
("a", 10u64, "b", 5u64),
("a", 20, "b", 15),
("a", 25, "b", 22),
("a", 30, "b", 28),
] {
gs.update_node_vlsn(na, va);
gs.update_node_vlsn(nb, vb);
let cbvlsn = gs.get_cbvlsn();
assert!(
cbvlsn >= prev,
"CBVLSN must not decrease: was {prev}, now {cbvlsn}"
);
prev = cbvlsn;
}
}
#[test]
fn test_group_service_find_peers_with_vlsn() {
use crate::group_service::{GroupService, NodeInfo};
use crate::node_type::NodeType;
use std::time::Instant;
let gs = GroupService::new("peer_select".to_string());
for (name, port, range) in [
("a", 5001u16, Some((1u64, 100u64))),
("b", 5002, Some((50, 200))),
("c", 5003, None),
] {
let mut info = NodeInfo {
name: name.to_string(),
node_type: NodeType::Electable,
host: "localhost".to_string(),
port,
node_id: port as u32,
joined_at: Instant::now(),
last_seen: Instant::now(),
is_active: true,
known_vlsn: 0,
log_range: range,
read_capacity_pct: 100,
write_capacity_pct: 100,
latency_hint_ms: 1,
};
info.last_seen = Instant::now()
- std::time::Duration::from_millis(port as u64 * 10);
gs.add_node(info).unwrap();
}
let peers = gs.find_peers_with_vlsn(75);
assert!(peers.contains(&"a".to_string()));
assert!(peers.contains(&"b".to_string()));
assert!(!peers.contains(&"c".to_string()));
let peers = gs.find_peers_with_vlsn(150);
assert_eq!(peers, vec!["b".to_string()]);
assert!(gs.find_peers_with_vlsn(201).is_empty());
}
#[test]
fn test_group_service_update_log_range() {
use crate::group_service::{GroupService, NodeInfo};
use crate::node_type::NodeType;
use std::time::Instant;
let gs = GroupService::new("log_range_test".to_string());
gs.add_node(NodeInfo {
name: "r1".to_string(),
node_type: NodeType::Electable,
host: "localhost".to_string(),
port: 5001,
node_id: 1,
joined_at: Instant::now(),
last_seen: Instant::now(),
is_active: true,
known_vlsn: 0,
log_range: None,
read_capacity_pct: 100,
write_capacity_pct: 100,
latency_hint_ms: 1,
})
.unwrap();
assert!(gs.get_node("r1").unwrap().log_range.is_none());
gs.update_node_log_range("r1", 100, 500);
assert_eq!(gs.get_node("r1").unwrap().log_range, Some((100, 500)));
gs.update_node_log_range("r1", 100, 800);
assert_eq!(gs.get_node("r1").unwrap().log_range, Some((100, 800)));
}
#[test]
fn test_peer_feeder_service_can_serve() {
use crate::net::channel::LocalChannelPair;
let source = Arc::new(PeerLogScanner::new());
for v in 10u64..=20 {
source.push(v, 0, format!("e{}", v).into_bytes());
}
let svc = PeerFeederService::new(Arc::clone(&source));
let pair = LocalChannelPair::new();
let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
let client_ch = pair.channel_b;
client_ch.send(&15u64.to_le_bytes()).unwrap();
let svc_handle = std::thread::spawn(move || svc.handle(server_ch));
let resp = client_ch.receive(Duration::from_secs(2)).unwrap().unwrap();
assert_eq!(
resp,
vec![PEER_FEEDER_CAN_SERVE],
"service should reply CAN_SERVE for in-range start_vlsn"
);
let mut frames = 0;
while let Ok(Some(_)) = client_ch.receive(Duration::from_millis(80)) {
frames += 1;
if frames >= 3 {
break;
}
}
client_ch.close().unwrap();
let _ = svc_handle.join().unwrap();
assert!(frames >= 1, "service must have streamed at least one frame");
}
#[test]
fn test_peer_feeder_service_needs_restore() {
use crate::net::channel::LocalChannelPair;
let source = Arc::new(PeerLogScanner::new());
for v in 10u64..=20 {
source.push(v, 0, vec![]);
}
let svc = PeerFeederService::new(Arc::clone(&source));
let pair = LocalChannelPair::new();
let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
let client_ch = pair.channel_b;
client_ch.send(&5u64.to_le_bytes()).unwrap();
let r = svc.handle(server_ch);
assert!(r.is_err(), "service must return Err on NEEDS_RESTORE");
let resp = client_ch.receive(Duration::from_secs(2)).unwrap().unwrap();
assert_eq!(
resp,
vec![PEER_FEEDER_NEEDS_RESTORE],
"service should reply NEEDS_RESTORE for out-of-range start_vlsn"
);
}
#[test]
fn test_peer_feeder_service_short_handshake_errors() {
use crate::net::channel::LocalChannelPair;
let source = Arc::new(PeerLogScanner::new());
let svc = PeerFeederService::new(Arc::clone(&source));
let pair = LocalChannelPair::new();
let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
let client_ch = pair.channel_b;
client_ch.send(&[0u8; 4]).unwrap();
let r = svc.handle(server_ch);
assert!(r.is_err(), "short handshake must error");
let msg = format!("{}", r.err().unwrap());
assert!(
msg.contains("short handshake"),
"expected 'short handshake' in error, got: {msg}"
);
}
#[test]
fn test_peer_feeder_service_no_handshake_errors() {
use crate::net::channel::LocalChannelPair;
let source = Arc::new(PeerLogScanner::new());
let svc = PeerFeederService::new(Arc::clone(&source));
let pair = LocalChannelPair::new();
let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
drop(pair.channel_b);
let r = svc.handle(server_ch);
assert!(r.is_err(), "no-handshake must error");
}
#[test]
fn test_peer_feeder_service_name() {
let source = Arc::new(PeerLogScanner::new());
let svc = PeerFeederService::new(source);
assert_eq!(
svc.service_name(),
PEER_FEEDER_SERVICE_NAME,
"service_name must match the protocol const"
);
}
#[test]
fn test_peer_feeder_source_default_and_clone_scanner() {
let src1 = PeerFeederSource::new();
let src2 = PeerFeederSource::default();
let s1 = src1.clone_scanner();
let s2 = src2.clone_scanner();
s1.push(1, 0, b"a".to_vec());
assert_eq!(s1.len(), 1);
assert_eq!(s2.len(), 0);
}
#[test]
fn test_peer_log_scanner_default_is_empty() {
let s = PeerLogScanner::default();
assert!(s.is_empty());
assert_eq!(s.len(), 0);
assert!(s.log_range().is_none());
}
#[test]
fn test_feeder_runner_known_replica_vlsn_initial_zero() {
use crate::net::channel::LocalChannelPair;
let pair = LocalChannelPair::new();
let channel: Arc<dyn Channel> = Arc::new(pair.channel_a);
let runner = FeederRunner::new(channel, 1);
assert_eq!(runner.known_replica_vlsn(), 0);
}
#[test]
fn test_peer_scanner_adapter_skips_stale_via_pop_front() {
let source = Arc::new(PeerLogScanner::new());
for v in 1u64..=5 {
source.push(v, 0, vec![]);
}
let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 3);
let r = adapter.next_entry(3);
assert!(r.is_some());
let (vlsn, _, _) = r.unwrap();
assert_eq!(vlsn, 3);
let (vlsn, _, _) = adapter.next_entry(4).unwrap();
assert_eq!(vlsn, 4);
}
}