Skip to main content

noxu_rep/stream/
peer_feeder.rs

1//! Peer-to-peer log distribution — master-feeder pattern.
2//!
3//! Each replica maintains a log range `[first_vlsn,
4//! last_vlsn]` and can act as a feeder for other replicas that need entries
5//! in that range.  This avoids routing all log-shipping traffic through the
6//! master, which would otherwise be the sole bottleneck.
7//!
8//! ## Architecture
9//!
10//! ```text
11//! Master ──► FeederRunner (master feeder) ──► Replica A
12//!                                        ──► Replica B ──► PeerFeederRunner ──► Replica C
13//! ```
14//!
15//! `PeerLogScanner` is a `LogScanner` implementation backed by an
16//! in-memory `VecDeque<(vlsn, entry_type, payload)>`.  It is populated by
17//! the local `ReplicaReceiver` as entries arrive from the master.
18//!
19//! `PeerFeederRunner` wraps a `FeederRunner` and a `PeerLogScanner`.  The
20//! `GroupService` is queried to find peers that hold the needed VLSN range,
21//! and a `FeederRunner` is spawned to stream entries to the requesting node.
22//!
23//! ## CBVLSN
24//!
25//! The Cleaner Barrier VLSN is the global minimum `known_vlsn` across all
26//! active electable replicas.  The log cleaner uses this to decide which
27//! log files it is safe to reclaim.  It is maintained by `GroupService`
28//! and updated on every heartbeat / ack.
29//!
30//! Corresponds to `FeederReplicaSyncup`, `LocalCBVLSNUpdater`, and
31//! `RepGroupImpl.getCBVLSN()` in the implementation.
32
33use std::collections::VecDeque;
34use std::sync::Arc;
35
36use noxu_sync::Mutex;
37
38use crate::error::{RepError, Result};
39use crate::net::channel::Channel;
40use crate::net::service_dispatcher::ServiceHandler;
41use crate::stream::feeder::{FeederRunner, LogScanner};
42
/// Name under which the peer log-feed service is registered with
/// `TcpServiceDispatcher`.  Clients pass the same name to
/// `connect_to_service` to reach a peer's feeder.
pub const PEER_FEEDER_SERVICE_NAME: &str = "PEER_FEEDER";
45
46// ---------------------------------------------------------------------------
47// PeerLogScanner
48// ---------------------------------------------------------------------------
49
/// Default cap on the number of entries a `PeerLogScanner` keeps queued.
///
/// Without a bound, every replicated entry would stay in RAM until a
/// downstream peer consumed it.  A long-running replica with no downstream
/// consumers would then grow its `VecDeque` by one entry per replicated
/// record, forever (audit finding F10).
///
/// 16 Ki entries leave headroom for the slowest expected downstream peer to
/// drain.  On its own this count would still allow roughly 16 GiB if every
/// entry approached 1 MiB.  In the worst case,
/// [`DEFAULT_PEER_SCANNER_MAX_BYTES`] is the bound that actually limits
/// memory.
pub const DEFAULT_PEER_SCANNER_MAX_ENTRIES: usize = 16 * 1024;

