Skip to main content

noxu_rep/stream/
peer_feeder.rs

1//! Peer-to-peer log distribution — JE's single feeder mechanism.
2//!
3//! `PeerFeederService` is this node's service-side feeder accept loop —
4//! Noxu's [`com.sleepycat.je.rep.impl.node.FeederManager`]: it is registered
5//! once per node on the TCP dispatcher, and on every inbound connection it
6//! runs ONE [`FeederRunner`] (JE [`com.sleepycat.je.rep.impl.node.Feeder`])
7//! driving a single [`LogScanner`] (JE
8//! [`com.sleepycat.je.rep.stream.FeederSource`]).
9//!
10//! ## One mechanism, master OR replica (JE fidelity)
11//!
12//! ```text
13//!   master ──PEER_FEEDER──► R1 ──PEER_FEEDER──► R2
14//!     │  FeederRunner+EnvironmentLogScanner   │  FeederRunner+EnvironmentLogScanner
15//!     └── reads master's WAL ─────────────────┴── reads R1's OWN WAL
16//! ```
17//!
18//! JE `FeederSource.java` documents the source as "a real Master OR a
19//! Replica in a Replica chain that is replaying log records it received
20//! from some other source".  JE `Feeder.initMasterFeederSource(startVLSN)`
21//! builds `new MasterFeederSource(repImpl, repNode.getVLSNIndex(), …)`
22//! regardless of node role, and its output loop pulls
23//! `feederSource.getWireRecord(feederVLSN, heartbeatMs)`
24//! (`Feeder.java:1282`).  Noxu mirrors this exactly:
25//!
26//! | JE                                   | Noxu                                  |
27//! |--------------------------------------|---------------------------------------|
28//! | `FeederManager` (per-node accept)    | [`PeerFeederService`] (per-node accept) |
29//! | `Feeder` thread + output loop        | [`FeederRunner`] (`run`)              |
30//! | `FeederSource` / `MasterFeederSource`| [`LogScanner`] / [`EnvironmentLogScanner`] |
31//! | `FeederReader` (VLSNIndex+WAL cursor)| [`EnvironmentLogScanner`]             |
32//! | `getWireRecord(vlsn, waitTime)`      | [`LogScanner::next_entry`]            |
33//!
34//! A replica cascading to a downstream replica is the IDENTICAL mechanism as
35//! the master feeding a replica — it just reads from the replica's own WAL
36//! (which carries the entries it received + persisted via
37//! [`crate::stream::replica_stream::EnvironmentLogWriter::log_with_vlsn`]).
38//! Both the master ([`ReplicatedEnvironment::become_master`]) and a
39//! cascading replica ([`ReplicatedEnvironment::become_replica`] with
40//! `cascade_feeding`) register [`PeerFeederService::with_wal_source`], so
41//! both serve downstream via `FeederRunner + EnvironmentLogScanner`.
42//!
43//! [`ReplicatedEnvironment::become_master`]: crate::ReplicatedEnvironment::become_master
44//! [`ReplicatedEnvironment::become_replica`]: crate::ReplicatedEnvironment::become_replica
45//!
46//! ## In-memory queue — non-JE convenience, NOT a production feed
47//!
48//! [`PeerLogScanner`] is a `LogScanner` backed by an in-memory
49//! `VecDeque<(vlsn, entry_type, payload)>`.  It exists ONLY as a fallback
50//! for nodes that have no live `EnvironmentImpl` wired (no WAL to scan) —
51//! e.g. the `replicate_entry` test convenience.  It is NEVER on a
52//! production durability path: every node opened through `with_environment`
53//! registers a WAL source and serves from the WAL.  It still feeds through
54//! the SAME [`FeederRunner`] loop ([`PeerScannerAdapter`] is just another
55//! `LogScanner`), so there is no second feeder mechanism — only a second,
56//! non-JE, non-durable *source* for the env-less case.
57//!
58//! ## CBVLSN
59//!
60//! The Cleaner Barrier VLSN is the global minimum `known_vlsn` across all
61//! active electable replicas.  The log cleaner uses this to decide which
62//! log files it is safe to reclaim.  It is maintained by `GroupService`
63//! and updated on every heartbeat / ack.
64//!
//! Corresponds to `FeederReplicaSyncup`, `LocalCBVLSNUpdater`, and
//! `RepGroupImpl.getCBVLSN()` in the JE implementation.
67
68use std::collections::VecDeque;
69use std::sync::Arc;
70
71use noxu_sync::Mutex;
72
73use crate::error::{RepError, Result};
74use crate::net::channel::Channel;
75use crate::net::service_dispatcher::ServiceHandler;
76use crate::stream::feeder::{EnvironmentLogScanner, FeederRunner, LogScanner};
77
/// Service name registered with `TcpServiceDispatcher` for peer log feeds.
///
/// Returned by [`PeerFeederService`]'s `service_name()` and used by the
/// client side when requesting the service from a peer.
pub const PEER_FEEDER_SERVICE_NAME: &str = "PEER_FEEDER";
80
81// ---------------------------------------------------------------------------
82// PeerLogScanner
83// ---------------------------------------------------------------------------
84
/// Default maximum number of entries retained in a `PeerLogScanner`
/// in-memory queue.  This is the entry bound applied by
/// `PeerLogScanner::new()`.
///
/// Without a bound, every replicated entry stays in RAM until it is
/// consumed by a downstream peer. A long-running replica with no
/// downstream consumers therefore accumulates one VecDeque entry per
/// replicated record forever (audit finding F10).
///
/// 16 384 entries is enough headroom for the slowest expected
/// downstream peer to drain while keeping resident memory bounded.
/// The entry count alone is a loose limit (assuming sub-MiB entries it
/// caps the queue at ~16 GiB worst case); the byte-size cap
/// ([`DEFAULT_PEER_SCANNER_MAX_BYTES`]) is the harder bound.
pub const DEFAULT_PEER_SCANNER_MAX_ENTRIES: usize = 16_384;
98
/// Default maximum total payload size, in bytes, retained in a
/// `PeerLogScanner` queue.  Once the cumulative payload bytes exceed
/// this threshold, the oldest entries are evicted on each `push`.
/// This is the byte bound applied by `PeerLogScanner::new()`.
///
/// 64 MiB matches the channel's `MAX_FRAME_PAYLOAD` and is large
/// enough to absorb a large in-flight transaction without being
/// large enough to OOM a small replica box.
pub const DEFAULT_PEER_SCANNER_MAX_BYTES: usize = 64 * 1024 * 1024;
107
/// A [`LogScanner`] backed by an in-memory queue of `(vlsn, type, payload)`
/// entries.
///
/// Entries are pushed by the `ReplicaReceiver` as they arrive from the
/// master (or from another peer).  A [`FeederRunner`] driving a
/// [`PeerScannerAdapter`] consumes entries from this queue and streams them
/// to a downstream replica (the in-memory, env-less convenience source).
///
/// ## Bounded memory (F10)
///
/// The queue has two configurable bounds:
///
/// - `max_entries`: maximum entry count (default
///   [`DEFAULT_PEER_SCANNER_MAX_ENTRIES`]).
/// - `max_bytes`: maximum cumulative payload size in bytes
///   (default [`DEFAULT_PEER_SCANNER_MAX_BYTES`]).
///
/// On `push`, if either bound is exceeded the oldest entries are
/// evicted from the front of the queue until both bounds are
/// satisfied.  The evicted entries are no longer available for peer
/// streaming through this scanner; downstream peers that fall behind
/// the eviction window must catch up via the on-disk
/// `EnvironmentLogScanner` or via network restore.  This matches
/// HA semantics where peer-to-peer log distribution is
/// best-effort and the on-disk log is the durable source.
///
/// Closes finding F10 of the 2026 review.
///
/// Thread safety: the queue is protected by a `Mutex` so that the receiver
/// thread (writer) and the feeder thread (reader) can operate concurrently.
/// The VLSN bounds and the byte counter each live behind their own `Mutex`,
/// so a reader of `log_range()` sees the two bounds as separate snapshots,
/// not as one atomic pair.
pub struct PeerLogScanner {
    /// FIFO of `(vlsn, entry_type, payload)` tuples.  `push` appends at the
    /// back; consumption and F10 eviction remove from the front.
    queue: Mutex<VecDeque<(u64, u8, Vec<u8>)>>,
    /// Lower end of the VLSN range reported by `log_range()`: the VLSN at
    /// the front of `queue`, or `0` when the queue is empty.  Refreshed at
    /// the end of `push` and of this type's own `LogScanner::next_entry`.
    first_vlsn: Mutex<u64>,
    /// Upper end of the VLSN range reported by `log_range()`: the highest
    /// VLSN ever passed to `push`.  Only ever raised, never lowered by
    /// eviction or consumption.
    last_vlsn: Mutex<u64>,
    /// Maximum entry count before oldest-evicting begins.
    max_entries: usize,
    /// Maximum cumulative payload bytes before oldest-evicting begins.
    max_bytes: usize,
    /// Current cumulative payload bytes.  Increased by `push`; decreased
    /// when `push` evicts and when this type's `next_entry` pops entries.
    current_bytes: Mutex<usize>,
    /// Cumulative count of entries evicted by the F10 bound (for
    /// observability and tests).
    evicted_count: std::sync::atomic::AtomicU64,
}
155
156impl PeerLogScanner {
157    /// Create an empty scanner with the default F10 bounds.
158    pub fn new() -> Self {
159        Self::with_capacity(
160            DEFAULT_PEER_SCANNER_MAX_ENTRIES,
161            DEFAULT_PEER_SCANNER_MAX_BYTES,
162        )
163    }
164
165    /// Create an empty scanner with explicit bounds.
166    ///
167    /// `max_entries` and `max_bytes` are both honoured; whichever is
168    /// breached first triggers oldest-evicting on subsequent `push`
169    /// calls.  Passing `usize::MAX` disables the corresponding bound
170    /// (not recommended in production).
171    pub fn with_capacity(max_entries: usize, max_bytes: usize) -> Self {
172        Self {
173            queue: Mutex::new(VecDeque::new()),
174            first_vlsn: Mutex::new(0),
175            last_vlsn: Mutex::new(0),
176            max_entries: max_entries.max(1),
177            max_bytes: max_bytes.max(1),
178            current_bytes: Mutex::new(0),
179            evicted_count: std::sync::atomic::AtomicU64::new(0),
180        }
181    }
182
183    /// Push a log entry into the scanner's queue.
184    ///
185    /// Called by the `ReplicaReceiver` each time an entry is applied.
186    /// Entries are expected to be pushed in VLSN order, but this method is
187    /// not enforcing: every entry is appended to the queue unconditionally
188    /// and the cached `(first_vlsn, last_vlsn)` range is widened to cover
189    /// the new VLSN. Out-of-order or duplicate entries are filtered later
190    /// by [`LogScanner::next_entry`](crate::stream::feeder::LogScanner),
191    /// which skips entries with `vlsn < from_vlsn`.
192    ///
193    /// **F10 bound**: after the new entry is appended, if the queue
194    /// exceeds either `max_entries` or `max_bytes`, the oldest entries
195    /// are evicted from the front until both bounds are satisfied. The
196    /// retained `first_vlsn` is updated to the new front-of-queue VLSN
197    /// so downstream peers that ask for an evicted VLSN range observe
198    /// `log_range().first > from_vlsn` and know they must catch up via
199    /// the durable log.
200    pub fn push(&self, vlsn: u64, entry_type: u8, payload: Vec<u8>) {
201        let payload_len = payload.len();
202        {
203            let mut last = self.last_vlsn.lock();
204            if vlsn > *last {
205                *last = vlsn;
206            }
207        }
208        let mut q = self.queue.lock();
209        let mut current_bytes = self.current_bytes.lock();
210        // Append unconditionally.
211        q.push_back((vlsn, entry_type, payload));
212        *current_bytes += payload_len;
213
214        // F10 eviction: drop oldest until both bounds are honoured.
215        let mut evicted = 0u64;
216        while q.len() > self.max_entries || *current_bytes > self.max_bytes {
217            if let Some((_evicted_vlsn, _ty, evicted_payload)) = q.pop_front() {
218                *current_bytes =
219                    current_bytes.saturating_sub(evicted_payload.len());
220                evicted += 1;
221            } else {
222                break;
223            }
224        }
225        if evicted > 0 {
226            self.evicted_count
227                .fetch_add(evicted, std::sync::atomic::Ordering::Relaxed);
228        }
229        // Refresh first_vlsn from the (possibly mutated) queue front.
230        let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
231        drop(current_bytes);
232        drop(q);
233        *self.first_vlsn.lock() = new_first;
234    }
235
236    /// Cumulative number of entries dropped by the F10 bound since
237    /// scanner construction.  Useful for monitoring whether downstream
238    /// peers are keeping up.
239    pub fn evicted_count(&self) -> u64 {
240        self.evicted_count.load(std::sync::atomic::Ordering::Relaxed)
241    }
242
243    /// Current cumulative payload size in bytes (live snapshot).
244    pub fn current_bytes(&self) -> usize {
245        *self.current_bytes.lock()
246    }
247
248    /// Return the VLSN range currently held in this scanner.
249    ///
250    /// Returns `None` if the scanner is empty (no entries pushed yet).
251    pub fn log_range(&self) -> Option<(u64, u64)> {
252        let first = *self.first_vlsn.lock();
253        let last = *self.last_vlsn.lock();
254        if first == 0 { None } else { Some((first, last)) }
255    }
256
257    /// Return the number of entries currently queued.
258    pub fn len(&self) -> usize {
259        self.queue.lock().len()
260    }
261
262    /// Returns true if no entries are queued.
263    pub fn is_empty(&self) -> bool {
264        self.queue.lock().is_empty()
265    }
266}
267
268impl Default for PeerLogScanner {
269    fn default() -> Self {
270        Self::new()
271    }
272}
273
274impl LogScanner for PeerLogScanner {
275    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
276        let mut q = self.queue.lock();
277        // Skip entries with VLSN < from_vlsn (they were already seen by the
278        // downstream replica). Track byte-budget reduction so the F10
279        // bound stays accurate.
280        let mut current_bytes = self.current_bytes.lock();
281        while let Some(&(vlsn, _, _)) = q.front() {
282            if vlsn >= from_vlsn {
283                let entry = q.pop_front();
284                if let Some((_, _, ref payload)) = entry {
285                    *current_bytes =
286                        current_bytes.saturating_sub(payload.len());
287                }
288                let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
289                drop(current_bytes);
290                drop(q);
291                *self.first_vlsn.lock() = new_first;
292                return entry;
293            }
294            if let Some((_, _, evicted_payload)) = q.pop_front() {
295                *current_bytes =
296                    current_bytes.saturating_sub(evicted_payload.len());
297            }
298        }
299        let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
300        drop(current_bytes);
301        drop(q);
302        *self.first_vlsn.lock() = new_first;
303        None
304    }
305}
306
307// ---------------------------------------------------------------------------
308// PeerFeederSource — Arc-wrapped PeerLogScanner that implements LogScanner
309// ---------------------------------------------------------------------------
310
/// A shared, `Arc`-wrapped `PeerLogScanner` that can be passed between
/// threads.
///
/// The `ReplicaReceiver` holds an `Arc<PeerFeederSource>` and calls
/// `push()` as entries arrive.  A [`FeederRunner`] driving a
/// [`PeerScannerAdapter`] holds another clone of the inner scanner and
/// calls `next_entry()` to stream those entries downstream.
pub struct PeerFeederSource(pub Arc<PeerLogScanner>);
320
321impl PeerFeederSource {
322    /// Create a new `PeerFeederSource` backed by a fresh `PeerLogScanner`.
323    pub fn new() -> Self {
324        Self(Arc::new(PeerLogScanner::new()))
325    }
326
327    /// Return a clone of the inner `Arc<PeerLogScanner>` for the receiver
328    /// thread to use when pushing entries.
329    pub fn clone_scanner(&self) -> Arc<PeerLogScanner> {
330        Arc::clone(&self.0)
331    }
332}
333
334impl Default for PeerFeederSource {
335    fn default() -> Self {
336        Self::new()
337    }
338}
339
/// Adapter that lets a shared `Arc<PeerLogScanner>` be driven as a
/// [`LogScanner`].
///
/// `LogScanner::next_entry` takes `&mut self`, which a scanner shared
/// behind an `Arc` cannot provide.  This thin wrapper owns one `Arc`
/// handle to the shared scanner plus its own cursor, and implements
/// `LogScanner` on itself.
pub struct PeerScannerAdapter {
    /// Shared in-memory scanner whose queue this adapter pops from.
    source: Arc<PeerLogScanner>,
    /// Lowest VLSN this adapter will still return: the construction-time
    /// `start_vlsn`, then one past the VLSN of the last entry returned.
    cursor_vlsn: u64,
}
349
350impl PeerScannerAdapter {
351    /// Create a new adapter starting from `start_vlsn`.
352    pub fn new(source: Arc<PeerLogScanner>, start_vlsn: u64) -> Self {
353        Self { source, cursor_vlsn: start_vlsn }
354    }
355}
356
357impl LogScanner for PeerScannerAdapter {
358    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
359        let effective_from = self.cursor_vlsn.max(from_vlsn);
360        let entry = {
361            let mut q = self.source.queue.lock();
362            let mut result = None;
363            while let Some(&(vlsn, _, _)) = q.front() {
364                if vlsn >= effective_from {
365                    result = q.pop_front();
366                    break;
367                }
368                q.pop_front(); // discard stale entries
369            }
370            result
371        };
372        if let Some((vlsn, _, _)) = &entry {
373            self.cursor_vlsn = vlsn + 1;
374        }
375        entry
376    }
377}
378
379// ---------------------------------------------------------------------------
380// Syncup helpers
381// ---------------------------------------------------------------------------
382
/// Outcome of a peer syncup negotiation.
///
/// In JE HA, `FeederReplicaSyncup` finds the highest VLSN that is committed
/// on BOTH the feeder and the replica (the "matchpoint"); the feeder then
/// streams entries from matchpoint+1 onwards.
///
/// Here this is modelled as a plain VLSN range comparison: a peer holding
/// `[peer_first, peer_last]` can serve a replica that needs `replica_needs`
/// onwards iff `peer_first <= replica_needs <= peer_last`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncupResult {
    /// The peer holds the needed range; stream from `start_vlsn`.
    CanServe { start_vlsn: u64 },
    /// The peer does not hold the needed VLSN; fall back to master or
    /// network restore.
    NeedsRestore,
}

