noxu_rep/stream/peer_feeder.rs
1//! Peer-to-peer log distribution — master-feeder pattern.
2//!
3//! Each replica maintains a log range `[first_vlsn,
4//! last_vlsn]` and can act as a feeder for other replicas that need entries
5//! in that range. This avoids routing all log-shipping traffic through the
6//! master, which would otherwise be the sole bottleneck.
7//!
8//! ## Architecture
9//!
10//! ```text
//! Master ──► FeederRunner (master feeder) ──► Replica A
//!                                         ──► Replica B ──► PeerFeederRunner ──► Replica C
13//! ```
14//!
15//! `PeerLogScanner` is a `LogScanner` implementation backed by an
16//! in-memory `VecDeque<(vlsn, entry_type, payload)>`. It is populated by
17//! the local `ReplicaReceiver` as entries arrive from the master.
18//!
//! `PeerFeederRunner` wraps a `FeederRunner` and a `PeerLogScanner`. The
//! `GroupService` is queried to find peers that hold the needed VLSN range,
//! and the chosen peer runs a `PeerFeederRunner` to stream entries to the
//! requesting node.
22//!
23//! ## CBVLSN
24//!
25//! The Cleaner Barrier VLSN is the global minimum `known_vlsn` across all
26//! active electable replicas. The log cleaner uses this to decide which
27//! log files it is safe to reclaim. It is maintained by `GroupService`
28//! and updated on every heartbeat / ack.
29//!
//! Corresponds to `FeederReplicaSyncup`, `LocalCBVLSNUpdater`, and
//! `RepGroupImpl.getCBVLSN()` in the reference HA implementation.
32
33use std::collections::VecDeque;
34use std::sync::Arc;
35
36use noxu_sync::Mutex;
37
38use crate::error::{RepError, Result};
39use crate::net::channel::Channel;
40use crate::net::service_dispatcher::ServiceHandler;
41use crate::stream::feeder::{FeederRunner, LogScanner};
42
43/// Service name registered with `TcpServiceDispatcher` for peer log feeds.
44pub const PEER_FEEDER_SERVICE_NAME: &str = "PEER_FEEDER";
45
46// ---------------------------------------------------------------------------
47// PeerLogScanner
48// ---------------------------------------------------------------------------
49
50/// Default maximum number of entries retained in a `PeerLogScanner`
51/// in-memory queue.
52///
53/// Without a bound, every replicated entry stays in RAM until it is
54/// consumed by a downstream peer. A long-running replica with no
55/// downstream consumers therefore accumulates one VecDeque entry per
56/// replicated record forever (audit finding F10).
57///
58/// 16 384 entries is enough headroom for the slowest expected
59/// downstream peer to drain while keeping resident memory bounded
60/// (assuming sub-MiB entries, this caps the queue at ~16 GiB worst
61/// case; the byte-size cap is the harder bound).
62pub const DEFAULT_PEER_SCANNER_MAX_ENTRIES: usize = 16_384;
63
64/// Default maximum total payload size, in bytes, retained in a
65/// `PeerLogScanner` queue. Once the cumulative payload bytes exceed
66/// this threshold, the oldest entries are evicted on each `push`.
67///
68/// 64 MiB matches the channel's `MAX_FRAME_PAYLOAD` and is large
69/// enough to absorb a large in-flight transaction without being
70/// large enough to OOM a small replica box.
71pub const DEFAULT_PEER_SCANNER_MAX_BYTES: usize = 64 * 1024 * 1024;
72
73/// A [`LogScanner`] backed by an in-memory queue of `(vlsn, type, payload)`
74/// entries.
75///
76/// Entries are pushed by the `ReplicaReceiver` as they arrive from the
77/// master (or from another peer). The `PeerFeederRunner` consumes entries
78/// from this queue and streams them to a downstream replica.
79///
80/// ## Bounded memory (F10)
81///
82/// The queue has two configurable bounds:
83///
84/// - `max_entries`: maximum entry count (default
85/// [`DEFAULT_PEER_SCANNER_MAX_ENTRIES`]).
86/// - `max_bytes`: maximum cumulative payload size in bytes
87/// (default [`DEFAULT_PEER_SCANNER_MAX_BYTES`]).
88///
89/// On `push`, if either bound is exceeded the oldest entries are
90/// evicted from the front of the queue until both bounds are
91/// satisfied. The evicted entries are no longer available for peer
92/// streaming through this scanner; downstream peers that fall behind
93/// the eviction window must catch up via the on-disk
94/// `EnvironmentLogScanner` or via network restore. This matches
95/// HA semantics where peer-to-peer log distribution is
96/// best-effort and the on-disk log is the durable source.
97///
98/// Closes finding F10 of `docs/src/internal/api-audit-2026-05-rep.md`.
99///
100/// Thread safety: the queue is protected by a `Mutex` so that the receiver
101/// thread (writer) and the feeder thread (reader) can operate concurrently.
102pub struct PeerLogScanner {
103 queue: Mutex<VecDeque<(u64, u8, Vec<u8>)>>,
104 /// The VLSN range currently held in `queue`: `(first, last)`.
105 /// Updated lazily on `push`; used by `GroupService` callers to determine
106 /// whether this scanner can serve a given VLSN.
107 first_vlsn: Mutex<u64>,
108 last_vlsn: Mutex<u64>,
109 /// Maximum entry count before oldest-evicting begins.
110 max_entries: usize,
111 /// Maximum cumulative payload bytes before oldest-evicting begins.
112 max_bytes: usize,
113 /// Current cumulative payload bytes (updated on every push/evict).
114 current_bytes: Mutex<usize>,
115 /// Cumulative count of entries evicted by the F10 bound (for
116 /// observability and tests).
117 evicted_count: std::sync::atomic::AtomicU64,
118}
119
120impl PeerLogScanner {
121 /// Create an empty scanner with the default F10 bounds.
122 pub fn new() -> Self {
123 Self::with_capacity(
124 DEFAULT_PEER_SCANNER_MAX_ENTRIES,
125 DEFAULT_PEER_SCANNER_MAX_BYTES,
126 )
127 }
128
129 /// Create an empty scanner with explicit bounds.
130 ///
131 /// `max_entries` and `max_bytes` are both honoured; whichever is
132 /// breached first triggers oldest-evicting on subsequent `push`
133 /// calls. Passing `usize::MAX` disables the corresponding bound
134 /// (not recommended in production).
135 pub fn with_capacity(max_entries: usize, max_bytes: usize) -> Self {
136 Self {
137 queue: Mutex::new(VecDeque::new()),
138 first_vlsn: Mutex::new(0),
139 last_vlsn: Mutex::new(0),
140 max_entries: max_entries.max(1),
141 max_bytes: max_bytes.max(1),
142 current_bytes: Mutex::new(0),
143 evicted_count: std::sync::atomic::AtomicU64::new(0),
144 }
145 }
146
147 /// Push a log entry into the scanner's queue.
148 ///
149 /// Called by the `ReplicaReceiver` each time an entry is applied.
150 /// Entries are expected to be pushed in VLSN order, but this method is
151 /// not enforcing: every entry is appended to the queue unconditionally
152 /// and the cached `(first_vlsn, last_vlsn)` range is widened to cover
153 /// the new VLSN. Out-of-order or duplicate entries are filtered later
154 /// by [`LogScanner::next_entry`](crate::stream::feeder::LogScanner),
155 /// which skips entries with `vlsn < from_vlsn`.
156 ///
157 /// **F10 bound**: after the new entry is appended, if the queue
158 /// exceeds either `max_entries` or `max_bytes`, the oldest entries
159 /// are evicted from the front until both bounds are satisfied. The
160 /// retained `first_vlsn` is updated to the new front-of-queue VLSN
161 /// so downstream peers that ask for an evicted VLSN range observe
162 /// `log_range().first > from_vlsn` and know they must catch up via
163 /// the durable log.
164 pub fn push(&self, vlsn: u64, entry_type: u8, payload: Vec<u8>) {
165 let payload_len = payload.len();
166 {
167 let mut last = self.last_vlsn.lock();
168 if vlsn > *last {
169 *last = vlsn;
170 }
171 }
172 let mut q = self.queue.lock();
173 let mut current_bytes = self.current_bytes.lock();
174 // Append unconditionally.
175 q.push_back((vlsn, entry_type, payload));
176 *current_bytes += payload_len;
177
178 // F10 eviction: drop oldest until both bounds are honoured.
179 let mut evicted = 0u64;
180 while q.len() > self.max_entries || *current_bytes > self.max_bytes {
181 if let Some((_evicted_vlsn, _ty, evicted_payload)) = q.pop_front() {
182 *current_bytes =
183 current_bytes.saturating_sub(evicted_payload.len());
184 evicted += 1;
185 } else {
186 break;
187 }
188 }
189 if evicted > 0 {
190 self.evicted_count
191 .fetch_add(evicted, std::sync::atomic::Ordering::Relaxed);
192 }
193 // Refresh first_vlsn from the (possibly mutated) queue front.
194 let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
195 drop(current_bytes);
196 drop(q);
197 *self.first_vlsn.lock() = new_first;
198 }
199
200 /// Cumulative number of entries dropped by the F10 bound since
201 /// scanner construction. Useful for monitoring whether downstream
202 /// peers are keeping up.
203 pub fn evicted_count(&self) -> u64 {
204 self.evicted_count.load(std::sync::atomic::Ordering::Relaxed)
205 }
206
207 /// Current cumulative payload size in bytes (live snapshot).
208 pub fn current_bytes(&self) -> usize {
209 *self.current_bytes.lock()
210 }
211
212 /// Return the VLSN range currently held in this scanner.
213 ///
214 /// Returns `None` if the scanner is empty (no entries pushed yet).
215 pub fn log_range(&self) -> Option<(u64, u64)> {
216 let first = *self.first_vlsn.lock();
217 let last = *self.last_vlsn.lock();
218 if first == 0 { None } else { Some((first, last)) }
219 }
220
221 /// Return the number of entries currently queued.
222 pub fn len(&self) -> usize {
223 self.queue.lock().len()
224 }
225
226 /// Returns true if no entries are queued.
227 pub fn is_empty(&self) -> bool {
228 self.queue.lock().is_empty()
229 }
230}
231
232impl Default for PeerLogScanner {
233 fn default() -> Self {
234 Self::new()
235 }
236}
237
238impl LogScanner for PeerLogScanner {
239 fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
240 let mut q = self.queue.lock();
241 // Skip entries with VLSN < from_vlsn (they were already seen by the
242 // downstream replica). Track byte-budget reduction so the F10
243 // bound stays accurate.
244 let mut current_bytes = self.current_bytes.lock();
245 while let Some(&(vlsn, _, _)) = q.front() {
246 if vlsn >= from_vlsn {
247 let entry = q.pop_front();
248 if let Some((_, _, ref payload)) = entry {
249 *current_bytes =
250 current_bytes.saturating_sub(payload.len());
251 }
252 let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
253 drop(current_bytes);
254 drop(q);
255 *self.first_vlsn.lock() = new_first;
256 return entry;
257 }
258 if let Some((_, _, evicted_payload)) = q.pop_front() {
259 *current_bytes =
260 current_bytes.saturating_sub(evicted_payload.len());
261 }
262 }
263 let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
264 drop(current_bytes);
265 drop(q);
266 *self.first_vlsn.lock() = new_first;
267 None
268 }
269}
270
271// ---------------------------------------------------------------------------
272// PeerFeederSource — Arc-wrapped PeerLogScanner that implements LogScanner
273// ---------------------------------------------------------------------------
274
275/// A shared, `Arc`-wrapped `PeerLogScanner` that can be passed between
276/// threads.
277///
278/// The `ReplicaReceiver` holds an `Arc<PeerFeederSource>` and calls
279/// `push()` as entries arrive. A `PeerFeederRunner` holds another clone
280/// and calls `next_entry()` to stream those entries downstream.
281pub struct PeerFeederSource(pub Arc<PeerLogScanner>);
282
283impl PeerFeederSource {
284 /// Create a new `PeerFeederSource` backed by a fresh `PeerLogScanner`.
285 pub fn new() -> Self {
286 Self(Arc::new(PeerLogScanner::new()))
287 }
288
289 /// Return a clone of the inner `Arc<PeerLogScanner>` for the receiver
290 /// thread to use when pushing entries.
291 pub fn clone_scanner(&self) -> Arc<PeerLogScanner> {
292 Arc::clone(&self.0)
293 }
294}
295
296impl Default for PeerFeederSource {
297 fn default() -> Self {
298 Self::new()
299 }
300}
301
302/// Adapter so `PeerFeederSource` can be used directly as a `LogScanner`.
303///
304/// We implement `LogScanner` on the *mutable reference* side: since
305/// `PeerFeederSource` is `Arc`-wrapped, we implement on a thin wrapper
306/// struct that holds an `Arc` and a local cursor.
307pub struct PeerScannerAdapter {
308 source: Arc<PeerLogScanner>,
309 cursor_vlsn: u64,
310}
311
312impl PeerScannerAdapter {
313 /// Create a new adapter starting from `start_vlsn`.
314 pub fn new(source: Arc<PeerLogScanner>, start_vlsn: u64) -> Self {
315 Self { source, cursor_vlsn: start_vlsn }
316 }
317}
318
319impl LogScanner for PeerScannerAdapter {
320 fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
321 let effective_from = self.cursor_vlsn.max(from_vlsn);
322 let entry = {
323 let mut q = self.source.queue.lock();
324 let mut result = None;
325 while let Some(&(vlsn, _, _)) = q.front() {
326 if vlsn >= effective_from {
327 result = q.pop_front();
328 break;
329 }
330 q.pop_front(); // discard stale entries
331 }
332 result
333 };
334 if let Some((vlsn, _, _)) = &entry {
335 self.cursor_vlsn = vlsn + 1;
336 }
337 entry
338 }
339}
340
341// ---------------------------------------------------------------------------
342// PeerFeederRunner
343// ---------------------------------------------------------------------------
344
345/// Streams log entries from a `PeerLogScanner` to a downstream replica.
346///
347/// This is a thin wrapper around `FeederRunner` that uses a
348/// `PeerScannerAdapter` as the log source instead of reading from disk.
349///
350/// Corresponds to 's peer-to-peer feeder path in `FeederReplicaSyncup`.
351pub struct PeerFeederRunner {
352 inner: FeederRunner,
353 source: Arc<PeerLogScanner>,
354 start_vlsn: u64,
355}
356
357impl PeerFeederRunner {
358 /// Create a new peer feeder that streams entries from `source` to
359 /// `channel`, starting at `start_vlsn`.
360 pub fn new(
361 channel: Arc<dyn Channel>,
362 source: Arc<PeerLogScanner>,
363 start_vlsn: u64,
364 ) -> Self {
365 let inner = FeederRunner::new(channel, start_vlsn);
366 Self { inner, source, start_vlsn }
367 }
368
369 /// Run the peer feeder loop.
370 ///
371 /// Streams entries from the `PeerLogScanner` to the downstream replica.
372 /// Returns when the channel is closed or an I/O error occurs.
373 pub fn run(&self) -> Result<()> {
374 let mut adapter =
375 PeerScannerAdapter::new(Arc::clone(&self.source), self.start_vlsn);
376 self.inner.run(&mut adapter)
377 }
378
379 /// Return the last VLSN acknowledged by the downstream replica.
380 pub fn known_replica_vlsn(&self) -> u64 {
381 self.inner.known_replica_vlsn()
382 }
383}
384
385// ---------------------------------------------------------------------------
386// Syncup helpers
387// ---------------------------------------------------------------------------
388
389/// Result of a peer syncup negotiation.
390///
391/// HA, `FeederReplicaSyncup` finds the highest VLSN that is committed
392/// on BOTH the feeder and the replica (the "matchpoint"). The feeder then
393/// streams entries from matchpoint+1 onwards.
394///
395/// We model this as a simple VLSN range comparison: if the peer holds
396/// `[peer_first, peer_last]` and the replica needs `replica_needs` onwards,
397/// we can serve if `peer_first <= replica_needs <= peer_last`.
398#[derive(Debug, Clone, PartialEq, Eq)]
399pub enum SyncupResult {
400 /// The peer holds the needed range; stream from `start_vlsn`.
401 CanServe { start_vlsn: u64 },
402 /// The peer does not hold the needed VLSN; fall back to master or
403 /// network restore.
404 NeedsRestore,
405}
406
407/// Determine whether `peer_range` can serve a replica that needs
408/// log entries starting from `replica_needs`.
409pub fn negotiate_syncup(
410 peer_range: Option<(u64, u64)>,
411 replica_needs: u64,
412) -> SyncupResult {
413 match peer_range {
414 Some((first, last))
415 if first <= replica_needs && replica_needs <= last =>
416 {
417 SyncupResult::CanServe { start_vlsn: replica_needs }
418 }
419 _ => SyncupResult::NeedsRestore,
420 }
421}
422
423// ---------------------------------------------------------------------------
424// PeerFeederService — TCP ServiceHandler backed by a PeerLogScanner
425// ---------------------------------------------------------------------------
426
427/// A [`ServiceHandler`] that streams peer-held log entries to a requesting
428/// downstream replica over the `"PEER_FEEDER"` service.
429///
430/// The service is registered on the `TcpServiceDispatcher` at startup.
431/// When a downstream replica connects, the protocol is:
432///
433/// 1. The downstream sends `[start_vlsn: u64 LE]` (8 bytes) — the first VLSN
434/// it needs.
435/// 2. The server negotiates via `negotiate_syncup()`.
436/// 3. If the range is available, a `PeerFeederRunner` is spawned in a new
437/// thread and streams entries until the channel closes.
438/// 4. If the range is not available, the server responds with
439/// `[NEEDS_RESTORE: u8 = 1]` and closes the connection.
440///
441/// The `PeerLogScanner` (`source`) is populated by the node's own
442/// `ReplicaReceiver` as entries arrive from the master.
443pub struct PeerFeederService {
444 source: Arc<PeerLogScanner>,
445}
446
447impl PeerFeederService {
448 /// Create a new service backed by `source`.
449 pub fn new(source: Arc<PeerLogScanner>) -> Self {
450 Self { source }
451 }
452}
453
454/// Wire-level response codes sent by the server.
455const PEER_FEEDER_CAN_SERVE: u8 = 0;
456const PEER_FEEDER_NEEDS_RESTORE: u8 = 1;
457
458impl ServiceHandler for PeerFeederService {
459 fn service_name(&self) -> &str {
460 PEER_FEEDER_SERVICE_NAME
461 }
462
463 fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
464 use std::time::Duration;
465
466 // 1. Read the 8-byte start_vlsn from the downstream replica.
467 let msg =
468 channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
469 RepError::NetworkError(
470 "PEER_FEEDER: no start_vlsn received".into(),
471 )
472 })?;
473
474 if msg.len() < 8 {
475 return Err(RepError::NetworkError(format!(
476 "PEER_FEEDER: short handshake ({} bytes)",
477 msg.len()
478 )));
479 }
480 let start_vlsn =
481 u64::from_le_bytes(msg[..8].try_into().expect("slice of 8 bytes"));
482
483 // 2. Negotiate: do we hold the requested VLSN range?
484 let range = self.source.log_range();
485 match negotiate_syncup(range, start_vlsn) {
486 SyncupResult::CanServe { start_vlsn: sv } => {
487 // Tell the downstream it can proceed.
488 channel.send(&[PEER_FEEDER_CAN_SERVE])?;
489
490 // 3. Stream entries in this thread (the dispatcher already
491 // called us from a per-connection thread).
492 let channel_arc: Arc<dyn Channel> = Arc::from(channel);
493 let runner = PeerFeederRunner::new(
494 channel_arc,
495 Arc::clone(&self.source),
496 sv,
497 );
498 let _ = runner.run();
499 Ok(())
500 }
501 SyncupResult::NeedsRestore => {
502 channel.send(&[PEER_FEEDER_NEEDS_RESTORE])?;
503 Err(RepError::NetworkError(format!(
504 "PEER_FEEDER: cannot serve vlsn={start_vlsn}, \
505 range={range:?}"
506 )))
507 }
508 }
509 }
510}
511
512// ---------------------------------------------------------------------------
513// Client-side peer catch-up
514// ---------------------------------------------------------------------------
515
516/// Connect to a peer node's `PEER_FEEDER` service and pull log entries
517/// starting from `start_vlsn`.
518///
519/// This is the client counterpart to [`PeerFeederService`]. It is called
520/// by a replica that is behind and wants to catch up from a peer that holds
521/// the needed VLSN range (rather than routing all traffic through the master).
522///
523/// Protocol (matches `PeerFeederService::handle`):
524/// 1. Open a TCP connection and request the `"PEER_FEEDER"` service via
525/// `service_dispatcher::connect_to_service()`.
526/// 2. Send `[start_vlsn: u64 LE]`.
527/// 3. Read the one-byte response:
528/// - `0` (`PEER_FEEDER_CAN_SERVE`) — peer has the range; proceed.
529/// - `1` (`PEER_FEEDER_NEEDS_RESTORE`) — peer cannot serve; return
530/// `Ok(false)` so the caller can fall back to the master.
531/// 4. Start a `ReplicaReceiver` loop on the channel, passing each entry
532/// to `log_writer`. Returns `Ok(true)` when the peer closes the
533/// channel (i.e. the catch-up is complete).
534///
535/// # Pipelining
536///
537/// `catch_up_from_peer` is intentionally non-async. Call it from a
538/// dedicated thread per peer. To pipeline catch-up from multiple peers
539/// simultaneously, spawn one thread per peer (e.g. from a `ThreadPool`).
540/// The [`MultiPeerCatchUp`] helper below manages this.
541pub fn catch_up_from_peer(
542 peer_addr: std::net::SocketAddr,
543 start_vlsn: u64,
544 log_writer: &mut dyn crate::stream::replica_stream::LogWriter,
545) -> Result<bool> {
546 use crate::net::service_dispatcher::connect_to_service;
547 use crate::stream::replica_stream::ReplicaReceiver;
548 use std::sync::Arc;
549 use std::time::Duration;
550
551 // Connect and request the PEER_FEEDER service.
552 let channel = connect_to_service(peer_addr, PEER_FEEDER_SERVICE_NAME)?;
553
554 // Send start_vlsn.
555 channel.send(&start_vlsn.to_le_bytes())?;
556
557 // Read the one-byte response.
558 let resp = channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
559 RepError::NetworkError("no response from peer feeder".into())
560 })?;
561 if resp.is_empty() {
562 return Err(RepError::NetworkError(
563 "empty response from peer feeder".into(),
564 ));
565 }
566 match resp[0] {
567 PEER_FEEDER_CAN_SERVE => {}
568 PEER_FEEDER_NEEDS_RESTORE => return Ok(false),
569 other => {
570 return Err(RepError::ProtocolError(format!(
571 "peer feeder unknown response byte: {other:#x}"
572 )));
573 }
574 }
575
576 // Run the replica receive loop.
577 let channel_arc: Arc<dyn Channel> = Arc::from(channel);
578 let receiver = ReplicaReceiver::new(channel_arc);
579 receiver.run(log_writer)?;
580
581 Ok(true)
582}
583
584/// Pipelined catch-up from multiple peer nodes simultaneously.
585///
586/// Spawns one thread per peer in `peers` and waits for all to finish (or
587/// for the first to succeed). Returns the name of the peer that supplied
588/// the entries, or `None` if no peer could serve the range.
589///
590/// The `log_writer_factory` closure is called once per thread to produce a
591/// per-thread `LogWriter`. The factory must be `Send + Sync`.
592pub struct MultiPeerCatchUp {
593 peers: Vec<(String, std::net::SocketAddr)>,
594 start_vlsn: u64,
595}
596
597impl MultiPeerCatchUp {
598 /// Create a new multi-peer catch-up request.
599 ///
600 /// `peers` is a list of `(node_name, socket_addr)` pairs to try.
601 pub fn new(
602 peers: Vec<(String, std::net::SocketAddr)>,
603 start_vlsn: u64,
604 ) -> Self {
605 Self { peers, start_vlsn }
606 }
607
608 /// Run pipelined catch-up.
609 ///
610 /// Spawns one thread per peer and waits for the first to succeed.
611 /// Each thread calls `catch_up_from_peer`; the winning thread's entries
612 /// are applied through `make_writer()`. Other threads are joined once
613 /// the first succeeds.
614 ///
615 /// Returns the name of the first peer that successfully served the range,
616 /// or `None` if all peers declined.
617 pub fn run<F, W>(self, make_writer: F) -> Option<String>
618 where
619 F: Fn() -> W + Send + Sync + 'static,
620 W: crate::stream::replica_stream::LogWriter + Send + 'static,
621 {
622 use std::sync::atomic::{AtomicBool, Ordering};
623 let make_writer = std::sync::Arc::new(make_writer);
624 let done = std::sync::Arc::new(AtomicBool::new(false));
625 let winner: std::sync::Arc<noxu_sync::Mutex<Option<String>>> =
626 std::sync::Arc::new(noxu_sync::Mutex::new(None));
627
628 let mut handles = Vec::new();
629
630 for (name, addr) in self.peers {
631 let make_writer = std::sync::Arc::clone(&make_writer);
632 let done = std::sync::Arc::clone(&done);
633 let winner = std::sync::Arc::clone(&winner);
634 let start_vlsn = self.start_vlsn;
635 let name_clone = name.clone();
636
637 let handle = std::thread::Builder::new()
638 .name(format!("noxu-peer-catchup-{}", name))
639 .spawn(move || {
640 if done.load(Ordering::Acquire) {
641 return; // another peer already won
642 }
643 let mut writer = make_writer();
644 match catch_up_from_peer(addr, start_vlsn, &mut writer) {
645 Ok(true) => {
646 if !done.swap(true, Ordering::AcqRel) {
647 *winner.lock() = Some(name_clone);
648 }
649 }
650 Ok(false) => {
651 log::debug!(
652 "peer '{}' cannot serve vlsn={start_vlsn}",
653 name
654 );
655 }
656 Err(e) => {
657 log::warn!(
658 "catch-up from peer '{}' failed: {e}",
659 name
660 );
661 }
662 }
663 })
664 .expect("failed to spawn peer catch-up thread");
665
666 handles.push(handle);
667 }
668
669 for h in handles {
670 let _ = h.join();
671 }
672
673 winner.lock().clone()
674 }
675}
676
677// ---------------------------------------------------------------------------
678// Tests
679// ---------------------------------------------------------------------------
680
681#[cfg(test)]
682mod tests {
683 use super::*;
684 use crate::net::channel::LocalChannelPair;
685 use std::time::Duration;
686
687 // -----------------------------------------------------------------------
688 // PeerLogScanner unit tests
689 // -----------------------------------------------------------------------
690
691 #[test]
692 fn test_peer_scanner_push_and_log_range() {
693 let scanner = PeerLogScanner::new();
694 assert!(scanner.is_empty());
695 assert!(scanner.log_range().is_none());
696
697 scanner.push(5, 1, b"entry5".to_vec());
698 scanner.push(6, 1, b"entry6".to_vec());
699 scanner.push(10, 1, b"entry10".to_vec());
700
701 assert_eq!(scanner.len(), 3);
702 assert_eq!(scanner.log_range(), Some((5, 10)));
703 }
704
705 #[test]
706 fn test_peer_scanner_next_entry_in_order() {
707 let mut scanner = PeerLogScanner::new();
708 for vlsn in [3u64, 4, 5, 6, 7] {
709 scanner.push(vlsn, 1, vlsn.to_le_bytes().to_vec());
710 }
711
712 // Ask from_vlsn=4 — should skip 3 and return 4, 5, 6, 7.
713 let mut results = Vec::new();
714 while let Some((vlsn, _, _)) = scanner.next_entry(4) {
715 results.push(vlsn);
716 }
717 assert_eq!(results, vec![4, 5, 6, 7]);
718 }
719
720 #[test]
721 fn test_peer_scanner_skips_stale_entries() {
722 let mut scanner = PeerLogScanner::new();
723 for v in [1u64, 2, 3, 10, 11] {
724 scanner.push(v, 1, vec![v as u8]);
725 }
726 // Ask from 10 — entries 1, 2, 3 are stale.
727 let entry = scanner.next_entry(10);
728 assert_eq!(entry.map(|(v, _, _)| v), Some(10));
729 }
730
731 #[test]
732 fn test_peer_scanner_empty_returns_none() {
733 let mut scanner = PeerLogScanner::new();
734 assert!(scanner.next_entry(1).is_none());
735 }
736
737 // -----------------------------------------------------------------------
738 // PeerScannerAdapter unit tests
739 // -----------------------------------------------------------------------
740
741 #[test]
742 fn test_peer_scanner_adapter_cursor_advances() {
743 let source = Arc::new(PeerLogScanner::new());
744 for v in [1u64, 2, 3, 4, 5] {
745 source.push(v, 1, vec![v as u8]);
746 }
747
748 let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 1);
749 let mut seen = Vec::new();
750 while let Some((v, _, _)) = adapter.next_entry(1) {
751 seen.push(v);
752 }
753 assert_eq!(seen, vec![1, 2, 3, 4, 5]);
754 }
755
756 // -----------------------------------------------------------------------
757 // PeerFeederRunner integration test
758 // -----------------------------------------------------------------------
759
760 #[test]
761 fn test_peer_feeder_runner_streams_to_replica() {
762 let source = Arc::new(PeerLogScanner::new());
763 for v in [10u64, 11, 12, 13, 14] {
764 source.push(v, 1, format!("payload-{v}").into_bytes());
765 }
766
767 let pair = LocalChannelPair::new();
768 let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
769 let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
770
771 // Receiver: collect all frames.
772 let recv_handle = {
773 let receiver = Arc::clone(&receiver);
774 std::thread::spawn(move || {
775 let mut vlsns = Vec::new();
776 // Expect 5 frames then a timeout.
777 for _ in 0..5 {
778 let frame = receiver
779 .receive(Duration::from_secs(5))
780 .unwrap()
781 .unwrap();
782 let vlsn =
783 u64::from_le_bytes(frame[0..8].try_into().unwrap());
784 vlsns.push(vlsn);
785 // Send ack.
786 let _ = receiver.send(&vlsn.to_le_bytes());
787 }
788 vlsns
789 })
790 };
791
792 let runner =
793 PeerFeederRunner::new(Arc::clone(&sender), Arc::clone(&source), 10);
794 let sender_clone = Arc::clone(&sender);
795 let run_handle = std::thread::spawn(move || {
796 let _ = runner.run();
797 });
798
799 // Wait for receiver to collect all 5 frames.
800 let vlsns = recv_handle.join().unwrap();
801 assert_eq!(vlsns, vec![10, 11, 12, 13, 14]);
802
803 sender_clone.close().unwrap();
804 let _ = run_handle.join();
805 }
806
807 // -----------------------------------------------------------------------
808 // negotiate_syncup tests
809 // -----------------------------------------------------------------------
810
811 #[test]
812 fn test_negotiate_syncup_can_serve() {
813 assert_eq!(
814 negotiate_syncup(Some((5, 20)), 10),
815 SyncupResult::CanServe { start_vlsn: 10 }
816 );
817 // Exact boundary.
818 assert_eq!(
819 negotiate_syncup(Some((10, 10)), 10),
820 SyncupResult::CanServe { start_vlsn: 10 }
821 );
822 }
823
824 #[test]
825 fn test_negotiate_syncup_needs_restore_too_early() {
826 // Replica needs VLSN 3 but peer only has [5, 20].
827 assert_eq!(
828 negotiate_syncup(Some((5, 20)), 3),
829 SyncupResult::NeedsRestore
830 );
831 }
832
833 #[test]
834 fn test_negotiate_syncup_needs_restore_too_late() {
835 // Replica needs VLSN 25 but peer only has [5, 20].
836 assert_eq!(
837 negotiate_syncup(Some((5, 20)), 25),
838 SyncupResult::NeedsRestore
839 );
840 }
841
842 #[test]
843 fn test_negotiate_syncup_no_range() {
844 // Peer has no log range (just joined or restoring).
845 assert_eq!(negotiate_syncup(None, 10), SyncupResult::NeedsRestore);
846 }
847
848 // -----------------------------------------------------------------------
849 // GroupService CBVLSN integration
850 // -----------------------------------------------------------------------
851
852 #[test]
853 fn test_group_service_cbvlsn_tracks_minimum() {
854 use crate::group_service::{GroupService, NodeInfo};
855 use crate::node_type::NodeType;
856 use std::time::Instant;
857
858 let gs = GroupService::new("test_group".to_string());
859
860 // Add 3 electable nodes.
861 for (name, port) in [("n1", 5001u16), ("n2", 5002), ("n3", 5003)] {
862 gs.add_node(NodeInfo {
863 name: name.to_string(),
864 node_type: NodeType::Electable,
865 host: "localhost".to_string(),
866 port,
867 node_id: port as u32,
868 joined_at: Instant::now(),
869 last_seen: Instant::now(),
870 is_active: true,
871 known_vlsn: 0,
872 log_range: None,
873 read_capacity_pct: 100,
874 write_capacity_pct: 100,
875 latency_hint_ms: 1,
876 })
877 .unwrap();
878 }
879
880 // Initially all known_vlsn = 0 → CBVLSN = 0.
881 assert_eq!(gs.get_cbvlsn(), 0);
882
883 // Update n1 and n2 but not n3 → CBVLSN = min(50, 40, 0) = 0.
884 gs.update_node_vlsn("n1", 50);
885 gs.update_node_vlsn("n2", 40);
886 assert_eq!(gs.get_cbvlsn(), 0, "n3 still at 0, CBVLSN must be 0");
887
888 // Now n3 also updates → CBVLSN = min(50, 40, 30) = 30.
889 gs.update_node_vlsn("n3", 30);
890 assert_eq!(gs.get_cbvlsn(), 30);
891
892 // n3 advances → min(50, 40, 45) = 40.
893 gs.update_node_vlsn("n3", 45);
894 assert_eq!(gs.get_cbvlsn(), 40);
895 }
896
#[test]
fn test_group_service_cbvlsn_monotone_nondecreasing() {
    use crate::group_service::{GroupService, NodeInfo};
    use crate::node_type::NodeType;
    use std::time::Instant;

    // Two active electable nodes; the CBVLSN is the minimum known_vlsn
    // across them.
    let gs = GroupService::new("cbvlsn_monotone".to_string());

    for (name, port) in [("a", 5001u16), ("b", 5002)] {
        gs.add_node(NodeInfo {
            name: name.to_string(),
            node_type: NodeType::Electable,
            host: "localhost".to_string(),
            port,
            node_id: port as u32,
            joined_at: Instant::now(),
            last_seen: Instant::now(),
            is_active: true,
            known_vlsn: 0,
            log_range: None,
            read_capacity_pct: 100,
            write_capacity_pct: 100,
            latency_hint_ms: 1,
        })
        .unwrap();
    }

    // Advance both nodes through non-decreasing VLSNs. At every step the
    // CBVLSN must equal min(va, vb) exactly AND must never move backwards.
    // Checking the exact value keeps this test from passing trivially: with
    // a monotone input, any non-decreasing output would satisfy `>= prev`.
    let mut prev = 0u64;
    for (na, va, nb, vb) in [
        ("a", 10u64, "b", 5u64),
        ("a", 20, "b", 15),
        ("a", 25, "b", 22),
        ("a", 30, "b", 28),
    ] {
        gs.update_node_vlsn(na, va);
        gs.update_node_vlsn(nb, vb);
        let cbvlsn = gs.get_cbvlsn();
        assert_eq!(
            cbvlsn,
            va.min(vb),
            "CBVLSN must be the minimum known_vlsn: a={va}, b={vb}"
        );
        assert!(
            cbvlsn >= prev,
            "CBVLSN must not decrease: was {prev}, now {cbvlsn}"
        );
        prev = cbvlsn;
    }
}
942
#[test]
fn test_group_service_find_peers_with_vlsn() {
    use crate::group_service::{GroupService, NodeInfo};
    use crate::node_type::NodeType;
    use std::time::Instant;

    let gs = GroupService::new("peer_select".to_string());
    let now = Instant::now();

    // Node a: holds [1, 100]
    // Node b: holds [50, 200]
    // Node c: advertises no range
    for (name, port, range) in [
        ("a", 5001u16, Some((1u64, 100u64))),
        ("b", 5002, Some((50, 200))),
        ("c", 5003, None),
    ] {
        // Stagger last_seen so the peers differ in recency. Peer ordering is
        // NOT asserted below. `Instant - Duration` panics when the result is
        // not representable (platform-dependent, e.g. shortly after boot),
        // so use checked_sub and fall back to `now`.
        let last_seen = now
            .checked_sub(std::time::Duration::from_millis(u64::from(port) * 10))
            .unwrap_or(now);
        gs.add_node(NodeInfo {
            name: name.to_string(),
            node_type: NodeType::Electable,
            host: "localhost".to_string(),
            port,
            node_id: u32::from(port),
            joined_at: now,
            last_seen,
            is_active: true,
            known_vlsn: 0,
            log_range: range,
            read_capacity_pct: 100,
            write_capacity_pct: 100,
            latency_hint_ms: 1,
        })
        .unwrap();
    }

    // VLSN 75: exactly a and b hold it; c has no range at all.
    let peers = gs.find_peers_with_vlsn(75);
    assert_eq!(peers.len(), 2, "expected exactly a and b, got {peers:?}");
    assert!(peers.contains(&"a".to_string()));
    assert!(peers.contains(&"b".to_string()));
    assert!(!peers.contains(&"c".to_string()));

    // VLSN 150: only b holds it.
    let peers = gs.find_peers_with_vlsn(150);
    assert_eq!(peers, vec!["b".to_string()]);

    // VLSN 201: past every advertised range, so nobody holds it.
    assert!(gs.find_peers_with_vlsn(201).is_empty());
}
993
#[test]
fn test_group_service_update_log_range() {
    use crate::group_service::{GroupService, NodeInfo};
    use crate::node_type::NodeType;
    use std::time::Instant;

    // One electable replica that starts out advertising no log range.
    let gs = GroupService::new("log_range_test".to_string());
    let replica = NodeInfo {
        name: "r1".to_string(),
        node_type: NodeType::Electable,
        host: "localhost".to_string(),
        port: 5001,
        node_id: 1,
        joined_at: Instant::now(),
        last_seen: Instant::now(),
        is_active: true,
        known_vlsn: 0,
        log_range: None,
        read_capacity_pct: 100,
        write_capacity_pct: 100,
        latency_hint_ms: 1,
    };
    gs.add_node(replica).unwrap();

    // Reads r1's currently recorded log range from the group service.
    let range_of_r1 = || gs.get_node("r1").unwrap().log_range;

    assert!(range_of_r1().is_none());

    // Record an initial range ...
    gs.update_node_log_range("r1", 100, 500);
    assert_eq!(range_of_r1(), Some((100, 500)));

    // ... then widen its upper bound.
    gs.update_node_log_range("r1", 100, 800);
    assert_eq!(range_of_r1(), Some((100, 800)));
}
1029
1030 // -----------------------------------------------------------------------
1031 // PeerFeederService::handle paths
1032 // -----------------------------------------------------------------------
1033
#[test]
fn test_peer_feeder_service_can_serve() {
    use crate::net::channel::LocalChannelPair;

    // The scanner covers [10, 20]. A request with start_vlsn = 15 is inside
    // that range, so the service should answer CAN_SERVE and then stream
    // entries until the client hangs up.
    let scanner = Arc::new(PeerLogScanner::new());
    (10u64..=20).for_each(|v| scanner.push(v, 0, format!("e{}", v).into_bytes()));
    let svc = PeerFeederService::new(Arc::clone(&scanner));

    let pair = LocalChannelPair::new();
    let client = pair.channel_b;
    let server: Box<dyn Channel> = Box::new(pair.channel_a);

    // Handshake: the start VLSN as little-endian u64 bytes.
    client.send(&15u64.to_le_bytes()).unwrap();

    // Drive the service on a separate thread so this thread can play the
    // client and observe the stream.
    let server_thread = std::thread::spawn(move || svc.handle(server));

    // First message back is the single-byte verdict.
    let verdict = client.receive(Duration::from_secs(2)).unwrap().unwrap();
    assert_eq!(
        verdict,
        vec![PEER_FEEDER_CAN_SERVE],
        "service should reply CAN_SERVE for in-range start_vlsn"
    );

    // Count up to three streamed frames, stopping at the first receive that
    // does not yield a frame.
    let frames = (0..3)
        .take_while(|_| matches!(client.receive(Duration::from_millis(80)), Ok(Some(_))))
        .count();

    // Hanging up the client lets the server-side runner finish.
    client.close().unwrap();
    let _ = server_thread.join().unwrap();
    assert!(frames >= 1, "service must have streamed at least one frame");
}
1079
#[test]
fn test_peer_feeder_service_needs_restore() {
    use crate::net::channel::LocalChannelPair;

    // The scanner covers [10, 20]. A request starting at 5 predates it, so
    // the service should write NEEDS_RESTORE and handle() should return Err.
    let scanner = Arc::new(PeerLogScanner::new());
    (10u64..=20).for_each(|v| scanner.push(v, 0, vec![]));
    let svc = PeerFeederService::new(Arc::clone(&scanner));

    let pair = LocalChannelPair::new();
    let client = pair.channel_b;
    let server: Box<dyn Channel> = Box::new(pair.channel_a);

    client.send(&5u64.to_le_bytes()).unwrap();

    // handle() runs inline: on this path it is expected to return an error
    // rather than stream.
    let outcome = svc.handle(server);
    assert!(outcome.is_err(), "service must return Err on NEEDS_RESTORE");

    let verdict = client.receive(Duration::from_secs(2)).unwrap().unwrap();
    assert_eq!(
        verdict,
        vec![PEER_FEEDER_NEEDS_RESTORE],
        "service should reply NEEDS_RESTORE for out-of-range start_vlsn"
    );
}
1107
#[test]
fn test_peer_feeder_service_short_handshake_errors() {
    use crate::net::channel::LocalChannelPair;

    let scanner = Arc::new(PeerLogScanner::new());
    let svc = PeerFeederService::new(Arc::clone(&scanner));

    let pair = LocalChannelPair::new();
    let server: Box<dyn Channel> = Box::new(pair.channel_a);
    let client = pair.channel_b;

    // Four bytes cannot encode the u64 start_vlsn the handshake requires.
    client.send(&[0u8; 4]).unwrap();

    let err = match svc.handle(server) {
        Ok(_) => panic!("short handshake must error"),
        Err(e) => e,
    };
    let msg = err.to_string();
    assert!(
        msg.contains("short handshake"),
        "expected 'short handshake' in error, got: {msg}"
    );
}
1131
#[test]
fn test_peer_feeder_service_no_handshake_errors() {
    use crate::net::channel::LocalChannelPair;

    let scanner = Arc::new(PeerLogScanner::new());
    let svc = PeerFeederService::new(Arc::clone(&scanner));

    let pair = LocalChannelPair::new();
    let server: Box<dyn Channel> = Box::new(pair.channel_a);
    // Hang up the client end before it sends anything, so the service never
    // receives a handshake.
    drop(pair.channel_b);

    let outcome = svc.handle(server);
    assert!(outcome.is_err(), "no-handshake must error");
}
1147
#[test]
fn test_peer_feeder_service_name() {
    // The handler must advertise the protocol constant as its service name.
    let svc = PeerFeederService::new(Arc::new(PeerLogScanner::new()));
    let advertised = svc.service_name();
    assert_eq!(
        advertised, PEER_FEEDER_SERVICE_NAME,
        "service_name must match the protocol const"
    );
}
1158
1159 // -----------------------------------------------------------------------
// PeerFeederSource, PeerLogScanner and PeerFeederRunner constructors
1161 // -----------------------------------------------------------------------
1162
#[test]
fn test_peer_feeder_source_default_and_clone_scanner() {
    // Build one source through new() and one through default().
    let from_new = PeerFeederSource::new();
    let from_default = PeerFeederSource::default();

    let scanner_new = from_new.clone_scanner();
    let scanner_default = from_default.clone_scanner();

    // A write through one source's scanner must not show up in the other's:
    // the two sources do not share backing storage.
    scanner_new.push(1, 0, b"a".to_vec());
    assert_eq!(scanner_new.len(), 1);
    assert_eq!(scanner_default.len(), 0);
}
1176
#[test]
fn test_peer_log_scanner_default_is_empty() {
    // A default-constructed scanner holds no entries and reports no range.
    let scanner = PeerLogScanner::default();
    assert!(scanner.log_range().is_none());
    assert!(scanner.is_empty());
    assert_eq!(scanner.len(), 0);
}
1184
#[test]
fn test_peer_feeder_runner_known_replica_vlsn_initial_zero() {
    use crate::net::channel::LocalChannelPair;

    // A newly constructed runner must report a known replica VLSN of 0.
    let scanner = Arc::new(PeerLogScanner::new());
    let pair = LocalChannelPair::new();
    let channel: Arc<dyn Channel> = Arc::new(pair.channel_a);

    let runner = PeerFeederRunner::new(channel, scanner, 1);
    assert_eq!(runner.known_replica_vlsn(), 0);
}
1195
1196 // -----------------------------------------------------------------------
1197 // PeerScannerAdapter: stale-entry skipping
1198 // -----------------------------------------------------------------------
1199
#[test]
fn test_peer_scanner_adapter_skips_stale_via_pop_front() {
    // The shared scanner holds VLSNs 1..=5. The adapter is constructed with 3
    // (presumably its starting/floor VLSN — confirm against
    // PeerScannerAdapter::new), so entries 1 and 2 are stale and must never
    // be returned. The test name refers to the adapter discarding stale
    // entries via pop_front(); this test only observes the VLSNs returned.
    let source = Arc::new(PeerLogScanner::new());
    for v in 1u64..=5 {
        source.push(v, 0, vec![]);
    }
    let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 3);

    // First call yields VLSN 3; 1 and 2 are skipped.
    let (vlsn, _, _) = adapter.next_entry(3).expect("adapter must yield VLSN 3");
    assert_eq!(vlsn, 3);

    // Later calls continue in order through the end of the buffered range.
    let (vlsn, _, _) = adapter.next_entry(4).expect("adapter must yield VLSN 4");
    assert_eq!(vlsn, 4);
    let (vlsn, _, _) = adapter.next_entry(5).expect("adapter must yield VLSN 5");
    assert_eq!(vlsn, 5);
}
1218 }
1219}