/// Default cap, in bytes, on the cumulative payload size a
/// `PeerLogScanner` keeps queued.
///
/// When a `push` takes the total above this threshold, entries are evicted
/// oldest-first.
///
/// 64 MiB is intended to match the channel's `MAX_FRAME_PAYLOAD`.  That is
/// large enough to absorb a large in-flight transaction, yet small enough
/// not to OOM a small replica box.
pub const DEFAULT_PEER_SCANNER_MAX_BYTES: usize = 64 << 20;
72
73/// A [`LogScanner`] backed by an in-memory queue of `(vlsn, type, payload)`
74/// entries.
75///
76/// Entries are pushed by the `ReplicaReceiver` as they arrive from the
77/// master (or from another peer).  The `PeerFeederRunner` consumes entries
78/// from this queue and streams them to a downstream replica.
79///
80/// ## Bounded memory (F10)
81///
82/// The queue has two configurable bounds:
83///
84/// - `max_entries`: maximum entry count (default
85///   [`DEFAULT_PEER_SCANNER_MAX_ENTRIES`]).
86/// - `max_bytes`: maximum cumulative payload size in bytes
87///   (default [`DEFAULT_PEER_SCANNER_MAX_BYTES`]).
88///
89/// On `push`, if either bound is exceeded the oldest entries are
90/// evicted from the front of the queue until both bounds are
91/// satisfied.  The evicted entries are no longer available for peer
92/// streaming through this scanner; downstream peers that fall behind
93/// the eviction window must catch up via the on-disk
94/// `EnvironmentLogScanner` or via network restore.  This matches
95/// HA semantics where peer-to-peer log distribution is
96/// best-effort and the on-disk log is the durable source.
97///
98/// Closes finding F10 of `docs/src/internal/api-audit-2026-05-rep.md`.
99///
100/// Thread safety: the queue is protected by a `Mutex` so that the receiver
101/// thread (writer) and the feeder thread (reader) can operate concurrently.
102pub struct PeerLogScanner {
103    queue: Mutex<VecDeque<(u64, u8, Vec<u8>)>>,
104    /// The VLSN range currently held in `queue`: `(first, last)`.
105    /// Updated lazily on `push`; used by `GroupService` callers to determine
106    /// whether this scanner can serve a given VLSN.
107    first_vlsn: Mutex<u64>,
108    last_vlsn: Mutex<u64>,
109    /// Maximum entry count before oldest-evicting begins.
110    max_entries: usize,
111    /// Maximum cumulative payload bytes before oldest-evicting begins.
112    max_bytes: usize,
113    /// Current cumulative payload bytes (updated on every push/evict).
114    current_bytes: Mutex<usize>,
115    /// Cumulative count of entries evicted by the F10 bound (for
116    /// observability and tests).
117    evicted_count: std::sync::atomic::AtomicU64,
118}
119
120impl PeerLogScanner {
121    /// Create an empty scanner with the default F10 bounds.
122    pub fn new() -> Self {
123        Self::with_capacity(
124            DEFAULT_PEER_SCANNER_MAX_ENTRIES,
125            DEFAULT_PEER_SCANNER_MAX_BYTES,
126        )
127    }
128
129    /// Create an empty scanner with explicit bounds.
130    ///
131    /// `max_entries` and `max_bytes` are both honoured; whichever is
132    /// breached first triggers oldest-evicting on subsequent `push`
133    /// calls.  Passing `usize::MAX` disables the corresponding bound
134    /// (not recommended in production).
135    pub fn with_capacity(max_entries: usize, max_bytes: usize) -> Self {
136        Self {
137            queue: Mutex::new(VecDeque::new()),
138            first_vlsn: Mutex::new(0),
139            last_vlsn: Mutex::new(0),
140            max_entries: max_entries.max(1),
141            max_bytes: max_bytes.max(1),
142            current_bytes: Mutex::new(0),
143            evicted_count: std::sync::atomic::AtomicU64::new(0),
144        }
145    }
146
147    /// Push a log entry into the scanner's queue.
148    ///
149    /// Called by the `ReplicaReceiver` each time an entry is applied.
150    /// Entries are expected to be pushed in VLSN order, but this method is
151    /// not enforcing: every entry is appended to the queue unconditionally
152    /// and the cached `(first_vlsn, last_vlsn)` range is widened to cover
153    /// the new VLSN. Out-of-order or duplicate entries are filtered later
154    /// by [`LogScanner::next_entry`](crate::stream::feeder::LogScanner),
155    /// which skips entries with `vlsn < from_vlsn`.
156    ///
157    /// **F10 bound**: after the new entry is appended, if the queue
158    /// exceeds either `max_entries` or `max_bytes`, the oldest entries
159    /// are evicted from the front until both bounds are satisfied. The
160    /// retained `first_vlsn` is updated to the new front-of-queue VLSN
161    /// so downstream peers that ask for an evicted VLSN range observe
162    /// `log_range().first > from_vlsn` and know they must catch up via
163    /// the durable log.
164    pub fn push(&self, vlsn: u64, entry_type: u8, payload: Vec<u8>) {
165        let payload_len = payload.len();
166        {
167            let mut last = self.last_vlsn.lock();
168            if vlsn > *last {
169                *last = vlsn;
170            }
171        }
172        let mut q = self.queue.lock();
173        let mut current_bytes = self.current_bytes.lock();
174        // Append unconditionally.
175        q.push_back((vlsn, entry_type, payload));
176        *current_bytes += payload_len;
177
178        // F10 eviction: drop oldest until both bounds are honoured.
179        let mut evicted = 0u64;
180        while q.len() > self.max_entries || *current_bytes > self.max_bytes {
181            if let Some((_evicted_vlsn, _ty, evicted_payload)) = q.pop_front() {
182                *current_bytes =
183                    current_bytes.saturating_sub(evicted_payload.len());
184                evicted += 1;
185            } else {
186                break;
187            }
188        }
189        if evicted > 0 {
190            self.evicted_count
191                .fetch_add(evicted, std::sync::atomic::Ordering::Relaxed);
192        }
193        // Refresh first_vlsn from the (possibly mutated) queue front.
194        let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
195        drop(current_bytes);
196        drop(q);
197        *self.first_vlsn.lock() = new_first;
198    }
199
200    /// Cumulative number of entries dropped by the F10 bound since
201    /// scanner construction.  Useful for monitoring whether downstream
202    /// peers are keeping up.
203    pub fn evicted_count(&self) -> u64 {
204        self.evicted_count.load(std::sync::atomic::Ordering::Relaxed)
205    }
206
207    /// Current cumulative payload size in bytes (live snapshot).
208    pub fn current_bytes(&self) -> usize {
209        *self.current_bytes.lock()
210    }
211
212    /// Return the VLSN range currently held in this scanner.
213    ///
214    /// Returns `None` if the scanner is empty (no entries pushed yet).
215    pub fn log_range(&self) -> Option<(u64, u64)> {
216        let first = *self.first_vlsn.lock();
217        let last = *self.last_vlsn.lock();
218        if first == 0 { None } else { Some((first, last)) }
219    }
220
221    /// Return the number of entries currently queued.
222    pub fn len(&self) -> usize {
223        self.queue.lock().len()
224    }
225
226    /// Returns true if no entries are queued.
227    pub fn is_empty(&self) -> bool {
228        self.queue.lock().is_empty()
229    }
230}
231
232impl Default for PeerLogScanner {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237
238impl LogScanner for PeerLogScanner {
239    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
240        let mut q = self.queue.lock();
241        // Skip entries with VLSN < from_vlsn (they were already seen by the
242        // downstream replica). Track byte-budget reduction so the F10
243        // bound stays accurate.
244        let mut current_bytes = self.current_bytes.lock();
245        while let Some(&(vlsn, _, _)) = q.front() {
246            if vlsn >= from_vlsn {
247                let entry = q.pop_front();
248                if let Some((_, _, ref payload)) = entry {
249                    *current_bytes =
250                        current_bytes.saturating_sub(payload.len());
251                }
252                let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
253                drop(current_bytes);
254                drop(q);
255                *self.first_vlsn.lock() = new_first;
256                return entry;
257            }
258            if let Some((_, _, evicted_payload)) = q.pop_front() {
259                *current_bytes =
260                    current_bytes.saturating_sub(evicted_payload.len());
261            }
262        }
263        let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
264        drop(current_bytes);
265        drop(q);
266        *self.first_vlsn.lock() = new_first;
267        None
268    }
269}
270
271// ---------------------------------------------------------------------------
// PeerFeederSource — shareable handle around Arc<PeerLogScanner> (the LogScanner impl lives on PeerScannerAdapter)
273// ---------------------------------------------------------------------------
274
275/// A shared, `Arc`-wrapped `PeerLogScanner` that can be passed between
276/// threads.
277///
278/// The `ReplicaReceiver` holds an `Arc<PeerFeederSource>` and calls
279/// `push()` as entries arrive. A `PeerFeederRunner` holds another clone
280/// and calls `next_entry()` to stream those entries downstream.
281pub struct PeerFeederSource(pub Arc<PeerLogScanner>);
282
283impl PeerFeederSource {
284    /// Create a new `PeerFeederSource` backed by a fresh `PeerLogScanner`.
285    pub fn new() -> Self {
286        Self(Arc::new(PeerLogScanner::new()))
287    }
288
289    /// Return a clone of the inner `Arc<PeerLogScanner>` for the receiver
290    /// thread to use when pushing entries.
291    pub fn clone_scanner(&self) -> Arc<PeerLogScanner> {
292        Arc::clone(&self.0)
293    }
294}
295
296impl Default for PeerFeederSource {
297    fn default() -> Self {
298        Self::new()
299    }
300}
301
302/// Adapter so `PeerFeederSource` can be used directly as a `LogScanner`.
303///
304/// We implement `LogScanner` on the *mutable reference* side: since
305/// `PeerFeederSource` is `Arc`-wrapped, we implement on a thin wrapper
306/// struct that holds an `Arc` and a local cursor.
307pub struct PeerScannerAdapter {
308    source: Arc<PeerLogScanner>,
309    cursor_vlsn: u64,
310}
311
312impl PeerScannerAdapter {
313    /// Create a new adapter starting from `start_vlsn`.
314    pub fn new(source: Arc<PeerLogScanner>, start_vlsn: u64) -> Self {
315        Self { source, cursor_vlsn: start_vlsn }
316    }
317}
318
319impl LogScanner for PeerScannerAdapter {
320    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
321        let effective_from = self.cursor_vlsn.max(from_vlsn);
322        let entry = {
323            let mut q = self.source.queue.lock();
324            let mut result = None;
325            while let Some(&(vlsn, _, _)) = q.front() {
326                if vlsn >= effective_from {
327                    result = q.pop_front();
328                    break;
329                }
330                q.pop_front(); // discard stale entries
331            }
332            result
333        };
334        if let Some((vlsn, _, _)) = &entry {
335            self.cursor_vlsn = vlsn + 1;
336        }
337        entry
338    }
339}
340
341// ---------------------------------------------------------------------------
342// PeerFeederRunner
343// ---------------------------------------------------------------------------
344
345/// Streams log entries from a `PeerLogScanner` to a downstream replica.
346///
347/// This is a thin wrapper around `FeederRunner` that uses a
348/// `PeerScannerAdapter` as the log source instead of reading from disk.
349///
350/// Corresponds to 's peer-to-peer feeder path in `FeederReplicaSyncup`.
351pub struct PeerFeederRunner {
352    inner: FeederRunner,
353    source: Arc<PeerLogScanner>,
354    start_vlsn: u64,
355}
356
357impl PeerFeederRunner {
358    /// Create a new peer feeder that streams entries from `source` to
359    /// `channel`, starting at `start_vlsn`.
360    pub fn new(
361        channel: Arc<dyn Channel>,
362        source: Arc<PeerLogScanner>,
363        start_vlsn: u64,
364    ) -> Self {
365        let inner = FeederRunner::new(channel, start_vlsn);
366        Self { inner, source, start_vlsn }
367    }
368
369    /// Run the peer feeder loop.
370    ///
371    /// Streams entries from the `PeerLogScanner` to the downstream replica.
372    /// Returns when the channel is closed or an I/O error occurs.
373    pub fn run(&self) -> Result<()> {
374        let mut adapter =
375            PeerScannerAdapter::new(Arc::clone(&self.source), self.start_vlsn);
376        self.inner.run(&mut adapter)
377    }
378
379    /// Return the last VLSN acknowledged by the downstream replica.
380    pub fn known_replica_vlsn(&self) -> u64 {
381        self.inner.known_replica_vlsn()
382    }
383}
384
385// ---------------------------------------------------------------------------
386// Syncup helpers
387// ---------------------------------------------------------------------------
388
/// Result of a peer syncup negotiation.
///
/// In the reference HA implementation, `FeederReplicaSyncup` finds the
/// highest VLSN committed on BOTH the feeder and the replica (the
/// "matchpoint").  The feeder then streams entries from matchpoint + 1
/// onwards.
///
/// This module models that as a simple VLSN range check.  If the peer
/// holds `[peer_first, peer_last]` and the replica needs entries from
/// `replica_needs` onwards, the peer can serve when
/// `peer_first <= replica_needs <= peer_last`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncupResult {
    /// The peer holds the needed range; stream from `start_vlsn`.
    CanServe { start_vlsn: u64 },
    /// The peer does not hold the needed VLSN; fall back to master or
    /// network restore.
    NeedsRestore,
}