/// Decide whether a peer holding `peer_range` can serve a replica whose
/// next needed log entry is `replica_needs`.
///
/// Returns `CanServe { start_vlsn: replica_needs }` when `peer_range` is
/// `Some((first, last))` and `first <= replica_needs <= last`; returns
/// `NeedsRestore` otherwise, including when `peer_range` is `None`.
///
/// This is the range-availability check only. The diverged-tail matchpoint
/// search + rollback decision (REP-1 STEP 5) lives in
/// [`crate::stream::syncup`] (`find_matchpoint` + `verify_rollback`, ported
/// from JE `ReplicaFeederSyncup`); the live driver replaces this range check
/// with that decision core once the syncup wire protocol + backward reader
/// land. See `docs/src/maintainer/design-decisions.md` (REP-1).
pub fn negotiate_syncup(
    peer_range: Option<(u64, u64)>,
    replica_needs: u64,
) -> SyncupResult {
    if let Some((first, last)) = peer_range {
        if (first..=last).contains(&replica_needs) {
            return SyncupResult::CanServe { start_vlsn: replica_needs };
        }
    }
    SyncupResult::NeedsRestore
}
423
424// ---------------------------------------------------------------------------
425// PeerFeederService — TCP ServiceHandler backed by a PeerLogScanner
426// ---------------------------------------------------------------------------
427
/// A [`ServiceHandler`] that streams peer-held log entries to a requesting
/// downstream replica over the `"PEER_FEEDER"` service.
///
/// The service is registered on the `TcpServiceDispatcher` at startup.
/// When a downstream replica connects, the protocol is:
///
/// 1. The downstream sends `[start_vlsn: u64 LE]` (8 bytes) — the first VLSN
///    it needs.
/// 2. The server checks whether it holds that VLSN: against the
///    `VlsnIndex` range when a WAL source is wired, otherwise (or when the
///    WAL cannot serve it) against the in-memory range via
///    `negotiate_syncup()`.
/// 3. If the range is available, the server responds with
///    `[CAN_SERVE: u8 = 0]` and a [`FeederRunner`] (driving an
///    [`EnvironmentLogScanner`] for the WAL path or a [`PeerScannerAdapter`]
///    for the in-memory path) streams entries until the channel closes.
/// 4. If the range is not available, the server responds with
///    `[NEEDS_RESTORE: u8 = 1]` and closes the connection.
///
/// The `PeerLogScanner` (`source`) is populated by the node's own
/// `ReplicaReceiver` as entries arrive from the master.
pub struct PeerFeederService {
    /// In-memory source used when no WAL source is wired or when the WAL
    /// cannot serve the requested VLSN.
    source: Arc<PeerLogScanner>,
    /// Optional WAL-backed feeder source for chained replication.
    ///
    /// When `Some`, the service serves the requested VLSN range from this
    /// node's OWN WAL via an [`EnvironmentLogScanner`] driven by a
    /// [`FeederRunner`] — the *same* machinery the master uses
    /// ([`crate::stream::feeder`]).  This is JE's cascading-feeder model:
    /// `FeederSource` is "a real Master OR a Replica in a Replica chain that
    /// is replaying log records it received from some other source"
    /// (`FeederSource.java`); `MasterFeederSource` reads the VLSNIndex + log
    /// on whatever node hosts it.
    ///
    /// `None` (default) preserves the in-memory `PeerLogScanner` pull path.
    wal_source: Option<WalFeederSource>,
    /// Count of downstream connections served via the WAL `FeederRunner` +
    /// `EnvironmentLogScanner` path (the JE `Feeder` + `MasterFeederSource`
    /// mechanism).  Lets the owning [`crate::ReplicatedEnvironment`] — and
    /// tests — PROVE that a cascading replica feeds downstream via the same
    /// mechanism the master uses, not the in-memory pull fallback.
    /// Held in an `Arc` so the counter can be shared with the owner (see
    /// `with_wal_source_counted`).
    wal_feeds_served: Arc<std::sync::atomic::AtomicU64>,
}
467
/// WAL-backed feeder source for a chained (replica-to-replica) feed.
///
/// Holds a live [`EnvironmentImpl`] (whose WAL carries VLSN-tagged 22-byte
/// headers written by [`crate::stream::replica_stream::EnvironmentLogWriter`])
/// and the shared [`VlsnIndex`] used to negotiate the available VLSN range.
///
/// Faithful to JE `MasterFeederSource(repImpl, vlsnIndex, startVLSN)` — the
/// feeder source is constructed from the environment + VLSN index regardless
/// of whether the node is master or replica.
///
/// Cloning copies the two `Arc` handles only; clones refer to the same
/// environment and the same VLSN index.
#[derive(Clone)]
pub struct WalFeederSource {
    /// Environment handed to `EnvironmentLogScanner::new` when a downstream
    /// connection is served from the WAL.
    env: Arc<noxu_dbi::EnvironmentImpl>,
    /// Index whose `get_range()` supplies the `[first, last]` VLSN range the
    /// service checks a requested start VLSN against.
    vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
}
482
483impl WalFeederSource {
484    /// Create a WAL-backed feeder source.
485    pub fn new(
486        env: Arc<noxu_dbi::EnvironmentImpl>,
487        vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
488    ) -> Self {
489        Self { env, vlsn_index }
490    }
491}
492
493impl PeerFeederService {
494    /// Create a new service backed by an in-memory `source` (pull path).
495    pub fn new(source: Arc<PeerLogScanner>) -> Self {
496        Self {
497            source,
498            wal_source: None,
499            wal_feeds_served: Arc::new(std::sync::atomic::AtomicU64::new(0)),
500        }
501    }
502
503    /// Create a service that ALSO serves the chained-replication WAL feed.
504    ///
505    /// When a downstream replica connects, the service prefers the WAL
506    /// source: it negotiates the VLSN range from the [`VlsnIndex`] and, if
507    /// it can serve, streams entries from this node's WAL via an
508    /// [`EnvironmentLogScanner`] + [`FeederRunner`] (the same path the
509    /// master uses).  If the WAL cannot serve the requested range it falls
510    /// back to the in-memory `source`, then to `NEEDS_RESTORE`.
511    ///
512    /// Used by [`crate::ReplicatedEnvironment::become_replica`] when
513    /// `cascade_feeding` is enabled and a live `EnvironmentImpl` is wired.
514    pub fn with_wal_source(
515        source: Arc<PeerLogScanner>,
516        wal_source: WalFeederSource,
517    ) -> Self {
518        Self {
519            source,
520            wal_source: Some(wal_source),
521            wal_feeds_served: Arc::new(std::sync::atomic::AtomicU64::new(0)),
522        }
523    }
524
525    /// Like [`Self::with_wal_source`] but shares `wal_feeds_served` with the
526    /// owning [`crate::ReplicatedEnvironment`] so it (and tests) can PROVE
527    /// the node served a downstream via the JE `Feeder`/`MasterFeederSource`
528    /// path (`FeederRunner + EnvironmentLogScanner`).
529    pub fn with_wal_source_counted(
530        source: Arc<PeerLogScanner>,
531        wal_source: WalFeederSource,
532        wal_feeds_served: Arc<std::sync::atomic::AtomicU64>,
533    ) -> Self {
534        Self { source, wal_source: Some(wal_source), wal_feeds_served }
535    }
536
537    /// Number of downstream connections this service has served via the JE
538    /// `Feeder`/`MasterFeederSource` path (`FeederRunner +
539    /// EnvironmentLogScanner`).  `0` means no cascade/WAL feed has run yet.
540    pub fn wal_feeds_served(&self) -> u64 {
541        self.wal_feeds_served.load(std::sync::atomic::Ordering::SeqCst)
542    }
543}
544
/// Wire-level response code sent by the server: the requested VLSN can be
/// served and log entries follow on the same channel.
const PEER_FEEDER_CAN_SERVE: u8 = 0;
/// Wire-level response code sent by the server: the requested VLSN cannot
/// be served; the downstream must fall back to the master or a network
/// restore.
const PEER_FEEDER_NEEDS_RESTORE: u8 = 1;
548
549impl ServiceHandler for PeerFeederService {
550    fn service_name(&self) -> &str {
551        PEER_FEEDER_SERVICE_NAME
552    }
553
554    fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
555        use std::time::Duration;
556
557        // 1. Read the 8-byte start_vlsn from the downstream replica.
558        let msg =
559            channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
560                RepError::NetworkError(
561                    "PEER_FEEDER: no start_vlsn received".into(),
562                )
563            })?;
564
565        if msg.len() < 8 {
566            return Err(RepError::NetworkError(format!(
567                "PEER_FEEDER: short handshake ({} bytes)",
568                msg.len()
569            )));
570        }
571        let start_vlsn =
572            u64::from_le_bytes(msg[..8].try_into().expect("slice of 8 bytes"));
573
574        // 2a. Chained replication (cascade): if a WAL source is wired, prefer
575        //     serving the downstream from THIS node's own WAL via the same
576        //     EnvironmentLogScanner + FeederRunner the master uses.  Faithful
577        //     to JE's cascading-feeder model (see `WalFeederSource`).
578        //
579        //     The downstream sends start_vlsn=0 to mean "from the beginning";
580        //     we serve from the first VLSN our VLSNIndex holds.  We can serve
581        //     iff the requested start falls within `[first, last]` (or the
582        //     range is non-empty and start_vlsn==0).
583        if let Some(wal) = &self.wal_source {
584            let range = wal.vlsn_index.get_range();
585            let (first, last) = (range.first(), range.last());
586            let have_data = last > 0 && first > 0;
587            let effective_start =
588                if start_vlsn == 0 { first } else { start_vlsn };
589            let can_serve = have_data
590                && effective_start >= first
591                && effective_start <= last;
592            if can_serve {
593                channel.send(&[PEER_FEEDER_CAN_SERVE])?;
594                let channel_arc: Arc<dyn Channel> = Arc::from(channel);
595                // EnvironmentLogScanner starts at the log beginning and skips
596                // entries with vlsn < effective_start; the FeederRunner then
597                // streams VLSN-ordered entries exactly as the master does.
598                //
599                // JE fidelity: this is JE `Feeder` running `MasterFeederSource`
600                // — `Feeder.initMasterFeederSource(startVLSN)` builds
601                // `new MasterFeederSource(repImpl, repNode.getVLSNIndex(), …)`
602                // and the output loop pulls
603                // `feederSource.getWireRecord(feederVLSN, heartbeatMs)`
604                // (`Feeder.java:1282`).  Here `FeederRunner` IS that loop and
605                // `EnvironmentLogScanner` IS that `MasterFeederSource`
606                // (a `FeederReader` over the VLSNIndex+WAL).  A cascading
607                // replica reaches this branch with `wal.env` = its OWN env, so
608                // it feeds downstream by the IDENTICAL mechanism the master
609                // uses — reading its own WAL.
610                let mut scanner =
611                    match EnvironmentLogScanner::new(&wal.env, None) {
612                        Some(s) => s,
613                        None => {
614                            // WAL scanner unavailable (read-only env / no log):
615                            // the CAN_SERVE byte was already sent, so close the
616                            // channel; the downstream will retry / fall back.
617                            return Err(RepError::NetworkError(
618                                "PEER_FEEDER: WAL scanner unavailable".into(),
619                            ));
620                        }
621                    };
622                // Record (for the env + tests) that THIS connection was served
623                // by the JE `Feeder`/`MasterFeederSource` path — the proof
624                // that the cascade uses the same mechanism as the master.
625                self.wal_feeds_served
626                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
627                let runner = FeederRunner::new(channel_arc, effective_start);
628                let _ = runner.run(&mut scanner);
629                return Ok(());
630            }
631            // WAL cannot serve the requested range: fall through to the
632            // in-memory pull path, then to NEEDS_RESTORE.  A downstream that
633            // asks for an evicted/old range that the mid-tier no longer holds
634            // must catch up via network restore or fall back to the master.
635        }
636
637        // 2. Negotiate: do we hold the requested VLSN range in memory?
638        let range = self.source.log_range();
639        match negotiate_syncup(range, start_vlsn) {
640            SyncupResult::CanServe { start_vlsn: sv } => {
641                // Tell the downstream it can proceed.
642                channel.send(&[PEER_FEEDER_CAN_SERVE])?;
643
644                // 3. Stream entries in this thread (the dispatcher already
645                //    called us from a per-connection thread).
646                //
647                //    SAME feeder loop as the WAL/cascade path: ONE
648                //    `FeederRunner` (JE `Feeder`) driving ONE `LogScanner`
649                //    (JE `FeederSource`).  Here the source is the in-memory
650                //    `PeerScannerAdapter` (non-JE convenience for env-less
651                //    nodes), not the WAL `EnvironmentLogScanner`.  There is no
652                //    second feeder mechanism — only a second source.
653                let channel_arc: Arc<dyn Channel> = Arc::from(channel);
654                let mut source =
655                    PeerScannerAdapter::new(Arc::clone(&self.source), sv);
656                let runner = FeederRunner::new(channel_arc, sv);
657                let _ = runner.run(&mut source);
658                Ok(())
659            }
660            SyncupResult::NeedsRestore => {
661                channel.send(&[PEER_FEEDER_NEEDS_RESTORE])?;
662                Err(RepError::NetworkError(format!(
663                    "PEER_FEEDER: cannot serve vlsn={start_vlsn}, \
664                     range={range:?}"
665                )))
666            }
667        }
668    }
669}
670
671// ---------------------------------------------------------------------------
672// Client-side peer catch-up
673// ---------------------------------------------------------------------------
674
675/// Connect to a peer node's `PEER_FEEDER` service and pull log entries
676/// starting from `start_vlsn`.
677///
678/// This is the client counterpart to [`PeerFeederService`].  It is called
679/// by a replica that is behind and wants to catch up from a peer that holds
680/// the needed VLSN range (rather than routing all traffic through the master).
681///
682/// Protocol (matches `PeerFeederService::handle`):
683///   1. Open a TCP connection and request the `"PEER_FEEDER"` service via
684///      `service_dispatcher::connect_to_service()`.
685///   2. Send `[start_vlsn: u64 LE]`.
686///   3. Read the one-byte response:
687///      - `0` (`PEER_FEEDER_CAN_SERVE`) — peer has the range; proceed.
688///      - `1` (`PEER_FEEDER_NEEDS_RESTORE`) — peer cannot serve; return
689///        `Ok(false)` so the caller can fall back to the master.
690///   4. Start a `ReplicaReceiver` loop on the channel, passing each entry
691///      to `log_writer`.  Returns `Ok(true)` when the peer closes the
692///      channel (i.e. the catch-up is complete).
693///
694/// # Pipelining
695///
696/// `catch_up_from_peer` is intentionally non-async.  Call it from a
697/// dedicated thread per peer.  To pipeline catch-up from multiple peers
698/// simultaneously, spawn one thread per peer (e.g. from a `ThreadPool`).
699/// The [`MultiPeerCatchUp`] helper below manages this.
700pub fn catch_up_from_peer(
701    peer_addr: std::net::SocketAddr,
702    start_vlsn: u64,
703    log_writer: &mut dyn crate::stream::replica_stream::LogWriter,
704) -> Result<bool> {
705    use crate::net::service_dispatcher::connect_to_service;
706    use crate::stream::replica_stream::ReplicaReceiver;
707    use std::sync::Arc;
708    use std::time::Duration;
709
710    // Connect and request the PEER_FEEDER service.
711    let channel = connect_to_service(peer_addr, PEER_FEEDER_SERVICE_NAME)?;
712
713    // Send start_vlsn.
714    channel.send(&start_vlsn.to_le_bytes())?;
715
716    // Read the one-byte response.
717    let resp = channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
718        RepError::NetworkError("no response from peer feeder".into())
719    })?;
720    if resp.is_empty() {
721        return Err(RepError::NetworkError(
722            "empty response from peer feeder".into(),
723        ));
724    }
725    match resp[0] {
726        PEER_FEEDER_CAN_SERVE => {}
727        PEER_FEEDER_NEEDS_RESTORE => return Ok(false),
728        other => {
729            return Err(RepError::ProtocolError(format!(
730                "peer feeder unknown response byte: {other:#x}"
731            )));
732        }
733    }
734
735    // Run the replica receive loop.
736    let channel_arc: Arc<dyn Channel> = Arc::from(channel);
737    let receiver = ReplicaReceiver::new(channel_arc);
738    receiver.run(log_writer)?;
739
740    Ok(true)
741}
742
743/// Like [`catch_up_from_peer`] but polls `shutdown` so the receive loop can
744/// exit promptly when the environment is closing (used by
745/// [`crate::ReplicatedEnvironment::become_replica`]'s I/O thread so
746/// `close()` can join it even while the upstream feeder stays connected).
747pub fn catch_up_from_peer_until(
748    peer_addr: std::net::SocketAddr,
749    start_vlsn: u64,
750    log_writer: &mut dyn crate::stream::replica_stream::LogWriter,
751    shutdown: &std::sync::atomic::AtomicBool,
752) -> Result<bool> {
753    use crate::net::service_dispatcher::connect_to_service;
754    use crate::stream::replica_stream::ReplicaReceiver;
755    use std::sync::Arc;
756    use std::time::Duration;
757
758    let channel = connect_to_service(peer_addr, PEER_FEEDER_SERVICE_NAME)?;
759    channel.send(&start_vlsn.to_le_bytes())?;
760    let resp = channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
761        RepError::NetworkError("no response from peer feeder".into())
762    })?;
763    if resp.is_empty() {
764        return Err(RepError::NetworkError(
765            "empty response from peer feeder".into(),
766        ));
767    }
768    match resp[0] {
769        PEER_FEEDER_CAN_SERVE => {}
770        PEER_FEEDER_NEEDS_RESTORE => return Ok(false),
771        other => {
772            return Err(RepError::ProtocolError(format!(
773                "peer feeder unknown response byte: {other:#x}"
774            )));
775        }
776    }
777
778    let channel_arc: Arc<dyn Channel> = Arc::from(channel);
779    let receiver = ReplicaReceiver::new(channel_arc);
780    receiver.run_until(log_writer, Some(shutdown))?;
781
782    Ok(true)
783}
784
785/// Pipelined catch-up from multiple peer nodes simultaneously.
786///
787/// Spawns one thread per peer in `peers` and waits for all to finish (or
788/// for the first to succeed).  Returns the name of the peer that supplied
789/// the entries, or `None` if no peer could serve the range.
790///
791/// The `log_writer_factory` closure is called once per thread to produce a
792/// per-thread `LogWriter`.  The factory must be `Send + Sync`.
793pub struct MultiPeerCatchUp {
794    peers: Vec<(String, std::net::SocketAddr)>,
795    start_vlsn: u64,
796}
797
798impl MultiPeerCatchUp {
799    /// Create a new multi-peer catch-up request.
800    ///
801    /// `peers` is a list of `(node_name, socket_addr)` pairs to try.
802    pub fn new(
803        peers: Vec<(String, std::net::SocketAddr)>,
804        start_vlsn: u64,
805    ) -> Self {
806        Self { peers, start_vlsn }
807    }
808
809    /// Run pipelined catch-up.
810    ///
811    /// Spawns one thread per peer and waits for the first to succeed.
812    /// Each thread calls `catch_up_from_peer`; the winning thread's entries
813    /// are applied through `make_writer()`.  Other threads are joined once
814    /// the first succeeds.
815    ///
816    /// Returns the name of the first peer that successfully served the range,
817    /// or `None` if all peers declined.
818    pub fn run<F, W>(self, make_writer: F) -> Option<String>
819    where
820        F: Fn() -> W + Send + Sync + 'static,
821        W: crate::stream::replica_stream::LogWriter + Send + 'static,
822    {
823        use std::sync::atomic::{AtomicBool, Ordering};
824        let make_writer = std::sync::Arc::new(make_writer);
825        let done = std::sync::Arc::new(AtomicBool::new(false));
826        let winner: std::sync::Arc<noxu_sync::Mutex<Option<String>>> =
827            std::sync::Arc::new(noxu_sync::Mutex::new(None));
828
829        let mut handles = Vec::new();
830
831        for (name, addr) in self.peers {
832            let make_writer = std::sync::Arc::clone(&make_writer);
833            let done = std::sync::Arc::clone(&done);
834            let winner = std::sync::Arc::clone(&winner);
835            let start_vlsn = self.start_vlsn;
836            let name_clone = name.clone();
837
838            let handle = std::thread::Builder::new()
839                .name(format!("noxu-peer-catchup-{}", name))
840                .spawn(move || {
841                    if done.load(Ordering::Acquire) {
842                        return; // another peer already won
843                    }
844                    let mut writer = make_writer();
845                    match catch_up_from_peer(addr, start_vlsn, &mut writer) {
846                        Ok(true) => {
847                            if !done.swap(true, Ordering::AcqRel) {
848                                *winner.lock() = Some(name_clone);
849                            }
850                        }
851                        Ok(false) => {
852                            log::debug!(
853                                "peer '{}' cannot serve vlsn={start_vlsn}",
854                                name
855                            );
856                        }
857                        Err(e) => {
858                            log::warn!(
859                                "catch-up from peer '{}' failed: {e}",
860                                name
861                            );
862                        }
863                    }
864                })
865                .expect("failed to spawn peer catch-up thread");
866
867            handles.push(handle);
868        }
869
870        for h in handles {
871            let _ = h.join();
872        }
873
874        winner.lock().clone()
875    }
876}
877
878// ---------------------------------------------------------------------------
879// Tests
880// ---------------------------------------------------------------------------
881
882#[cfg(test)]
883mod tests {
884    use super::*;
885    use crate::net::channel::LocalChannelPair;
886    use std::time::Duration;
887
888    // -----------------------------------------------------------------------
889    // PeerLogScanner unit tests
890    // -----------------------------------------------------------------------
891
892    #[test]
893    fn test_peer_scanner_push_and_log_range() {
894        let scanner = PeerLogScanner::new();
895        assert!(scanner.is_empty());
896        assert!(scanner.log_range().is_none());
897
898        scanner.push(5, 1, b"entry5".to_vec());
899        scanner.push(6, 1, b"entry6".to_vec());
900        scanner.push(10, 1, b"entry10".to_vec());
901
902        assert_eq!(scanner.len(), 3);
903        assert_eq!(scanner.log_range(), Some((5, 10)));
904    }
905
906    #[test]
907    fn test_peer_scanner_next_entry_in_order() {
908        let mut scanner = PeerLogScanner::new();
909        for vlsn in [3u64, 4, 5, 6, 7] {
910            scanner.push(vlsn, 1, vlsn.to_le_bytes().to_vec());
911        }
912
913        // Ask from_vlsn=4 — should skip 3 and return 4, 5, 6, 7.
914        let mut results = Vec::new();
915        while let Some((vlsn, _, _)) = scanner.next_entry(4) {
916            results.push(vlsn);
917        }
918        assert_eq!(results, vec![4, 5, 6, 7]);
919    }
920
921    #[test]
922    fn test_peer_scanner_skips_stale_entries() {
923        let mut scanner = PeerLogScanner::new();
924        for v in [1u64, 2, 3, 10, 11] {
925            scanner.push(v, 1, vec![v as u8]);
926        }
927        // Ask from 10 — entries 1, 2, 3 are stale.
928        let entry = scanner.next_entry(10);
929        assert_eq!(entry.map(|(v, _, _)| v), Some(10));
930    }
931
932    #[test]
933    fn test_peer_scanner_empty_returns_none() {
934        let mut scanner = PeerLogScanner::new();
935        assert!(scanner.next_entry(1).is_none());
936    }
937
938    // -----------------------------------------------------------------------
939    // PeerScannerAdapter unit tests
940    // -----------------------------------------------------------------------
941
942    #[test]
943    fn test_peer_scanner_adapter_cursor_advances() {
944        let source = Arc::new(PeerLogScanner::new());
945        for v in [1u64, 2, 3, 4, 5] {
946            source.push(v, 1, vec![v as u8]);
947        }
948
949        let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 1);
950        let mut seen = Vec::new();
951        while let Some((v, _, _)) = adapter.next_entry(1) {
952            seen.push(v);
953        }
954        assert_eq!(seen, vec![1, 2, 3, 4, 5]);
955    }
956
957    // -----------------------------------------------------------------------
958    // In-memory source streamed via the shared FeederRunner loop
959    // -----------------------------------------------------------------------
960
961    #[test]
962    fn test_in_memory_source_streams_to_replica_via_feeder_runner() {
963        // The in-memory `PeerLogScanner` feeds through the SAME `FeederRunner`
964        // loop the WAL path uses — only the `LogScanner` source differs.
965        let source = Arc::new(PeerLogScanner::new());
966        for v in [10u64, 11, 12, 13, 14] {
967            source.push(v, 1, format!("payload-{v}").into_bytes());
968        }
969
970        let pair = LocalChannelPair::new();
971        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
972        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
973
974        // Receiver: collect all frames.
975        let recv_handle = {
976            let receiver = Arc::clone(&receiver);
977            std::thread::spawn(move || {
978                let mut vlsns = Vec::new();
979                // Expect 5 frames then a timeout.
980                for _ in 0..5 {
981                    let frame = receiver
982                        .receive(Duration::from_secs(5))
983                        .unwrap()
984                        .unwrap();
985                    let vlsn =
986                        u64::from_le_bytes(frame[0..8].try_into().unwrap());
987                    vlsns.push(vlsn);
988                    // Send ack.
989                    let _ = receiver.send(&vlsn.to_le_bytes());
990                }
991                vlsns
992            })
993        };
994
995        let sender_clone = Arc::clone(&sender);
996        let run_handle = std::thread::spawn(move || {
997            let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 10);
998            let runner = FeederRunner::new(sender, 10);
999            let _ = runner.run(&mut adapter);
1000        });
1001
1002        // Wait for receiver to collect all 5 frames.
1003        let vlsns = recv_handle.join().unwrap();
1004        assert_eq!(vlsns, vec![10, 11, 12, 13, 14]);
1005
1006        sender_clone.close().unwrap();
1007        let _ = run_handle.join();
1008    }
1009
1010    // -----------------------------------------------------------------------
1011    // negotiate_syncup tests
1012    // -----------------------------------------------------------------------
1013
1014    #[test]
1015    fn test_negotiate_syncup_can_serve() {
1016        assert_eq!(
1017            negotiate_syncup(Some((5, 20)), 10),
1018            SyncupResult::CanServe { start_vlsn: 10 }
1019        );
1020        // Exact boundary.
1021        assert_eq!(
1022            negotiate_syncup(Some((10, 10)), 10),
1023            SyncupResult::CanServe { start_vlsn: 10 }
1024        );
1025    }
1026
1027    #[test]
1028    fn test_negotiate_syncup_needs_restore_too_early() {
1029        // Replica needs VLSN 3 but peer only has [5, 20].
1030        assert_eq!(
1031            negotiate_syncup(Some((5, 20)), 3),
1032            SyncupResult::NeedsRestore
1033        );
1034    }
1035
1036    #[test]
1037    fn test_negotiate_syncup_needs_restore_too_late() {
1038        // Replica needs VLSN 25 but peer only has [5, 20].
1039        assert_eq!(
1040            negotiate_syncup(Some((5, 20)), 25),
1041            SyncupResult::NeedsRestore
1042        );
1043    }
1044
1045    #[test]
1046    fn test_negotiate_syncup_no_range() {
1047        // Peer has no log range (just joined or restoring).
1048        assert_eq!(negotiate_syncup(None, 10), SyncupResult::NeedsRestore);
1049    }
1050
1051    // -----------------------------------------------------------------------
1052    // GroupService CBVLSN integration
1053    // -----------------------------------------------------------------------
1054
1055    #[test]
1056    fn test_group_service_cbvlsn_tracks_minimum() {
1057        use crate::group_service::{GroupService, NodeInfo};
1058        use crate::node_type::NodeType;
1059        use std::time::Instant;
1060
1061        let gs = GroupService::new("test_group".to_string());
1062
1063        // Add 3 electable nodes.
1064        for (name, port) in [("n1", 5001u16), ("n2", 5002), ("n3", 5003)] {
1065            gs.add_node(NodeInfo {
1066                name: name.to_string(),
1067                node_type: NodeType::Electable,
1068                host: "localhost".to_string(),
1069                port,
1070                node_id: port as u32,
1071                joined_at: Instant::now(),
1072                last_seen: Instant::now(),
1073                is_active: true,
1074                known_vlsn: 0,
1075                log_range: None,
1076                read_capacity_pct: 100,
1077                write_capacity_pct: 100,
1078                latency_hint_ms: 1,
1079            })
1080            .unwrap();
1081        }
1082
1083        // Initially all known_vlsn = 0 → CBVLSN = 0.
1084        assert_eq!(gs.get_cbvlsn(), 0);
1085
1086        // Update n1 and n2 but not n3 → CBVLSN = min(50, 40, 0) = 0.
1087        gs.update_node_vlsn("n1", 50);
1088        gs.update_node_vlsn("n2", 40);
1089        assert_eq!(gs.get_cbvlsn(), 0, "n3 still at 0, CBVLSN must be 0");
1090
1091        // Now n3 also updates → CBVLSN = min(50, 40, 30) = 30.
1092        gs.update_node_vlsn("n3", 30);
1093        assert_eq!(gs.get_cbvlsn(), 30);
1094
1095        // n3 advances → min(50, 40, 45) = 40.
1096        gs.update_node_vlsn("n3", 45);
1097        assert_eq!(gs.get_cbvlsn(), 40);
1098    }
1099
1100    #[test]
1101    fn test_group_service_cbvlsn_monotone_nondecreasing() {
1102        use crate::group_service::{GroupService, NodeInfo};
1103        use crate::node_type::NodeType;
1104        use std::time::Instant;
1105
1106        let gs = GroupService::new("cbvlsn_monotone".to_string());
1107
1108        for (name, port) in [("a", 5001u16), ("b", 5002)] {
1109            gs.add_node(NodeInfo {
1110                name: name.to_string(),
1111                node_type: NodeType::Electable,
1112                host: "localhost".to_string(),
1113                port,
1114                node_id: port as u32,
1115                joined_at: Instant::now(),
1116                last_seen: Instant::now(),
1117                is_active: true,
1118                known_vlsn: 0,
1119                log_range: None,
1120                read_capacity_pct: 100,
1121                write_capacity_pct: 100,
1122                latency_hint_ms: 1,
1123            })
1124            .unwrap();
1125        }
1126
1127        // CBVLSN must never decrease.
1128        let mut prev = 0u64;
1129        for (na, va, nb, vb) in [
1130            ("a", 10u64, "b", 5u64),
1131            ("a", 20, "b", 15),
1132            ("a", 25, "b", 22),
1133            ("a", 30, "b", 28),
1134        ] {
1135            gs.update_node_vlsn(na, va);
1136            gs.update_node_vlsn(nb, vb);
1137            let cbvlsn = gs.get_cbvlsn();
1138            assert!(
1139                cbvlsn >= prev,
1140                "CBVLSN must not decrease: was {prev}, now {cbvlsn}"
1141            );
1142            prev = cbvlsn;
1143        }
1144    }
1145
1146    #[test]
1147    fn test_group_service_find_peers_with_vlsn() {
1148        use crate::group_service::{GroupService, NodeInfo};
1149        use crate::node_type::NodeType;
1150        use std::time::Instant;
1151
1152        let gs = GroupService::new("peer_select".to_string());
1153
1154        // Node a: holds [1, 100]
1155        // Node b: holds [50, 200]
1156        // Node c: no range
1157        for (name, port, range) in [
1158            ("a", 5001u16, Some((1u64, 100u64))),
1159            ("b", 5002, Some((50, 200))),
1160            ("c", 5003, None),
1161        ] {
1162            let mut info = NodeInfo {
1163                name: name.to_string(),
1164                node_type: NodeType::Electable,
1165                host: "localhost".to_string(),
1166                port,
1167                node_id: port as u32,
1168                joined_at: Instant::now(),
1169                last_seen: Instant::now(),
1170                is_active: true,
1171                known_vlsn: 0,
1172                log_range: range,
1173                read_capacity_pct: 100,
1174                write_capacity_pct: 100,
1175                latency_hint_ms: 1,
1176            };
1177            // Set last_seen differently so we can check sort order.
1178            info.last_seen = Instant::now()
1179                - std::time::Duration::from_millis(port as u64 * 10);
1180            gs.add_node(info).unwrap();
1181        }
1182
1183        // VLSN 75: only a and b hold it.
1184        let peers = gs.find_peers_with_vlsn(75);
1185        assert!(peers.contains(&"a".to_string()));
1186        assert!(peers.contains(&"b".to_string()));
1187        assert!(!peers.contains(&"c".to_string()));
1188
1189        // VLSN 150: only b holds it.
1190        let peers = gs.find_peers_with_vlsn(150);
1191        assert_eq!(peers, vec!["b".to_string()]);
1192
1193        // VLSN 201: nobody holds it.
1194        assert!(gs.find_peers_with_vlsn(201).is_empty());
1195    }
1196
1197    #[test]
1198    fn test_group_service_update_log_range() {
1199        use crate::group_service::{GroupService, NodeInfo};
1200        use crate::node_type::NodeType;
1201        use std::time::Instant;
1202
1203        let gs = GroupService::new("log_range_test".to_string());
1204        gs.add_node(NodeInfo {
1205            name: "r1".to_string(),
1206            node_type: NodeType::Electable,
1207            host: "localhost".to_string(),
1208            port: 5001,
1209            node_id: 1,
1210            joined_at: Instant::now(),
1211            last_seen: Instant::now(),
1212            is_active: true,
1213            known_vlsn: 0,
1214            log_range: None,
1215            read_capacity_pct: 100,
1216            write_capacity_pct: 100,
1217            latency_hint_ms: 1,
1218        })
1219        .unwrap();
1220
1221        // Initially no range.
1222        assert!(gs.get_node("r1").unwrap().log_range.is_none());
1223
1224        // Update range.
1225        gs.update_node_log_range("r1", 100, 500);
1226        assert_eq!(gs.get_node("r1").unwrap().log_range, Some((100, 500)));
1227
1228        // Extend range.
1229        gs.update_node_log_range("r1", 100, 800);
1230        assert_eq!(gs.get_node("r1").unwrap().log_range, Some((100, 800)));
1231    }
1232
1233    // -----------------------------------------------------------------------
1234    // PeerFeederService::handle paths
1235    // -----------------------------------------------------------------------
1236
1237    #[test]
1238    fn test_peer_feeder_service_can_serve() {
1239        use crate::net::channel::LocalChannelPair;
1240
1241        // Source has range [10, 20]. Client requests start_vlsn=15
1242        // (in range). Service should respond with CAN_SERVE then
1243        // stream entries until close.
1244        let source = Arc::new(PeerLogScanner::new());
1245        for v in 10u64..=20 {
1246            source.push(v, 0, format!("e{}", v).into_bytes());
1247        }
1248        let svc = PeerFeederService::new(Arc::clone(&source));
1249
1250        let pair = LocalChannelPair::new();
1251        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
1252        let client_ch = pair.channel_b;
1253
1254        // Client sends start_vlsn=15.
1255        client_ch.send(&15u64.to_le_bytes()).unwrap();
1256
1257        // Service runs in another thread so we can observe the
1258        // streaming side.
1259        let svc_handle = std::thread::spawn(move || svc.handle(server_ch));
1260
1261        // Read the 1-byte response.
1262        let resp = client_ch.receive(Duration::from_secs(2)).unwrap().unwrap();
1263        assert_eq!(
1264            resp,
1265            vec![PEER_FEEDER_CAN_SERVE],
1266            "service should reply CAN_SERVE for in-range start_vlsn"
1267        );
1268
1269        // Drain a few frames then close to terminate the runner.
1270        let mut frames = 0;
1271        while let Ok(Some(_)) = client_ch.receive(Duration::from_millis(80)) {
1272            frames += 1;
1273            if frames >= 3 {
1274                break;
1275            }
1276        }
1277        // Close client side so runner returns.
1278        client_ch.close().unwrap();
1279        let _ = svc_handle.join().unwrap();
1280        assert!(frames >= 1, "service must have streamed at least one frame");
1281    }
1282
1283    #[test]
1284    fn test_peer_feeder_service_needs_restore() {
1285        use crate::net::channel::LocalChannelPair;
1286
1287        // Source has range [10, 20]. Client requests start_vlsn=5
1288        // (too early). Service replies NEEDS_RESTORE and errors.
1289        let source = Arc::new(PeerLogScanner::new());
1290        for v in 10u64..=20 {
1291            source.push(v, 0, vec![]);
1292        }
1293        let svc = PeerFeederService::new(Arc::clone(&source));
1294
1295        let pair = LocalChannelPair::new();
1296        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
1297        let client_ch = pair.channel_b;
1298
1299        client_ch.send(&5u64.to_le_bytes()).unwrap();
1300
1301        let r = svc.handle(server_ch);
1302        assert!(r.is_err(), "service must return Err on NEEDS_RESTORE");
1303        let resp = client_ch.receive(Duration::from_secs(2)).unwrap().unwrap();
1304        assert_eq!(
1305            resp,
1306            vec![PEER_FEEDER_NEEDS_RESTORE],
1307            "service should reply NEEDS_RESTORE for out-of-range start_vlsn"
1308        );
1309    }
1310
1311    #[test]
1312    fn test_peer_feeder_service_short_handshake_errors() {
1313        use crate::net::channel::LocalChannelPair;
1314
1315        let source = Arc::new(PeerLogScanner::new());
1316        let svc = PeerFeederService::new(Arc::clone(&source));
1317
1318        let pair = LocalChannelPair::new();
1319        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
1320        let client_ch = pair.channel_b;
1321
1322        // Send only 4 bytes — too short to be a valid u64
1323        // start_vlsn. Service must Err with "short handshake".
1324        client_ch.send(&[0u8; 4]).unwrap();
1325
1326        let r = svc.handle(server_ch);
1327        assert!(r.is_err(), "short handshake must error");
1328        let msg = format!("{}", r.err().unwrap());
1329        assert!(
1330            msg.contains("short handshake"),
1331            "expected 'short handshake' in error, got: {msg}"
1332        );
1333    }
1334
1335    #[test]
1336    fn test_peer_feeder_service_no_handshake_errors() {
1337        use crate::net::channel::LocalChannelPair;
1338
1339        let source = Arc::new(PeerLogScanner::new());
1340        let svc = PeerFeederService::new(Arc::clone(&source));
1341
1342        let pair = LocalChannelPair::new();
1343        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
1344        // Drop client side so receive returns None (no message).
1345        drop(pair.channel_b);
1346
1347        let r = svc.handle(server_ch);
1348        assert!(r.is_err(), "no-handshake must error");
1349    }
1350
1351    #[test]
1352    fn test_peer_feeder_service_name() {
1353        let source = Arc::new(PeerLogScanner::new());
1354        let svc = PeerFeederService::new(source);
1355        assert_eq!(
1356            svc.service_name(),
1357            PEER_FEEDER_SERVICE_NAME,
1358            "service_name must match the protocol const"
1359        );
1360    }
1361
1362    // -----------------------------------------------------------------------
1363    // PeerFeederSource and adapter constructors
1364    // -----------------------------------------------------------------------
1365
1366    #[test]
1367    fn test_peer_feeder_source_default_and_clone_scanner() {
1368        let src1 = PeerFeederSource::new();
1369        // default() and new() produce the same shape.
1370        let src2 = PeerFeederSource::default();
1371        let s1 = src1.clone_scanner();
1372        let s2 = src2.clone_scanner();
1373        // Distinct underlying scanners (each PeerFeederSource owns
1374        // its own Arc).
1375        s1.push(1, 0, b"a".to_vec());
1376        assert_eq!(s1.len(), 1);
1377        assert_eq!(s2.len(), 0);
1378    }
1379
1380    #[test]
1381    fn test_peer_log_scanner_default_is_empty() {
1382        let s = PeerLogScanner::default();
1383        assert!(s.is_empty());
1384        assert_eq!(s.len(), 0);
1385        assert!(s.log_range().is_none());
1386    }
1387
1388    #[test]
1389    fn test_feeder_runner_known_replica_vlsn_initial_zero() {
1390        use crate::net::channel::LocalChannelPair;
1391
1392        let pair = LocalChannelPair::new();
1393        let channel: Arc<dyn Channel> = Arc::new(pair.channel_a);
1394        let runner = FeederRunner::new(channel, 1);
1395        assert_eq!(runner.known_replica_vlsn(), 0);
1396    }
1397
1398    // -----------------------------------------------------------------------
1399    // PeerScannerAdapter: stale-entry skipping
1400    // -----------------------------------------------------------------------
1401
1402    #[test]
1403    fn test_peer_scanner_adapter_skips_stale_via_pop_front() {
1404        // After a known_replica_vlsn advance, push some entries
1405        // that are below the new floor — the adapter should
1406        // discard them via pop_front().
1407        let source = Arc::new(PeerLogScanner::new());
1408        for v in 1u64..=5 {
1409            source.push(v, 0, vec![]);
1410        }
1411        let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 3);
1412        // First call returns vlsn=3, skipping 1 and 2.
1413        let r = adapter.next_entry(3);
1414        assert!(r.is_some());
1415        let (vlsn, _, _) = r.unwrap();
1416        assert_eq!(vlsn, 3);
1417        // 4 next.
1418        let (vlsn, _, _) = adapter.next_entry(4).unwrap();
1419        assert_eq!(vlsn, 4);
1420    }
1421}