/// Determine whether `peer_range` can serve a replica that needs log
/// entries starting from `replica_needs`.
///
/// Returns [`SyncupResult::CanServe`] with `start_vlsn == replica_needs`
/// when `peer_range` is `Some((first, last))` and `replica_needs` lies in
/// the inclusive range `first..=last`.  Otherwise, including when
/// `peer_range` is `None` or inverted (`first > last`), it returns
/// [`SyncupResult::NeedsRestore`].
#[must_use]
pub fn negotiate_syncup(
    peer_range: Option<(u64, u64)>,
    replica_needs: u64,
) -> SyncupResult {
    match peer_range {
        Some((first, last)) if (first..=last).contains(&replica_needs) => {
            SyncupResult::CanServe { start_vlsn: replica_needs }
        }
        _ => SyncupResult::NeedsRestore,
    }
}
422
423// ---------------------------------------------------------------------------
424// PeerFeederService — TCP ServiceHandler backed by a PeerLogScanner
425// ---------------------------------------------------------------------------
426
427/// A [`ServiceHandler`] that streams peer-held log entries to a requesting
428/// downstream replica over the `"PEER_FEEDER"` service.
429///
430/// The service is registered on the `TcpServiceDispatcher` at startup.
431/// When a downstream replica connects, the protocol is:
432///
433/// 1. The downstream sends `[start_vlsn: u64 LE]` (8 bytes) — the first VLSN
434///    it needs.
435/// 2. The server negotiates via `negotiate_syncup()`.
436/// 3. If the range is available, a `PeerFeederRunner` is spawned in a new
437///    thread and streams entries until the channel closes.
438/// 4. If the range is not available, the server responds with
439///    `[NEEDS_RESTORE: u8 = 1]` and closes the connection.
440///
441/// The `PeerLogScanner` (`source`) is populated by the node's own
442/// `ReplicaReceiver` as entries arrive from the master.
443pub struct PeerFeederService {
444    source: Arc<PeerLogScanner>,
445}
446
447impl PeerFeederService {
448    /// Create a new service backed by `source`.
449    pub fn new(source: Arc<PeerLogScanner>) -> Self {
450        Self { source }
451    }
452}
453
454/// Wire-level response codes sent by the server.
455const PEER_FEEDER_CAN_SERVE: u8 = 0;
456const PEER_FEEDER_NEEDS_RESTORE: u8 = 1;
457
458impl ServiceHandler for PeerFeederService {
459    fn service_name(&self) -> &str {
460        PEER_FEEDER_SERVICE_NAME
461    }
462
463    fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
464        use std::time::Duration;
465
466        // 1. Read the 8-byte start_vlsn from the downstream replica.
467        let msg =
468            channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
469                RepError::NetworkError(
470                    "PEER_FEEDER: no start_vlsn received".into(),
471                )
472            })?;
473
474        if msg.len() < 8 {
475            return Err(RepError::NetworkError(format!(
476                "PEER_FEEDER: short handshake ({} bytes)",
477                msg.len()
478            )));
479        }
480        let start_vlsn =
481            u64::from_le_bytes(msg[..8].try_into().expect("slice of 8 bytes"));
482
483        // 2. Negotiate: do we hold the requested VLSN range?
484        let range = self.source.log_range();
485        match negotiate_syncup(range, start_vlsn) {
486            SyncupResult::CanServe { start_vlsn: sv } => {
487                // Tell the downstream it can proceed.
488                channel.send(&[PEER_FEEDER_CAN_SERVE])?;
489
490                // 3. Stream entries in this thread (the dispatcher already
491                //    called us from a per-connection thread).
492                let channel_arc: Arc<dyn Channel> = Arc::from(channel);
493                let runner = PeerFeederRunner::new(
494                    channel_arc,
495                    Arc::clone(&self.source),
496                    sv,
497                );
498                let _ = runner.run();
499                Ok(())
500            }
501            SyncupResult::NeedsRestore => {
502                channel.send(&[PEER_FEEDER_NEEDS_RESTORE])?;
503                Err(RepError::NetworkError(format!(
504                    "PEER_FEEDER: cannot serve vlsn={start_vlsn}, \
505                     range={range:?}"
506                )))
507            }
508        }
509    }
510}
511
512// ---------------------------------------------------------------------------
513// Client-side peer catch-up
514// ---------------------------------------------------------------------------
515
516/// Connect to a peer node's `PEER_FEEDER` service and pull log entries
517/// starting from `start_vlsn`.
518///
519/// This is the client counterpart to [`PeerFeederService`].  It is called
520/// by a replica that is behind and wants to catch up from a peer that holds
521/// the needed VLSN range (rather than routing all traffic through the master).
522///
523/// Protocol (matches `PeerFeederService::handle`):
524///   1. Open a TCP connection and request the `"PEER_FEEDER"` service via
525///      `service_dispatcher::connect_to_service()`.
526///   2. Send `[start_vlsn: u64 LE]`.
527///   3. Read the one-byte response:
528///      - `0` (`PEER_FEEDER_CAN_SERVE`) — peer has the range; proceed.
529///      - `1` (`PEER_FEEDER_NEEDS_RESTORE`) — peer cannot serve; return
530///        `Ok(false)` so the caller can fall back to the master.
531///   4. Start a `ReplicaReceiver` loop on the channel, passing each entry
532///      to `log_writer`.  Returns `Ok(true)` when the peer closes the
533///      channel (i.e. the catch-up is complete).
534///
535/// # Pipelining
536///
537/// `catch_up_from_peer` is intentionally non-async.  Call it from a
538/// dedicated thread per peer.  To pipeline catch-up from multiple peers
539/// simultaneously, spawn one thread per peer (e.g. from a `ThreadPool`).
540/// The [`MultiPeerCatchUp`] helper below manages this.
541pub fn catch_up_from_peer(
542    peer_addr: std::net::SocketAddr,
543    start_vlsn: u64,
544    log_writer: &mut dyn crate::stream::replica_stream::LogWriter,
545) -> Result<bool> {
546    use crate::net::service_dispatcher::connect_to_service;
547    use crate::stream::replica_stream::ReplicaReceiver;
548    use std::sync::Arc;
549    use std::time::Duration;
550
551    // Connect and request the PEER_FEEDER service.
552    let channel = connect_to_service(peer_addr, PEER_FEEDER_SERVICE_NAME)?;
553
554    // Send start_vlsn.
555    channel.send(&start_vlsn.to_le_bytes())?;
556
557    // Read the one-byte response.
558    let resp = channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
559        RepError::NetworkError("no response from peer feeder".into())
560    })?;
561    if resp.is_empty() {
562        return Err(RepError::NetworkError(
563            "empty response from peer feeder".into(),
564        ));
565    }
566    match resp[0] {
567        PEER_FEEDER_CAN_SERVE => {}
568        PEER_FEEDER_NEEDS_RESTORE => return Ok(false),
569        other => {
570            return Err(RepError::ProtocolError(format!(
571                "peer feeder unknown response byte: {other:#x}"
572            )));
573        }
574    }
575
576    // Run the replica receive loop.
577    let channel_arc: Arc<dyn Channel> = Arc::from(channel);
578    let receiver = ReplicaReceiver::new(channel_arc);
579    receiver.run(log_writer)?;
580
581    Ok(true)
582}
583
584/// Pipelined catch-up from multiple peer nodes simultaneously.
585///
586/// Spawns one thread per peer in `peers` and waits for all to finish (or
587/// for the first to succeed).  Returns the name of the peer that supplied
588/// the entries, or `None` if no peer could serve the range.
589///
590/// The `log_writer_factory` closure is called once per thread to produce a
591/// per-thread `LogWriter`.  The factory must be `Send + Sync`.
592pub struct MultiPeerCatchUp {
593    peers: Vec<(String, std::net::SocketAddr)>,
594    start_vlsn: u64,
595}
596
597impl MultiPeerCatchUp {
598    /// Create a new multi-peer catch-up request.
599    ///
600    /// `peers` is a list of `(node_name, socket_addr)` pairs to try.
601    pub fn new(
602        peers: Vec<(String, std::net::SocketAddr)>,
603        start_vlsn: u64,
604    ) -> Self {
605        Self { peers, start_vlsn }
606    }
607
608    /// Run pipelined catch-up.
609    ///
610    /// Spawns one thread per peer and waits for the first to succeed.
611    /// Each thread calls `catch_up_from_peer`; the winning thread's entries
612    /// are applied through `make_writer()`.  Other threads are joined once
613    /// the first succeeds.
614    ///
615    /// Returns the name of the first peer that successfully served the range,
616    /// or `None` if all peers declined.
617    pub fn run<F, W>(self, make_writer: F) -> Option<String>
618    where
619        F: Fn() -> W + Send + Sync + 'static,
620        W: crate::stream::replica_stream::LogWriter + Send + 'static,
621    {
622        use std::sync::atomic::{AtomicBool, Ordering};
623        let make_writer = std::sync::Arc::new(make_writer);
624        let done = std::sync::Arc::new(AtomicBool::new(false));
625        let winner: std::sync::Arc<noxu_sync::Mutex<Option<String>>> =
626            std::sync::Arc::new(noxu_sync::Mutex::new(None));
627
628        let mut handles = Vec::new();
629
630        for (name, addr) in self.peers {
631            let make_writer = std::sync::Arc::clone(&make_writer);
632            let done = std::sync::Arc::clone(&done);
633            let winner = std::sync::Arc::clone(&winner);
634            let start_vlsn = self.start_vlsn;
635            let name_clone = name.clone();
636
637            let handle = std::thread::Builder::new()
638                .name(format!("noxu-peer-catchup-{}", name))
639                .spawn(move || {
640                    if done.load(Ordering::Acquire) {
641                        return; // another peer already won
642                    }
643                    let mut writer = make_writer();
644                    match catch_up_from_peer(addr, start_vlsn, &mut writer) {
645                        Ok(true) => {
646                            if !done.swap(true, Ordering::AcqRel) {
647                                *winner.lock() = Some(name_clone);
648                            }
649                        }
650                        Ok(false) => {
651                            log::debug!(
652                                "peer '{}' cannot serve vlsn={start_vlsn}",
653                                name
654                            );
655                        }
656                        Err(e) => {
657                            log::warn!(
658                                "catch-up from peer '{}' failed: {e}",
659                                name
660                            );
661                        }
662                    }
663                })
664                .expect("failed to spawn peer catch-up thread");
665
666            handles.push(handle);
667        }
668
669        for h in handles {
670            let _ = h.join();
671        }
672
673        winner.lock().clone()
674    }
675}
676
677// ---------------------------------------------------------------------------
678// Tests
679// ---------------------------------------------------------------------------
680
681#[cfg(test)]
682mod tests {
683    use super::*;
684    use crate::net::channel::LocalChannelPair;
685    use std::time::Duration;
686
687    // -----------------------------------------------------------------------
688    // PeerLogScanner unit tests
689    // -----------------------------------------------------------------------
690
691    #[test]
692    fn test_peer_scanner_push_and_log_range() {
693        let scanner = PeerLogScanner::new();
694        assert!(scanner.is_empty());
695        assert!(scanner.log_range().is_none());
696
697        scanner.push(5, 1, b"entry5".to_vec());
698        scanner.push(6, 1, b"entry6".to_vec());
699        scanner.push(10, 1, b"entry10".to_vec());
700
701        assert_eq!(scanner.len(), 3);
702        assert_eq!(scanner.log_range(), Some((5, 10)));
703    }
704
705    #[test]
706    fn test_peer_scanner_next_entry_in_order() {
707        let mut scanner = PeerLogScanner::new();
708        for vlsn in [3u64, 4, 5, 6, 7] {
709            scanner.push(vlsn, 1, vlsn.to_le_bytes().to_vec());
710        }
711
712        // Ask from_vlsn=4 — should skip 3 and return 4, 5, 6, 7.
713        let mut results = Vec::new();
714        while let Some((vlsn, _, _)) = scanner.next_entry(4) {
715            results.push(vlsn);
716        }
717        assert_eq!(results, vec![4, 5, 6, 7]);
718    }
719
720    #[test]
721    fn test_peer_scanner_skips_stale_entries() {
722        let mut scanner = PeerLogScanner::new();
723        for v in [1u64, 2, 3, 10, 11] {
724            scanner.push(v, 1, vec![v as u8]);
725        }
726        // Ask from 10 — entries 1, 2, 3 are stale.
727        let entry = scanner.next_entry(10);
728        assert_eq!(entry.map(|(v, _, _)| v), Some(10));
729    }
730
731    #[test]
732    fn test_peer_scanner_empty_returns_none() {
733        let mut scanner = PeerLogScanner::new();
734        assert!(scanner.next_entry(1).is_none());
735    }
736
737    // -----------------------------------------------------------------------
738    // PeerScannerAdapter unit tests
739    // -----------------------------------------------------------------------
740
741    #[test]
742    fn test_peer_scanner_adapter_cursor_advances() {
743        let source = Arc::new(PeerLogScanner::new());
744        for v in [1u64, 2, 3, 4, 5] {
745            source.push(v, 1, vec![v as u8]);
746        }
747
748        let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 1);
749        let mut seen = Vec::new();
750        while let Some((v, _, _)) = adapter.next_entry(1) {
751            seen.push(v);
752        }
753        assert_eq!(seen, vec![1, 2, 3, 4, 5]);
754    }
755
756    // -----------------------------------------------------------------------
757    // PeerFeederRunner integration test
758    // -----------------------------------------------------------------------
759
760    #[test]
761    fn test_peer_feeder_runner_streams_to_replica() {
762        let source = Arc::new(PeerLogScanner::new());
763        for v in [10u64, 11, 12, 13, 14] {
764            source.push(v, 1, format!("payload-{v}").into_bytes());
765        }
766
767        let pair = LocalChannelPair::new();
768        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
769        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
770
771        // Receiver: collect all frames.
772        let recv_handle = {
773            let receiver = Arc::clone(&receiver);
774            std::thread::spawn(move || {
775                let mut vlsns = Vec::new();
776                // Expect 5 frames then a timeout.
777                for _ in 0..5 {
778                    let frame = receiver
779                        .receive(Duration::from_secs(5))
780                        .unwrap()
781                        .unwrap();
782                    let vlsn =
783                        u64::from_le_bytes(frame[0..8].try_into().unwrap());
784                    vlsns.push(vlsn);
785                    // Send ack.
786                    let _ = receiver.send(&vlsn.to_le_bytes());
787                }
788                vlsns
789            })
790        };
791
792        let runner =
793            PeerFeederRunner::new(Arc::clone(&sender), Arc::clone(&source), 10);
794        let sender_clone = Arc::clone(&sender);
795        let run_handle = std::thread::spawn(move || {
796            let _ = runner.run();
797        });
798
799        // Wait for receiver to collect all 5 frames.
800        let vlsns = recv_handle.join().unwrap();
801        assert_eq!(vlsns, vec![10, 11, 12, 13, 14]);
802
803        sender_clone.close().unwrap();
804        let _ = run_handle.join();
805    }
806
807    // -----------------------------------------------------------------------
808    // negotiate_syncup tests
809    // -----------------------------------------------------------------------
810
811    #[test]
812    fn test_negotiate_syncup_can_serve() {
813        assert_eq!(
814            negotiate_syncup(Some((5, 20)), 10),
815            SyncupResult::CanServe { start_vlsn: 10 }
816        );
817        // Exact boundary.
818        assert_eq!(
819            negotiate_syncup(Some((10, 10)), 10),
820            SyncupResult::CanServe { start_vlsn: 10 }
821        );
822    }
823
824    #[test]
825    fn test_negotiate_syncup_needs_restore_too_early() {
826        // Replica needs VLSN 3 but peer only has [5, 20].
827        assert_eq!(
828            negotiate_syncup(Some((5, 20)), 3),
829            SyncupResult::NeedsRestore
830        );
831    }
832
833    #[test]
834    fn test_negotiate_syncup_needs_restore_too_late() {
835        // Replica needs VLSN 25 but peer only has [5, 20].
836        assert_eq!(
837            negotiate_syncup(Some((5, 20)), 25),
838            SyncupResult::NeedsRestore
839        );
840    }
841
842    #[test]
843    fn test_negotiate_syncup_no_range() {
844        // Peer has no log range (just joined or restoring).
845        assert_eq!(negotiate_syncup(None, 10), SyncupResult::NeedsRestore);
846    }
847
848    // -----------------------------------------------------------------------
849    // GroupService CBVLSN integration
850    // -----------------------------------------------------------------------
851
852    #[test]
853    fn test_group_service_cbvlsn_tracks_minimum() {
854        use crate::group_service::{GroupService, NodeInfo};
855        use crate::node_type::NodeType;
856        use std::time::Instant;
857
858        let gs = GroupService::new("test_group".to_string());
859
860        // Add 3 electable nodes.
861        for (name, port) in [("n1", 5001u16), ("n2", 5002), ("n3", 5003)] {
862            gs.add_node(NodeInfo {
863                name: name.to_string(),
864                node_type: NodeType::Electable,
865                host: "localhost".to_string(),
866                port,
867                node_id: port as u32,
868                joined_at: Instant::now(),
869                last_seen: Instant::now(),
870                is_active: true,
871                known_vlsn: 0,
872                log_range: None,
873                read_capacity_pct: 100,
874                write_capacity_pct: 100,
875                latency_hint_ms: 1,
876            })
877            .unwrap();
878        }
879
880        // Initially all known_vlsn = 0 → CBVLSN = 0.
881        assert_eq!(gs.get_cbvlsn(), 0);
882
883        // Update n1 and n2 but not n3 → CBVLSN = min(50, 40, 0) = 0.
884        gs.update_node_vlsn("n1", 50);
885        gs.update_node_vlsn("n2", 40);
886        assert_eq!(gs.get_cbvlsn(), 0, "n3 still at 0, CBVLSN must be 0");
887
888        // Now n3 also updates → CBVLSN = min(50, 40, 30) = 30.
889        gs.update_node_vlsn("n3", 30);
890        assert_eq!(gs.get_cbvlsn(), 30);
891
892        // n3 advances → min(50, 40, 45) = 40.
893        gs.update_node_vlsn("n3", 45);
894        assert_eq!(gs.get_cbvlsn(), 40);
895    }
896
897    #[test]
898    fn test_group_service_cbvlsn_monotone_nondecreasing() {
899        use crate::group_service::{GroupService, NodeInfo};
900        use crate::node_type::NodeType;
901        use std::time::Instant;
902
903        let gs = GroupService::new("cbvlsn_monotone".to_string());
904
905        for (name, port) in [("a", 5001u16), ("b", 5002)] {
906            gs.add_node(NodeInfo {
907                name: name.to_string(),
908                node_type: NodeType::Electable,
909                host: "localhost".to_string(),
910                port,
911                node_id: port as u32,
912                joined_at: Instant::now(),
913                last_seen: Instant::now(),
914                is_active: true,
915                known_vlsn: 0,
916                log_range: None,
917                read_capacity_pct: 100,
918                write_capacity_pct: 100,
919                latency_hint_ms: 1,
920            })
921            .unwrap();
922        }
923
924        // CBVLSN must never decrease.
925        let mut prev = 0u64;
926        for (na, va, nb, vb) in [
927            ("a", 10u64, "b", 5u64),
928            ("a", 20, "b", 15),
929            ("a", 25, "b", 22),
930            ("a", 30, "b", 28),
931        ] {
932            gs.update_node_vlsn(na, va);
933            gs.update_node_vlsn(nb, vb);
934            let cbvlsn = gs.get_cbvlsn();
935            assert!(
936                cbvlsn >= prev,
937                "CBVLSN must not decrease: was {prev}, now {cbvlsn}"
938            );
939            prev = cbvlsn;
940        }
941    }
942
943    #[test]
944    fn test_group_service_find_peers_with_vlsn() {
945        use crate::group_service::{GroupService, NodeInfo};
946        use crate::node_type::NodeType;
947        use std::time::Instant;
948
949        let gs = GroupService::new("peer_select".to_string());
950
951        // Node a: holds [1, 100]
952        // Node b: holds [50, 200]
953        // Node c: no range
954        for (name, port, range) in [
955            ("a", 5001u16, Some((1u64, 100u64))),
956            ("b", 5002, Some((50, 200))),
957            ("c", 5003, None),
958        ] {
959            let mut info = NodeInfo {
960                name: name.to_string(),
961                node_type: NodeType::Electable,
962                host: "localhost".to_string(),
963                port,
964                node_id: port as u32,
965                joined_at: Instant::now(),
966                last_seen: Instant::now(),
967                is_active: true,
968                known_vlsn: 0,
969                log_range: range,
970                read_capacity_pct: 100,
971                write_capacity_pct: 100,
972                latency_hint_ms: 1,
973            };
974            // Set last_seen differently so we can check sort order.
975            info.last_seen = Instant::now()
976                - std::time::Duration::from_millis(port as u64 * 10);
977            gs.add_node(info).unwrap();
978        }
979
980        // VLSN 75: only a and b hold it.
981        let peers = gs.find_peers_with_vlsn(75);
982        assert!(peers.contains(&"a".to_string()));
983        assert!(peers.contains(&"b".to_string()));
984        assert!(!peers.contains(&"c".to_string()));
985
986        // VLSN 150: only b holds it.
987        let peers = gs.find_peers_with_vlsn(150);
988        assert_eq!(peers, vec!["b".to_string()]);
989
990        // VLSN 201: nobody holds it.
991        assert!(gs.find_peers_with_vlsn(201).is_empty());
992    }
993
994    #[test]
995    fn test_group_service_update_log_range() {
996        use crate::group_service::{GroupService, NodeInfo};
997        use crate::node_type::NodeType;
998        use std::time::Instant;
999
1000        let gs = GroupService::new("log_range_test".to_string());
1001        gs.add_node(NodeInfo {
1002            name: "r1".to_string(),
1003            node_type: NodeType::Electable,
1004            host: "localhost".to_string(),
1005            port: 5001,
1006            node_id: 1,
1007            joined_at: Instant::now(),
1008            last_seen: Instant::now(),
1009            is_active: true,
1010            known_vlsn: 0,
1011            log_range: None,
1012            read_capacity_pct: 100,
1013            write_capacity_pct: 100,
1014            latency_hint_ms: 1,
1015        })
1016        .unwrap();
1017
1018        // Initially no range.
1019        assert!(gs.get_node("r1").unwrap().log_range.is_none());
1020
1021        // Update range.
1022        gs.update_node_log_range("r1", 100, 500);
1023        assert_eq!(gs.get_node("r1").unwrap().log_range, Some((100, 500)));
1024
1025        // Extend range.
1026        gs.update_node_log_range("r1", 100, 800);
1027        assert_eq!(gs.get_node("r1").unwrap().log_range, Some((100, 800)));
1028    }
1029
1030    // -----------------------------------------------------------------------
1031    // PeerFeederService::handle paths
1032    // -----------------------------------------------------------------------
1033
1034    #[test]
1035    fn test_peer_feeder_service_can_serve() {
1036        use crate::net::channel::LocalChannelPair;
1037
1038        // Source has range [10, 20]. Client requests start_vlsn=15
1039        // (in range). Service should respond with CAN_SERVE then
1040        // stream entries until close.
1041        let source = Arc::new(PeerLogScanner::new());
1042        for v in 10u64..=20 {
1043            source.push(v, 0, format!("e{}", v).into_bytes());
1044        }
1045        let svc = PeerFeederService::new(Arc::clone(&source));
1046
1047        let pair = LocalChannelPair::new();
1048        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
1049        let client_ch = pair.channel_b;
1050
1051        // Client sends start_vlsn=15.
1052        client_ch.send(&15u64.to_le_bytes()).unwrap();
1053
1054        // Service runs in another thread so we can observe the
1055        // streaming side.
1056        let svc_handle = std::thread::spawn(move || svc.handle(server_ch));
1057
1058        // Read the 1-byte response.
1059        let resp = client_ch.receive(Duration::from_secs(2)).unwrap().unwrap();
1060        assert_eq!(
1061            resp,
1062            vec![PEER_FEEDER_CAN_SERVE],
1063            "service should reply CAN_SERVE for in-range start_vlsn"
1064        );
1065
1066        // Drain a few frames then close to terminate the runner.
1067        let mut frames = 0;
1068        while let Ok(Some(_)) = client_ch.receive(Duration::from_millis(80)) {
1069            frames += 1;
1070            if frames >= 3 {
1071                break;
1072            }
1073        }
1074        // Close client side so runner returns.
1075        client_ch.close().unwrap();
1076        let _ = svc_handle.join().unwrap();
1077        assert!(frames >= 1, "service must have streamed at least one frame");
1078    }
1079
1080    #[test]
1081    fn test_peer_feeder_service_needs_restore() {
1082        use crate::net::channel::LocalChannelPair;
1083
1084        // Source has range [10, 20]. Client requests start_vlsn=5
1085        // (too early). Service replies NEEDS_RESTORE and errors.
1086        let source = Arc::new(PeerLogScanner::new());
1087        for v in 10u64..=20 {
1088            source.push(v, 0, vec![]);
1089        }
1090        let svc = PeerFeederService::new(Arc::clone(&source));
1091
1092        let pair = LocalChannelPair::new();
1093        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
1094        let client_ch = pair.channel_b;
1095
1096        client_ch.send(&5u64.to_le_bytes()).unwrap();
1097
1098        let r = svc.handle(server_ch);
1099        assert!(r.is_err(), "service must return Err on NEEDS_RESTORE");
1100        let resp = client_ch.receive(Duration::from_secs(2)).unwrap().unwrap();
1101        assert_eq!(
1102            resp,
1103            vec![PEER_FEEDER_NEEDS_RESTORE],
1104            "service should reply NEEDS_RESTORE for out-of-range start_vlsn"
1105        );
1106    }
1107
1108    #[test]
1109    fn test_peer_feeder_service_short_handshake_errors() {
1110        use crate::net::channel::LocalChannelPair;
1111
1112        let source = Arc::new(PeerLogScanner::new());
1113        let svc = PeerFeederService::new(Arc::clone(&source));
1114
1115        let pair = LocalChannelPair::new();
1116        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
1117        let client_ch = pair.channel_b;
1118
1119        // Send only 4 bytes — too short to be a valid u64
1120        // start_vlsn. Service must Err with "short handshake".
1121        client_ch.send(&[0u8; 4]).unwrap();
1122
1123        let r = svc.handle(server_ch);
1124        assert!(r.is_err(), "short handshake must error");
1125        let msg = format!("{}", r.err().unwrap());
1126        assert!(
1127            msg.contains("short handshake"),
1128            "expected 'short handshake' in error, got: {msg}"
1129        );
1130    }
1131
1132    #[test]
1133    fn test_peer_feeder_service_no_handshake_errors() {
1134        use crate::net::channel::LocalChannelPair;
1135
1136        let source = Arc::new(PeerLogScanner::new());
1137        let svc = PeerFeederService::new(Arc::clone(&source));
1138
1139        let pair = LocalChannelPair::new();
1140        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
1141        // Drop client side so receive returns None (no message).
1142        drop(pair.channel_b);
1143
1144        let r = svc.handle(server_ch);
1145        assert!(r.is_err(), "no-handshake must error");
1146    }
1147
1148    #[test]
1149    fn test_peer_feeder_service_name() {
1150        let source = Arc::new(PeerLogScanner::new());
1151        let svc = PeerFeederService::new(source);
1152        assert_eq!(
1153            svc.service_name(),
1154            PEER_FEEDER_SERVICE_NAME,
1155            "service_name must match the protocol const"
1156        );
1157    }
1158
1159    // -----------------------------------------------------------------------
1160    // PeerFeederSource and adapter constructors
1161    // -----------------------------------------------------------------------
1162
1163    #[test]
1164    fn test_peer_feeder_source_default_and_clone_scanner() {
1165        let src1 = PeerFeederSource::new();
1166        // default() and new() produce the same shape.
1167        let src2 = PeerFeederSource::default();
1168        let s1 = src1.clone_scanner();
1169        let s2 = src2.clone_scanner();
1170        // Distinct underlying scanners (each PeerFeederSource owns
1171        // its own Arc).
1172        s1.push(1, 0, b"a".to_vec());
1173        assert_eq!(s1.len(), 1);
1174        assert_eq!(s2.len(), 0);
1175    }
1176
1177    #[test]
1178    fn test_peer_log_scanner_default_is_empty() {
1179        let s = PeerLogScanner::default();
1180        assert!(s.is_empty());
1181        assert_eq!(s.len(), 0);
1182        assert!(s.log_range().is_none());
1183    }
1184
1185    #[test]
1186    fn test_peer_feeder_runner_known_replica_vlsn_initial_zero() {
1187        use crate::net::channel::LocalChannelPair;
1188
1189        let pair = LocalChannelPair::new();
1190        let channel: Arc<dyn Channel> = Arc::new(pair.channel_a);
1191        let source = Arc::new(PeerLogScanner::new());
1192        let runner = PeerFeederRunner::new(channel, source, 1);
1193        assert_eq!(runner.known_replica_vlsn(), 0);
1194    }
1195
1196    // -----------------------------------------------------------------------
1197    // PeerScannerAdapter: stale-entry skipping
1198    // -----------------------------------------------------------------------
1199
1200    #[test]
1201    fn test_peer_scanner_adapter_skips_stale_via_pop_front() {
1202        // After a known_replica_vlsn advance, push some entries
1203        // that are below the new floor — the adapter should
1204        // discard them via pop_front().
1205        let source = Arc::new(PeerLogScanner::new());
1206        for v in 1u64..=5 {
1207            source.push(v, 0, vec![]);
1208        }
1209        let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 3);
1210        // First call returns vlsn=3, skipping 1 and 2.
1211        let r = adapter.next_entry(3);
1212        assert!(r.is_some());
1213        let (vlsn, _, _) = r.unwrap();
1214        assert_eq!(vlsn, 3);
1215        // 4 next.
1216        let (vlsn, _, _) = adapter.next_entry(4).unwrap();
1217        assert_eq!(vlsn, 4);
1218    }
1219}