noxu_rep/stream/peer_feeder.rs
1//! Peer-to-peer log distribution — JE's single feeder mechanism.
2//!
//! `PeerFeederService` is this node's service-side feeder accept loop,
//! Noxu's counterpart to JE's `com.sleepycat.je.rep.impl.node.FeederManager`.
//! It is registered once per node on the TCP dispatcher, and on every inbound
//! connection it runs ONE [`FeederRunner`] (JE
//! `com.sleepycat.je.rep.impl.node.Feeder`) driving a single [`LogScanner`]
//! (JE `com.sleepycat.je.rep.stream.FeederSource`).
9//!
10//! ## One mechanism, master OR replica (JE fidelity)
11//!
12//! ```text
13//! master ──PEER_FEEDER──► R1 ──PEER_FEEDER──► R2
14//! │ FeederRunner+EnvironmentLogScanner │ FeederRunner+EnvironmentLogScanner
15//! └── reads master's WAL ─────────────────┴── reads R1's OWN WAL
16//! ```
17//!
18//! JE `FeederSource.java` documents the source as "a real Master OR a
19//! Replica in a Replica chain that is replaying log records it received
20//! from some other source". JE `Feeder.initMasterFeederSource(startVLSN)`
21//! builds `new MasterFeederSource(repImpl, repNode.getVLSNIndex(), …)`
22//! regardless of node role, and its output loop pulls
23//! `feederSource.getWireRecord(feederVLSN, heartbeatMs)`
24//! (`Feeder.java:1282`). Noxu mirrors this exactly:
25//!
26//! | JE | Noxu |
27//! |--------------------------------------|---------------------------------------|
28//! | `FeederManager` (per-node accept) | [`PeerFeederService`] (per-node accept) |
29//! | `Feeder` thread + output loop | [`FeederRunner`] (`run`) |
30//! | `FeederSource` / `MasterFeederSource`| [`LogScanner`] / [`EnvironmentLogScanner`] |
31//! | `FeederReader` (VLSNIndex+WAL cursor)| [`EnvironmentLogScanner`] |
32//! | `getWireRecord(vlsn, waitTime)` | [`LogScanner::next_entry`] |
33//!
//! A replica cascading to a downstream replica uses the SAME mechanism as
//! the master feeding a replica; it just reads from the replica's own WAL
36//! (which carries the entries it received + persisted via
37//! [`crate::stream::replica_stream::EnvironmentLogWriter::log_with_vlsn`]).
38//! Both the master ([`ReplicatedEnvironment::become_master`]) and a
39//! cascading replica ([`ReplicatedEnvironment::become_replica`] with
40//! `cascade_feeding`) register [`PeerFeederService::with_wal_source`], so
41//! both serve downstream via `FeederRunner + EnvironmentLogScanner`.
42//!
43//! [`ReplicatedEnvironment::become_master`]: crate::ReplicatedEnvironment::become_master
44//! [`ReplicatedEnvironment::become_replica`]: crate::ReplicatedEnvironment::become_replica
45//!
46//! ## In-memory queue — non-JE convenience, NOT a production feed
47//!
48//! [`PeerLogScanner`] is a `LogScanner` backed by an in-memory
49//! `VecDeque<(vlsn, entry_type, payload)>`. It exists ONLY as a fallback
50//! for nodes that have no live `EnvironmentImpl` wired (no WAL to scan) —
51//! e.g. the `replicate_entry` test convenience. It is NEVER on a
52//! production durability path: every node opened through `with_environment`
53//! registers a WAL source and serves from the WAL. It still feeds through
54//! the SAME [`FeederRunner`] loop ([`PeerScannerAdapter`] is just another
55//! `LogScanner`), so there is no second feeder mechanism — only a second,
56//! non-JE, non-durable *source* for the env-less case.
57//!
58//! ## CBVLSN
59//!
60//! The Cleaner Barrier VLSN is the global minimum `known_vlsn` across all
61//! active electable replicas. The log cleaner uses this to decide which
62//! log files it is safe to reclaim. It is maintained by `GroupService`
63//! and updated on every heartbeat / ack.
64//!
//! Corresponds to `FeederReplicaSyncup`, `LocalCBVLSNUpdater`, and
//! `RepGroupImpl.getCBVLSN()` in JE.
67
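/*
Illustrative sketch, not part of this module: the "one feeder loop, many
sources" idea from the module docs, reduced to a standalone model. The
trait below is a hypothetical stand-in for this crate's `LogScanner`;
`SliceScanner`, `QueueScanner`, and `feed` are names invented for the
example. The real `FeederRunner` presumably also waits, heartbeats, and
writes to a channel, none of which is modelled here.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the crate's `LogScanner` trait.
trait LogScanner {
    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)>;
}

/// Durable-style source: a VLSN-ordered log that is scanned, never drained.
struct SliceScanner {
    log: Vec<(u64, u8, Vec<u8>)>,
}

impl LogScanner for SliceScanner {
    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
        self.log.iter().find(|(v, _, _)| *v >= from_vlsn).cloned()
    }
}

/// In-memory source: a queue that is drained as entries are served.
struct QueueScanner {
    queue: VecDeque<(u64, u8, Vec<u8>)>,
}

impl LogScanner for QueueScanner {
    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
        while let Some(entry) = self.queue.pop_front() {
            if entry.0 >= from_vlsn {
                return Some(entry);
            }
            // entry.0 < from_vlsn: stale, discard and keep looking.
        }
        None
    }
}

/// The single feeder loop. It only sees `dyn LogScanner`, so the source
/// behind it can be swapped without touching the loop.
fn feed(source: &mut dyn LogScanner, start_vlsn: u64) -> Vec<u64> {
    let mut sent = Vec::new();
    let mut next = start_vlsn;
    while let Some((vlsn, _entry_type, _payload)) = source.next_entry(next) {
        sent.push(vlsn);
        next = vlsn + 1;
    }
    sent
}
```

Both sources yield the same VLSN sequence through the same `feed` loop;
only the durability of the backing store differs.
*/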
68use std::collections::VecDeque;
69use std::sync::Arc;
70
71use noxu_sync::Mutex;
72
73use crate::error::{RepError, Result};
74use crate::net::channel::Channel;
75use crate::net::service_dispatcher::ServiceHandler;
76use crate::stream::feeder::{EnvironmentLogScanner, FeederRunner, LogScanner};
77
78/// Service name registered with `TcpServiceDispatcher` for peer log feeds.
79pub const PEER_FEEDER_SERVICE_NAME: &str = "PEER_FEEDER";
80
81// ---------------------------------------------------------------------------
82// PeerLogScanner
83// ---------------------------------------------------------------------------
84
85/// Default maximum number of entries retained in a `PeerLogScanner`
86/// in-memory queue.
87///
88/// Without a bound, every replicated entry stays in RAM until it is
89/// consumed by a downstream peer. A long-running replica with no
90/// downstream consumers therefore accumulates one VecDeque entry per
91/// replicated record forever (audit finding F10).
92///
93/// 16 384 entries is enough headroom for the slowest expected
94/// downstream peer to drain while keeping resident memory bounded
95/// (assuming sub-MiB entries, this caps the queue at ~16 GiB worst
96/// case; the byte-size cap is the harder bound).
97pub const DEFAULT_PEER_SCANNER_MAX_ENTRIES: usize = 16_384;
98
99/// Default maximum total payload size, in bytes, retained in a
100/// `PeerLogScanner` queue. Once the cumulative payload bytes exceed
101/// this threshold, the oldest entries are evicted on each `push`.
102///
103/// 64 MiB matches the channel's `MAX_FRAME_PAYLOAD` and is large
104/// enough to absorb a large in-flight transaction without being
105/// large enough to OOM a small replica box.
106pub const DEFAULT_PEER_SCANNER_MAX_BYTES: usize = 64 * 1024 * 1024;
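/*
Illustrative arithmetic, not part of this module: how the two default
bounds relate. The constants are copied from the defaults above; the
helper names are invented for the example.

```rust
/// Values copied from the two defaults (as u64 so the arithmetic also
/// holds on 32-bit targets).
const MAX_ENTRIES: u64 = 16_384;
const MAX_BYTES: u64 = 64 * 1024 * 1024;

/// Average payload size at which the entry cap and the byte cap are
/// reached at the same time. Larger averages hit the byte cap first.
const fn crossover_payload_bytes() -> u64 {
    MAX_BYTES / MAX_ENTRIES
}

/// Resident payload bytes if only the entry cap applied and every entry
/// carried `payload_bytes` bytes.
const fn entry_cap_only_bytes(payload_bytes: u64) -> u64 {
    MAX_ENTRIES * payload_bytes
}
```

With these defaults the crossover is 4 KiB per entry, and 1 MiB entries
under the entry cap alone would reach 16 GiB, which is why the byte cap
is the bound that matters for large payloads.
*/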
107
108/// A [`LogScanner`] backed by an in-memory queue of `(vlsn, type, payload)`
109/// entries.
110///
111/// Entries are pushed by the `ReplicaReceiver` as they arrive from the
112/// master (or from another peer). A [`FeederRunner`] driving a
113/// [`PeerScannerAdapter`] consumes entries from this queue and streams them
114/// to a downstream replica (the in-memory, env-less convenience source).
115///
116/// ## Bounded memory (F10)
117///
118/// The queue has two configurable bounds:
119///
120/// - `max_entries`: maximum entry count (default
121/// [`DEFAULT_PEER_SCANNER_MAX_ENTRIES`]).
122/// - `max_bytes`: maximum cumulative payload size in bytes
123/// (default [`DEFAULT_PEER_SCANNER_MAX_BYTES`]).
124///
125/// On `push`, if either bound is exceeded the oldest entries are
126/// evicted from the front of the queue until both bounds are
127/// satisfied. The evicted entries are no longer available for peer
128/// streaming through this scanner; downstream peers that fall behind
129/// the eviction window must catch up via the on-disk
/// `EnvironmentLogScanner` or via network restore. This matches JE HA
/// semantics, where peer-to-peer log distribution is best-effort and the
/// on-disk log is the durable source.
133///
134/// Closes finding F10 of the 2026 review.
135///
136/// Thread safety: the queue is protected by a `Mutex` so that the receiver
137/// thread (writer) and the feeder thread (reader) can operate concurrently.
138pub struct PeerLogScanner {
139 queue: Mutex<VecDeque<(u64, u8, Vec<u8>)>>,
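    /// The VLSN range currently held in `queue`: `(first, last)`.
    /// `first_vlsn` is refreshed from the queue front on every `push` and
    /// `next_entry` (0 when the queue is empty); `last_vlsn` only grows.
    /// Used by `GroupService` callers to determine whether this scanner can
    /// serve a given VLSN.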
143 first_vlsn: Mutex<u64>,
144 last_vlsn: Mutex<u64>,
145 /// Maximum entry count before oldest-evicting begins.
146 max_entries: usize,
147 /// Maximum cumulative payload bytes before oldest-evicting begins.
148 max_bytes: usize,
149 /// Current cumulative payload bytes (updated on every push/evict).
150 current_bytes: Mutex<usize>,
151 /// Cumulative count of entries evicted by the F10 bound (for
152 /// observability and tests).
153 evicted_count: std::sync::atomic::AtomicU64,
154}
155
156impl PeerLogScanner {
157 /// Create an empty scanner with the default F10 bounds.
158 pub fn new() -> Self {
159 Self::with_capacity(
160 DEFAULT_PEER_SCANNER_MAX_ENTRIES,
161 DEFAULT_PEER_SCANNER_MAX_BYTES,
162 )
163 }
164
165 /// Create an empty scanner with explicit bounds.
166 ///
    /// `max_entries` and `max_bytes` are both honoured; whichever is
    /// breached first triggers eviction of the oldest entries on subsequent
    /// `push` calls. A value of `0` is clamped to `1`. Passing `usize::MAX`
    /// disables the corresponding bound (not recommended in production).
171 pub fn with_capacity(max_entries: usize, max_bytes: usize) -> Self {
172 Self {
173 queue: Mutex::new(VecDeque::new()),
174 first_vlsn: Mutex::new(0),
175 last_vlsn: Mutex::new(0),
176 max_entries: max_entries.max(1),
177 max_bytes: max_bytes.max(1),
178 current_bytes: Mutex::new(0),
179 evicted_count: std::sync::atomic::AtomicU64::new(0),
180 }
181 }
182
183 /// Push a log entry into the scanner's queue.
184 ///
185 /// Called by the `ReplicaReceiver` each time an entry is applied.
    /// Entries are expected to be pushed in VLSN order, but this method does
    /// not enforce it: every entry is appended to the queue unconditionally
188 /// and the cached `(first_vlsn, last_vlsn)` range is widened to cover
189 /// the new VLSN. Out-of-order or duplicate entries are filtered later
190 /// by [`LogScanner::next_entry`](crate::stream::feeder::LogScanner),
191 /// which skips entries with `vlsn < from_vlsn`.
192 ///
193 /// **F10 bound**: after the new entry is appended, if the queue
194 /// exceeds either `max_entries` or `max_bytes`, the oldest entries
195 /// are evicted from the front until both bounds are satisfied. The
196 /// retained `first_vlsn` is updated to the new front-of-queue VLSN
197 /// so downstream peers that ask for an evicted VLSN range observe
198 /// `log_range().first > from_vlsn` and know they must catch up via
199 /// the durable log.
200 pub fn push(&self, vlsn: u64, entry_type: u8, payload: Vec<u8>) {
201 let payload_len = payload.len();
202 {
203 let mut last = self.last_vlsn.lock();
204 if vlsn > *last {
205 *last = vlsn;
206 }
207 }
208 let mut q = self.queue.lock();
209 let mut current_bytes = self.current_bytes.lock();
210 // Append unconditionally.
211 q.push_back((vlsn, entry_type, payload));
212 *current_bytes += payload_len;
213
214 // F10 eviction: drop oldest until both bounds are honoured.
215 let mut evicted = 0u64;
216 while q.len() > self.max_entries || *current_bytes > self.max_bytes {
217 if let Some((_evicted_vlsn, _ty, evicted_payload)) = q.pop_front() {
218 *current_bytes =
219 current_bytes.saturating_sub(evicted_payload.len());
220 evicted += 1;
221 } else {
222 break;
223 }
224 }
225 if evicted > 0 {
226 self.evicted_count
227 .fetch_add(evicted, std::sync::atomic::Ordering::Relaxed);
228 }
229 // Refresh first_vlsn from the (possibly mutated) queue front.
230 let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
231 drop(current_bytes);
232 drop(q);
233 *self.first_vlsn.lock() = new_first;
234 }
235
236 /// Cumulative number of entries dropped by the F10 bound since
237 /// scanner construction. Useful for monitoring whether downstream
238 /// peers are keeping up.
239 pub fn evicted_count(&self) -> u64 {
240 self.evicted_count.load(std::sync::atomic::Ordering::Relaxed)
241 }
242
243 /// Current cumulative payload size in bytes (live snapshot).
244 pub fn current_bytes(&self) -> usize {
245 *self.current_bytes.lock()
246 }
247
248 /// Return the VLSN range currently held in this scanner.
249 ///
    /// Returns `None` while the queue is empty: nothing has been pushed yet,
    /// or every entry has since been consumed or evicted.
251 pub fn log_range(&self) -> Option<(u64, u64)> {
252 let first = *self.first_vlsn.lock();
253 let last = *self.last_vlsn.lock();
254 if first == 0 { None } else { Some((first, last)) }
255 }
256
257 /// Return the number of entries currently queued.
258 pub fn len(&self) -> usize {
259 self.queue.lock().len()
260 }
261
262 /// Returns true if no entries are queued.
263 pub fn is_empty(&self) -> bool {
264 self.queue.lock().is_empty()
265 }
266}
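/*
Illustrative sketch, not part of this module: a single-threaded model of
the two-bound, oldest-first eviction that `push` performs. `BoundedQueue`
and its fields are names invented for the example; the real scanner adds
locking, an entry-type byte, and a cached `last_vlsn`.

```rust
use std::collections::VecDeque;

/// Hypothetical single-threaded model of the bounded queue.
struct BoundedQueue {
    entries: VecDeque<(u64, Vec<u8>)>,
    max_entries: usize,
    max_bytes: usize,
    current_bytes: usize,
    evicted: u64,
}

impl BoundedQueue {
    fn new(max_entries: usize, max_bytes: usize) -> Self {
        Self {
            entries: VecDeque::new(),
            max_entries: max_entries.max(1),
            max_bytes: max_bytes.max(1),
            current_bytes: 0,
            evicted: 0,
        }
    }

    /// Append first, then evict from the front until both bounds hold.
    fn push(&mut self, vlsn: u64, payload: Vec<u8>) {
        self.current_bytes += payload.len();
        self.entries.push_back((vlsn, payload));
        while self.entries.len() > self.max_entries
            || self.current_bytes > self.max_bytes
        {
            match self.entries.pop_front() {
                Some((_, old)) => {
                    self.current_bytes =
                        self.current_bytes.saturating_sub(old.len());
                    self.evicted += 1;
                }
                None => break,
            }
        }
    }

    /// `(first, last)` VLSN still held, or `None` when the queue is empty.
    fn range(&self) -> Option<(u64, u64)> {
        Some((self.entries.front()?.0, self.entries.back()?.0))
    }
}
```

In this model an entry larger than `max_bytes` is evicted by its own
`push`, because eviction runs after the append and stops only when both
bounds hold or the queue is empty.
*/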
267
268impl Default for PeerLogScanner {
269 fn default() -> Self {
270 Self::new()
271 }
272}
273
274impl LogScanner for PeerLogScanner {
275 fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
276 let mut q = self.queue.lock();
277 // Skip entries with VLSN < from_vlsn (they were already seen by the
278 // downstream replica). Track byte-budget reduction so the F10
279 // bound stays accurate.
280 let mut current_bytes = self.current_bytes.lock();
281 while let Some(&(vlsn, _, _)) = q.front() {
282 if vlsn >= from_vlsn {
283 let entry = q.pop_front();
284 if let Some((_, _, ref payload)) = entry {
285 *current_bytes =
286 current_bytes.saturating_sub(payload.len());
287 }
288 let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
289 drop(current_bytes);
290 drop(q);
291 *self.first_vlsn.lock() = new_first;
292 return entry;
293 }
294 if let Some((_, _, evicted_payload)) = q.pop_front() {
295 *current_bytes =
296 current_bytes.saturating_sub(evicted_payload.len());
297 }
298 }
299 let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
300 drop(current_bytes);
301 drop(q);
302 *self.first_vlsn.lock() = new_first;
303 None
304 }
305}
306
307// ---------------------------------------------------------------------------
308// PeerFeederSource — Arc-wrapped PeerLogScanner that implements LogScanner
309// ---------------------------------------------------------------------------
310
311/// A shared, `Arc`-wrapped `PeerLogScanner` that can be passed between
312/// threads.
313///
/// The `ReplicaReceiver` holds an `Arc<PeerFeederSource>` and calls
/// `push()` on the inner scanner as entries arrive. A [`FeederRunner`]
/// driving a [`PeerScannerAdapter`] holds another clone of the inner
/// scanner and calls `next_entry()` to stream those entries downstream.
319pub struct PeerFeederSource(pub Arc<PeerLogScanner>);
320
321impl PeerFeederSource {
322 /// Create a new `PeerFeederSource` backed by a fresh `PeerLogScanner`.
323 pub fn new() -> Self {
324 Self(Arc::new(PeerLogScanner::new()))
325 }
326
327 /// Return a clone of the inner `Arc<PeerLogScanner>` for the receiver
328 /// thread to use when pushing entries.
329 pub fn clone_scanner(&self) -> Arc<PeerLogScanner> {
330 Arc::clone(&self.0)
331 }
332}
333
334impl Default for PeerFeederSource {
335 fn default() -> Self {
336 Self::new()
337 }
338}
339
340/// Adapter so `PeerFeederSource` can be used directly as a `LogScanner`.
341///
/// [`LogScanner::next_entry`] takes `&mut self`, which a shared
/// `Arc<PeerLogScanner>` cannot hand out, so this thin wrapper holds the
/// `Arc` plus a local cursor and implements `LogScanner` itself.
345pub struct PeerScannerAdapter {
346 source: Arc<PeerLogScanner>,
347 cursor_vlsn: u64,
348}
349
350impl PeerScannerAdapter {
351 /// Create a new adapter starting from `start_vlsn`.
352 pub fn new(source: Arc<PeerLogScanner>, start_vlsn: u64) -> Self {
353 Self { source, cursor_vlsn: start_vlsn }
354 }
355}
356
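impl LogScanner for PeerScannerAdapter {
    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
        let effective_from = self.cursor_vlsn.max(from_vlsn);
        // Same lock order as `PeerLogScanner::push`: queue, then
        // current_bytes. `first_vlsn` is refreshed after both are released.
        let mut q = self.source.queue.lock();
        let mut current_bytes = self.source.current_bytes.lock();
        let mut result = None;
        while let Some((vlsn, ty, payload)) = q.pop_front() {
            // Every popped entry, stale or served, leaves the byte budget,
            // so the F10 accounting stays accurate on this path too.
            *current_bytes = current_bytes.saturating_sub(payload.len());
            if vlsn >= effective_from {
                result = Some((vlsn, ty, payload));
                break;
            }
            // vlsn < effective_from: stale entry, discard and keep going.
        }
        // Keep `log_range()` in step with what the queue still holds.
        let new_first = q.front().map(|(v, _, _)| *v).unwrap_or(0);
        drop(current_bytes);
        drop(q);
        *self.source.first_vlsn.lock() = new_first;
        if let Some((vlsn, _, _)) = &result {
            self.cursor_vlsn = vlsn + 1;
        }
        result
    }
}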
378
379// ---------------------------------------------------------------------------
380// Syncup helpers
381// ---------------------------------------------------------------------------
382
383/// Result of a peer syncup negotiation.
384///
/// In JE HA, `FeederReplicaSyncup` finds the highest VLSN that is committed
386/// on BOTH the feeder and the replica (the "matchpoint"). The feeder then
387/// streams entries from matchpoint+1 onwards.
388///
389/// We model this as a simple VLSN range comparison: if the peer holds
390/// `[peer_first, peer_last]` and the replica needs `replica_needs` onwards,
391/// we can serve if `peer_first <= replica_needs <= peer_last`.
392#[derive(Debug, Clone, PartialEq, Eq)]
393pub enum SyncupResult {
394 /// The peer holds the needed range; stream from `start_vlsn`.
395 CanServe { start_vlsn: u64 },
396 /// The peer does not hold the needed VLSN; fall back to master or
397 /// network restore.
398 NeedsRestore,
399}
400
401/// Determine whether `peer_range` can serve a replica that needs
402/// log entries starting from `replica_needs`.
403///
404/// This is the range-availability check only (CanServe/NeedsRestore). The
405/// diverged-tail matchpoint search + rollback decision (REP-1 STEP 5) lives in
406/// [`crate::stream::syncup`] (`find_matchpoint` + `verify_rollback`, ported
407/// from JE `ReplicaFeederSyncup`); the live driver replaces this range check
408/// with that decision core once the syncup wire protocol + backward reader
409/// land. See `docs/src/maintainer/design-decisions.md` (REP-1).
410pub fn negotiate_syncup(
411 peer_range: Option<(u64, u64)>,
412 replica_needs: u64,
413) -> SyncupResult {
414 match peer_range {
415 Some((first, last))
416 if first <= replica_needs && replica_needs <= last =>
417 {
418 SyncupResult::CanServe { start_vlsn: replica_needs }
419 }
420 _ => SyncupResult::NeedsRestore,
421 }
422}
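/*
Illustrative sketch, not part of this module: the range check restated
so it compiles on its own, plus a hypothetical `first_serving_peer`
helper that applies it across several advertised peer ranges. Only the
check itself comes from this file; the helper is an assumption about how
a caller might use it.

```rust
/// Mirror of `SyncupResult`, restated so the sketch is self-contained.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SyncupResult {
    CanServe { start_vlsn: u64 },
    NeedsRestore,
}

/// Mirror of `negotiate_syncup`: serve iff first <= needs <= last.
fn negotiate_syncup(
    peer_range: Option<(u64, u64)>,
    replica_needs: u64,
) -> SyncupResult {
    match peer_range {
        Some((first, last))
            if first <= replica_needs && replica_needs <= last =>
        {
            SyncupResult::CanServe { start_vlsn: replica_needs }
        }
        _ => SyncupResult::NeedsRestore,
    }
}

/// Hypothetical helper: index of the first peer whose advertised range
/// covers `replica_needs`, or `None` if every peer would decline.
fn first_serving_peer(
    ranges: &[Option<(u64, u64)>],
    replica_needs: u64,
) -> Option<usize> {
    ranges.iter().position(|r| {
        matches!(
            negotiate_syncup(*r, replica_needs),
            SyncupResult::CanServe { .. }
        )
    })
}
```

Both ends of the range are inclusive. A replica that asks for `last + 1`
(that is, one that already holds everything the peer has) gets
`NeedsRestore` under this check.
*/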
423
424// ---------------------------------------------------------------------------
425// PeerFeederService — TCP ServiceHandler backed by a PeerLogScanner
426// ---------------------------------------------------------------------------
427
428/// A [`ServiceHandler`] that streams peer-held log entries to a requesting
429/// downstream replica over the `"PEER_FEEDER"` service.
430///
431/// The service is registered on the `TcpServiceDispatcher` at startup.
432/// When a downstream replica connects, the protocol is:
433///
/// 1. The downstream sends `[start_vlsn: u64 LE]` (8 bytes), the first VLSN
///    it needs. On the WAL path, `0` means "from the first VLSN held".
/// 2. The server checks the requested VLSN against the WAL's `VlsnIndex`
///    range when a WAL source is wired, then against the in-memory queue
///    via `negotiate_syncup()`.
/// 3. If the range is available, the server responds with
///    `[CAN_SERVE: u8 = 0]` and a [`FeederRunner`] (driving an
///    [`EnvironmentLogScanner`] for the WAL path or a [`PeerScannerAdapter`]
///    for the in-memory path) streams entries until the channel closes.
/// 4. If the range is not available, the server responds with
///    `[NEEDS_RESTORE: u8 = 1]` and closes the connection.
442///
443/// The `PeerLogScanner` (`source`) is populated by the node's own
444/// `ReplicaReceiver` as entries arrive from the master.
445pub struct PeerFeederService {
446 source: Arc<PeerLogScanner>,
447 /// Optional WAL-backed feeder source for chained replication.
448 ///
449 /// When `Some`, the service serves the requested VLSN range from this
450 /// node's OWN WAL via an [`EnvironmentLogScanner`] driven by a
451 /// [`FeederRunner`] — the *same* machinery the master uses
452 /// ([`crate::stream::feeder`]). This is JE's cascading-feeder model:
453 /// `FeederSource` is "a real Master OR a Replica in a Replica chain that
454 /// is replaying log records it received from some other source"
455 /// (`FeederSource.java`); `MasterFeederSource` reads the VLSNIndex + log
456 /// on whatever node hosts it.
457 ///
458 /// `None` (default) preserves the in-memory `PeerLogScanner` pull path.
459 wal_source: Option<WalFeederSource>,
460 /// Count of downstream connections served via the WAL `FeederRunner` +
461 /// `EnvironmentLogScanner` path (the JE `Feeder` + `MasterFeederSource`
462 /// mechanism). Lets the owning [`crate::ReplicatedEnvironment`] — and
463 /// tests — PROVE that a cascading replica feeds downstream via the same
464 /// mechanism the master uses, not the in-memory pull fallback.
465 wal_feeds_served: Arc<std::sync::atomic::AtomicU64>,
466}
467
468/// WAL-backed feeder source for a chained (replica-to-replica) feed.
469///
470/// Holds a live [`EnvironmentImpl`] (whose WAL carries VLSN-tagged 22-byte
471/// headers written by [`crate::stream::replica_stream::EnvironmentLogWriter`])
472/// and the shared [`VlsnIndex`] used to negotiate the available VLSN range.
473///
474/// Faithful to JE `MasterFeederSource(repImpl, vlsnIndex, startVLSN)` — the
475/// feeder source is constructed from the environment + VLSN index regardless
476/// of whether the node is master or replica.
477#[derive(Clone)]
478pub struct WalFeederSource {
479 env: Arc<noxu_dbi::EnvironmentImpl>,
480 vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
481}
482
483impl WalFeederSource {
484 /// Create a WAL-backed feeder source.
485 pub fn new(
486 env: Arc<noxu_dbi::EnvironmentImpl>,
487 vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
488 ) -> Self {
489 Self { env, vlsn_index }
490 }
491}
492
493impl PeerFeederService {
494 /// Create a new service backed by an in-memory `source` (pull path).
495 pub fn new(source: Arc<PeerLogScanner>) -> Self {
496 Self {
497 source,
498 wal_source: None,
499 wal_feeds_served: Arc::new(std::sync::atomic::AtomicU64::new(0)),
500 }
501 }
502
503 /// Create a service that ALSO serves the chained-replication WAL feed.
504 ///
505 /// When a downstream replica connects, the service prefers the WAL
506 /// source: it negotiates the VLSN range from the [`VlsnIndex`] and, if
507 /// it can serve, streams entries from this node's WAL via an
508 /// [`EnvironmentLogScanner`] + [`FeederRunner`] (the same path the
509 /// master uses). If the WAL cannot serve the requested range it falls
510 /// back to the in-memory `source`, then to `NEEDS_RESTORE`.
511 ///
512 /// Used by [`crate::ReplicatedEnvironment::become_replica`] when
513 /// `cascade_feeding` is enabled and a live `EnvironmentImpl` is wired.
514 pub fn with_wal_source(
515 source: Arc<PeerLogScanner>,
516 wal_source: WalFeederSource,
517 ) -> Self {
518 Self {
519 source,
520 wal_source: Some(wal_source),
521 wal_feeds_served: Arc::new(std::sync::atomic::AtomicU64::new(0)),
522 }
523 }
524
525 /// Like [`Self::with_wal_source`] but shares `wal_feeds_served` with the
526 /// owning [`crate::ReplicatedEnvironment`] so it (and tests) can PROVE
527 /// the node served a downstream via the JE `Feeder`/`MasterFeederSource`
528 /// path (`FeederRunner + EnvironmentLogScanner`).
529 pub fn with_wal_source_counted(
530 source: Arc<PeerLogScanner>,
531 wal_source: WalFeederSource,
532 wal_feeds_served: Arc<std::sync::atomic::AtomicU64>,
533 ) -> Self {
534 Self { source, wal_source: Some(wal_source), wal_feeds_served }
535 }
536
537 /// Number of downstream connections this service has served via the JE
538 /// `Feeder`/`MasterFeederSource` path (`FeederRunner +
539 /// EnvironmentLogScanner`). `0` means no cascade/WAL feed has run yet.
540 pub fn wal_feeds_served(&self) -> u64 {
541 self.wal_feeds_served.load(std::sync::atomic::Ordering::SeqCst)
542 }
543}
544
545/// Wire-level response codes sent by the server.
546const PEER_FEEDER_CAN_SERVE: u8 = 0;
547const PEER_FEEDER_NEEDS_RESTORE: u8 = 1;
548
549impl ServiceHandler for PeerFeederService {
550 fn service_name(&self) -> &str {
551 PEER_FEEDER_SERVICE_NAME
552 }
553
554 fn handle(&self, channel: Box<dyn Channel>) -> Result<()> {
555 use std::time::Duration;
556
557 // 1. Read the 8-byte start_vlsn from the downstream replica.
558 let msg =
559 channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
560 RepError::NetworkError(
561 "PEER_FEEDER: no start_vlsn received".into(),
562 )
563 })?;
564
565 if msg.len() < 8 {
566 return Err(RepError::NetworkError(format!(
567 "PEER_FEEDER: short handshake ({} bytes)",
568 msg.len()
569 )));
570 }
571 let start_vlsn =
572 u64::from_le_bytes(msg[..8].try_into().expect("slice of 8 bytes"));
573
574 // 2a. Chained replication (cascade): if a WAL source is wired, prefer
575 // serving the downstream from THIS node's own WAL via the same
576 // EnvironmentLogScanner + FeederRunner the master uses. Faithful
577 // to JE's cascading-feeder model (see `WalFeederSource`).
578 //
579 // The downstream sends start_vlsn=0 to mean "from the beginning";
580 // we serve from the first VLSN our VLSNIndex holds. We can serve
581 // iff the requested start falls within `[first, last]` (or the
582 // range is non-empty and start_vlsn==0).
583 if let Some(wal) = &self.wal_source {
584 let range = wal.vlsn_index.get_range();
585 let (first, last) = (range.first(), range.last());
586 let have_data = last > 0 && first > 0;
587 let effective_start =
588 if start_vlsn == 0 { first } else { start_vlsn };
589 let can_serve = have_data
590 && effective_start >= first
591 && effective_start <= last;
592 if can_serve {
593 channel.send(&[PEER_FEEDER_CAN_SERVE])?;
594 let channel_arc: Arc<dyn Channel> = Arc::from(channel);
595 // EnvironmentLogScanner starts at the log beginning and skips
596 // entries with vlsn < effective_start; the FeederRunner then
597 // streams VLSN-ordered entries exactly as the master does.
598 //
599 // JE fidelity: this is JE `Feeder` running `MasterFeederSource`
600 // — `Feeder.initMasterFeederSource(startVLSN)` builds
601 // `new MasterFeederSource(repImpl, repNode.getVLSNIndex(), …)`
602 // and the output loop pulls
603 // `feederSource.getWireRecord(feederVLSN, heartbeatMs)`
604 // (`Feeder.java:1282`). Here `FeederRunner` IS that loop and
605 // `EnvironmentLogScanner` IS that `MasterFeederSource`
606 // (a `FeederReader` over the VLSNIndex+WAL). A cascading
607 // replica reaches this branch with `wal.env` = its OWN env, so
608 // it feeds downstream by the IDENTICAL mechanism the master
609 // uses — reading its own WAL.
610 let mut scanner =
611 match EnvironmentLogScanner::new(&wal.env, None) {
612 Some(s) => s,
613 None => {
614 // WAL scanner unavailable (read-only env / no log):
615 // the CAN_SERVE byte was already sent, so close the
616 // channel; the downstream will retry / fall back.
617 return Err(RepError::NetworkError(
618 "PEER_FEEDER: WAL scanner unavailable".into(),
619 ));
620 }
621 };
622 // Record (for the env + tests) that THIS connection was served
623 // by the JE `Feeder`/`MasterFeederSource` path — the proof
624 // that the cascade uses the same mechanism as the master.
625 self.wal_feeds_served
626 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
627 let runner = FeederRunner::new(channel_arc, effective_start);
628 let _ = runner.run(&mut scanner);
629 return Ok(());
630 }
631 // WAL cannot serve the requested range: fall through to the
632 // in-memory pull path, then to NEEDS_RESTORE. A downstream that
633 // asks for an evicted/old range that the mid-tier no longer holds
634 // must catch up via network restore or fall back to the master.
635 }
636
637 // 2. Negotiate: do we hold the requested VLSN range in memory?
638 let range = self.source.log_range();
639 match negotiate_syncup(range, start_vlsn) {
640 SyncupResult::CanServe { start_vlsn: sv } => {
641 // Tell the downstream it can proceed.
642 channel.send(&[PEER_FEEDER_CAN_SERVE])?;
643
644 // 3. Stream entries in this thread (the dispatcher already
645 // called us from a per-connection thread).
646 //
647 // SAME feeder loop as the WAL/cascade path: ONE
648 // `FeederRunner` (JE `Feeder`) driving ONE `LogScanner`
649 // (JE `FeederSource`). Here the source is the in-memory
650 // `PeerScannerAdapter` (non-JE convenience for env-less
651 // nodes), not the WAL `EnvironmentLogScanner`. There is no
652 // second feeder mechanism — only a second source.
653 let channel_arc: Arc<dyn Channel> = Arc::from(channel);
654 let mut source =
655 PeerScannerAdapter::new(Arc::clone(&self.source), sv);
656 let runner = FeederRunner::new(channel_arc, sv);
657 let _ = runner.run(&mut source);
658 Ok(())
659 }
660 SyncupResult::NeedsRestore => {
661 channel.send(&[PEER_FEEDER_NEEDS_RESTORE])?;
662 Err(RepError::NetworkError(format!(
663 "PEER_FEEDER: cannot serve vlsn={start_vlsn}, \
664 range={range:?}"
665 )))
666 }
667 }
668 }
669}
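/*
Illustrative sketch, not part of this module: the WAL-path serve decision
from `handle`, restated as a pure function so its edge cases are easy to
check. `wal_effective_start` is a name invented for the example; the
logic mirrors the `have_data` / `effective_start` / `can_serve`
computation above.

```rust
/// Returns the VLSN to start streaming from, or `None` when the WAL
/// range `[first, last]` cannot serve the request. A `start_vlsn` of 0
/// means "from the first VLSN held".
fn wal_effective_start(first: u64, last: u64, start_vlsn: u64) -> Option<u64> {
    // An index with no entries reports 0 for both ends in this model.
    let have_data = first > 0 && last > 0;
    let effective = if start_vlsn == 0 { first } else { start_vlsn };
    let can_serve = have_data && effective >= first && effective <= last;
    can_serve.then_some(effective)
}
```

Note that the `0` shorthand is specific to this WAL branch: the in-memory
branch passes `start_vlsn` to `negotiate_syncup` unchanged.
*/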
670
671// ---------------------------------------------------------------------------
672// Client-side peer catch-up
673// ---------------------------------------------------------------------------
674
675/// Connect to a peer node's `PEER_FEEDER` service and pull log entries
676/// starting from `start_vlsn`.
677///
678/// This is the client counterpart to [`PeerFeederService`]. It is called
679/// by a replica that is behind and wants to catch up from a peer that holds
680/// the needed VLSN range (rather than routing all traffic through the master).
681///
682/// Protocol (matches `PeerFeederService::handle`):
683/// 1. Open a TCP connection and request the `"PEER_FEEDER"` service via
684/// `service_dispatcher::connect_to_service()`.
685/// 2. Send `[start_vlsn: u64 LE]`.
686/// 3. Read the one-byte response:
687/// - `0` (`PEER_FEEDER_CAN_SERVE`) — peer has the range; proceed.
688/// - `1` (`PEER_FEEDER_NEEDS_RESTORE`) — peer cannot serve; return
689/// `Ok(false)` so the caller can fall back to the master.
690/// 4. Start a `ReplicaReceiver` loop on the channel, passing each entry
691/// to `log_writer`. Returns `Ok(true)` when the peer closes the
692/// channel (i.e. the catch-up is complete).
693///
694/// # Pipelining
695///
696/// `catch_up_from_peer` is intentionally non-async. Call it from a
697/// dedicated thread per peer. To pipeline catch-up from multiple peers
698/// simultaneously, spawn one thread per peer (e.g. from a `ThreadPool`).
699/// The [`MultiPeerCatchUp`] helper below manages this.
700pub fn catch_up_from_peer(
701 peer_addr: std::net::SocketAddr,
702 start_vlsn: u64,
703 log_writer: &mut dyn crate::stream::replica_stream::LogWriter,
704) -> Result<bool> {
705 use crate::net::service_dispatcher::connect_to_service;
706 use crate::stream::replica_stream::ReplicaReceiver;
707 use std::sync::Arc;
708 use std::time::Duration;
709
710 // Connect and request the PEER_FEEDER service.
711 let channel = connect_to_service(peer_addr, PEER_FEEDER_SERVICE_NAME)?;
712
713 // Send start_vlsn.
714 channel.send(&start_vlsn.to_le_bytes())?;
715
716 // Read the one-byte response.
717 let resp = channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
718 RepError::NetworkError("no response from peer feeder".into())
719 })?;
720 if resp.is_empty() {
721 return Err(RepError::NetworkError(
722 "empty response from peer feeder".into(),
723 ));
724 }
725 match resp[0] {
726 PEER_FEEDER_CAN_SERVE => {}
727 PEER_FEEDER_NEEDS_RESTORE => return Ok(false),
728 other => {
729 return Err(RepError::ProtocolError(format!(
730 "peer feeder unknown response byte: {other:#x}"
731 )));
732 }
733 }
734
735 // Run the replica receive loop.
736 let channel_arc: Arc<dyn Channel> = Arc::from(channel);
737 let receiver = ReplicaReceiver::new(channel_arc);
738 receiver.run(log_writer)?;
739
740 Ok(true)
741}
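/*
Illustrative sketch, not part of this module: the handshake bytes
exchanged between `catch_up_from_peer` and `PeerFeederService::handle`,
modelled without a channel. The function and enum names are invented for
the example; the byte layout (8-byte little-endian request, one-byte
reply with 0 = can serve, 1 = needs restore) is taken from the code
above.

```rust
use std::convert::TryFrom;

const CAN_SERVE: u8 = 0;
const NEEDS_RESTORE: u8 = 1;

#[derive(Debug, PartialEq, Eq)]
enum Reply {
    CanServe,
    NeedsRestore,
}

/// Client side: the request is just the start VLSN, little-endian.
fn encode_request(start_vlsn: u64) -> [u8; 8] {
    start_vlsn.to_le_bytes()
}

/// Server side: read the first 8 bytes; anything shorter is an error.
fn decode_request(msg: &[u8]) -> Result<u64, String> {
    let head = msg
        .get(..8)
        .ok_or_else(|| format!("short handshake ({} bytes)", msg.len()))?;
    let bytes = <[u8; 8]>::try_from(head).map_err(|e| e.to_string())?;
    Ok(u64::from_le_bytes(bytes))
}

/// Client side: interpret the first byte of the reply.
fn decode_reply(resp: &[u8]) -> Result<Reply, String> {
    match resp.first() {
        Some(&CAN_SERVE) => Ok(Reply::CanServe),
        Some(&NEEDS_RESTORE) => Ok(Reply::NeedsRestore),
        Some(&other) => Err(format!("unknown response byte: {other:#x}")),
        None => Err("empty response".to_string()),
    }
}
```

Keeping the encode and decode steps as pure functions like this makes
the wire format testable without opening a socket.
*/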

/// Like [`catch_up_from_peer`], but polls `shutdown` so the receive loop can
/// exit promptly when the environment is closing. Used by the I/O thread of
/// [`crate::ReplicatedEnvironment::become_replica`] so that `close()` can join
/// it even while the upstream feeder stays connected.
///
/// Returns `Ok(false)` if the peer replies that a restore is needed, and
/// `Ok(true)` once the receive loop has returned without error.
pub fn catch_up_from_peer_until(
    peer_addr: std::net::SocketAddr,
    start_vlsn: u64,
    log_writer: &mut dyn crate::stream::replica_stream::LogWriter,
    shutdown: &std::sync::atomic::AtomicBool,
) -> Result<bool> {
    use crate::net::service_dispatcher::connect_to_service;
    use crate::stream::replica_stream::ReplicaReceiver;
    use std::sync::Arc;
    use std::time::Duration;

    // Handshake: send the requested start VLSN as 8 little-endian bytes, then
    // wait up to 30 seconds for the feeder's one-byte status.
    let channel = connect_to_service(peer_addr, PEER_FEEDER_SERVICE_NAME)?;
    channel.send(&start_vlsn.to_le_bytes())?;
    let resp = channel.receive(Duration::from_secs(30))?.ok_or_else(|| {
        RepError::NetworkError("no response from peer feeder".into())
    })?;
    if resp.is_empty() {
        return Err(RepError::NetworkError(
            "empty response from peer feeder".into(),
        ));
    }
    match resp[0] {
        PEER_FEEDER_CAN_SERVE => {}
        PEER_FEEDER_NEEDS_RESTORE => return Ok(false),
        other => {
            return Err(RepError::ProtocolError(format!(
                "peer feeder unknown response byte: {other:#x}"
            )));
        }
    }

    // Run the replica receive loop until the stream ends or `shutdown` is set.
    let channel_arc: Arc<dyn Channel> = Arc::from(channel);
    let receiver = ReplicaReceiver::new(channel_arc);
    receiver.run_until(log_writer, Some(shutdown))?;

    Ok(true)
}
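
// A rough sketch of the "poll a shutdown flag between receives" idea described
// above, built only on the standard library. It is not how
// `ReplicaReceiver::run_until` is implemented (that code is not shown here);
// `receive_until`, the `u64` payload, and the use of an mpsc channel in place
// of a network `Channel` are all assumptions made for illustration.
/*
```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

// Drains `rx` until the sender disconnects or `shutdown` becomes true.
// Returns how many items were received.
fn receive_until(
    rx: &Receiver<u64>,
    shutdown: &AtomicBool,
    poll: Duration,
) -> usize {
    let mut received = 0;
    // Re-check the flag at least once per `poll` interval.
    while !shutdown.load(Ordering::Acquire) {
        match rx.recv_timeout(poll) {
            Ok(_vlsn) => received += 1,
            // Nothing arrived in time: loop around and re-check the flag.
            Err(RecvTimeoutError::Timeout) => continue,
            // The upstream side went away: stop receiving.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    received
}
```
*/
// The short receive timeout bounds how long a closing environment waits before
// the I/O thread notices the flag, even if the upstream never disconnects.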

/// Pipelined catch-up from multiple peer nodes simultaneously.
///
/// [`MultiPeerCatchUp::run`] spawns one thread per peer in `peers` and joins
/// all of them before returning. It returns the name of the first peer whose
/// catch-up succeeded, or `None` if no peer could serve the range.
///
/// The `make_writer` closure passed to `run` is called once per thread that
/// attempts a catch-up, producing that thread's own `LogWriter`. The closure
/// must be `Send + Sync + 'static`.
pub struct MultiPeerCatchUp {
    peers: Vec<(String, std::net::SocketAddr)>,
    start_vlsn: u64,
}

impl MultiPeerCatchUp {
    /// Create a new multi-peer catch-up request.
    ///
    /// `peers` is a list of `(node_name, socket_addr)` pairs to try.
    pub fn new(
        peers: Vec<(String, std::net::SocketAddr)>,
        start_vlsn: u64,
    ) -> Self {
        Self { peers, start_vlsn }
    }

    /// Run pipelined catch-up.
    ///
    /// Spawns one thread per peer. Each thread builds its own writer with
    /// `make_writer()` and calls [`catch_up_from_peer`], so every peer that
    /// can serve the range applies entries through its own writer. A thread
    /// that starts after another peer has already won exits without
    /// connecting. All threads are joined before this method returns.
    ///
    /// Returns the name of the first peer that successfully served the range,
    /// or `None` if every peer declined or failed.
    ///
    /// Panics if a catch-up thread cannot be spawned.
    pub fn run<F, W>(self, make_writer: F) -> Option<String>
    where
        F: Fn() -> W + Send + Sync + 'static,
        W: crate::stream::replica_stream::LogWriter + Send + 'static,
    {
        use std::sync::atomic::{AtomicBool, Ordering};
        let make_writer = std::sync::Arc::new(make_writer);
        let done = std::sync::Arc::new(AtomicBool::new(false));
        let winner: std::sync::Arc<noxu_sync::Mutex<Option<String>>> =
            std::sync::Arc::new(noxu_sync::Mutex::new(None));

        let mut handles = Vec::new();

        for (name, addr) in self.peers {
            let make_writer = std::sync::Arc::clone(&make_writer);
            let done = std::sync::Arc::clone(&done);
            let winner = std::sync::Arc::clone(&winner);
            let start_vlsn = self.start_vlsn;
            let name_clone = name.clone();

            let handle = std::thread::Builder::new()
                .name(format!("noxu-peer-catchup-{}", name))
                .spawn(move || {
                    if done.load(Ordering::Acquire) {
                        return; // another peer already won
                    }
                    let mut writer = make_writer();
                    match catch_up_from_peer(addr, start_vlsn, &mut writer) {
                        Ok(true) => {
                            // Only the first thread to flip `done` records
                            // itself as the winner.
                            if !done.swap(true, Ordering::AcqRel) {
                                *winner.lock() = Some(name_clone);
                            }
                        }
                        Ok(false) => {
                            log::debug!(
                                "peer '{}' cannot serve vlsn={start_vlsn}",
                                name
                            );
                        }
                        Err(e) => {
                            log::warn!(
                                "catch-up from peer '{}' failed: {e}",
                                name
                            );
                        }
                    }
                })
                .expect("failed to spawn peer catch-up thread");

            handles.push(handle);
        }

        for h in handles {
            let _ = h.join();
        }

        winner.lock().clone()
    }
}
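
// The "first thread to succeed records itself as the winner" pattern used by
// `MultiPeerCatchUp::run` can be sketched with the standard library alone.
// This is an illustration under assumptions, not this crate's code:
// `first_winner` and its `attempt` callback are hypothetical, and
// `std::sync::Mutex` stands in for `noxu_sync::Mutex`.
/*
```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Runs `attempt` for every candidate on its own thread and returns the name
// of the first candidate whose attempt reported success, if any.
fn first_winner<F>(candidates: Vec<String>, attempt: F) -> Option<String>
where
    F: Fn(&str) -> bool + Send + Sync + 'static,
{
    let attempt = Arc::new(attempt);
    let done = Arc::new(AtomicBool::new(false));
    let winner: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));

    let handles: Vec<_> = candidates
        .into_iter()
        .map(|name| {
            let attempt = Arc::clone(&attempt);
            let done = Arc::clone(&done);
            let winner = Arc::clone(&winner);
            thread::spawn(move || {
                // `swap` returns the previous value, so exactly one
                // successful thread observes `false` and records its name.
                if (*attempt)(name.as_str())
                    && !done.swap(true, Ordering::AcqRel)
                {
                    *winner.lock().unwrap() = Some(name);
                }
            })
        })
        .collect();

    // Join every thread, successful or not, before reading the result.
    for h in handles {
        let _ = h.join();
    }

    let result = winner.lock().unwrap().clone();
    result
}
```
*/
// Using `swap` rather than a separate load and store keeps the winner check
// and the flag update in one atomic step, so two successful threads cannot
// both record themselves.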

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::net::channel::LocalChannelPair;
    use std::time::Duration;

    // -----------------------------------------------------------------------
    // PeerLogScanner unit tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_peer_scanner_push_and_log_range() {
        let scanner = PeerLogScanner::new();
        assert!(scanner.is_empty());
        assert!(scanner.log_range().is_none());

        scanner.push(5, 1, b"entry5".to_vec());
        scanner.push(6, 1, b"entry6".to_vec());
        scanner.push(10, 1, b"entry10".to_vec());

        assert_eq!(scanner.len(), 3);
        assert_eq!(scanner.log_range(), Some((5, 10)));
    }

    #[test]
    fn test_peer_scanner_next_entry_in_order() {
        let mut scanner = PeerLogScanner::new();
        for vlsn in [3u64, 4, 5, 6, 7] {
            scanner.push(vlsn, 1, vlsn.to_le_bytes().to_vec());
        }

        // Ask from_vlsn=4: should skip 3 and return 4, 5, 6, 7.
        let mut results = Vec::new();
        while let Some((vlsn, _, _)) = scanner.next_entry(4) {
            results.push(vlsn);
        }
        assert_eq!(results, vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_peer_scanner_skips_stale_entries() {
        let mut scanner = PeerLogScanner::new();
        for v in [1u64, 2, 3, 10, 11] {
            scanner.push(v, 1, vec![v as u8]);
        }
        // Ask from 10: entries 1, 2, 3 are stale.
        let entry = scanner.next_entry(10);
        assert_eq!(entry.map(|(v, _, _)| v), Some(10));
    }

    #[test]
    fn test_peer_scanner_empty_returns_none() {
        let mut scanner = PeerLogScanner::new();
        assert!(scanner.next_entry(1).is_none());
    }

    // -----------------------------------------------------------------------
    // PeerScannerAdapter unit tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_peer_scanner_adapter_cursor_advances() {
        let source = Arc::new(PeerLogScanner::new());
        for v in [1u64, 2, 3, 4, 5] {
            source.push(v, 1, vec![v as u8]);
        }

        let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 1);
        let mut seen = Vec::new();
        while let Some((v, _, _)) = adapter.next_entry(1) {
            seen.push(v);
        }
        assert_eq!(seen, vec![1, 2, 3, 4, 5]);
    }

    // -----------------------------------------------------------------------
    // In-memory source streamed via the shared FeederRunner loop
    // -----------------------------------------------------------------------

    #[test]
    fn test_in_memory_source_streams_to_replica_via_feeder_runner() {
        // The in-memory `PeerLogScanner` feeds through the SAME `FeederRunner`
        // loop the WAL path uses; only the `LogScanner` source differs.
        let source = Arc::new(PeerLogScanner::new());
        for v in [10u64, 11, 12, 13, 14] {
            source.push(v, 1, format!("payload-{v}").into_bytes());
        }

        let pair = LocalChannelPair::new();
        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // Receiver: collect all frames.
        let recv_handle = {
            let receiver = Arc::clone(&receiver);
            std::thread::spawn(move || {
                let mut vlsns = Vec::new();
                // Expect exactly 5 frames; the first 8 bytes of each frame
                // hold the VLSN in little-endian order.
                for _ in 0..5 {
                    let frame = receiver
                        .receive(Duration::from_secs(5))
                        .unwrap()
                        .unwrap();
                    let vlsn =
                        u64::from_le_bytes(frame[0..8].try_into().unwrap());
                    vlsns.push(vlsn);
                    // Send ack.
                    let _ = receiver.send(&vlsn.to_le_bytes());
                }
                vlsns
            })
        };

        let sender_clone = Arc::clone(&sender);
        let run_handle = std::thread::spawn(move || {
            let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 10);
            let runner = FeederRunner::new(sender, 10);
            let _ = runner.run(&mut adapter);
        });

        // Wait for receiver to collect all 5 frames.
        let vlsns = recv_handle.join().unwrap();
        assert_eq!(vlsns, vec![10, 11, 12, 13, 14]);

        // Close the channel so the runner thread returns.
        sender_clone.close().unwrap();
        let _ = run_handle.join();
    }

    // -----------------------------------------------------------------------
    // negotiate_syncup tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_negotiate_syncup_can_serve() {
        assert_eq!(
            negotiate_syncup(Some((5, 20)), 10),
            SyncupResult::CanServe { start_vlsn: 10 }
        );
        // Exact boundary.
        assert_eq!(
            negotiate_syncup(Some((10, 10)), 10),
            SyncupResult::CanServe { start_vlsn: 10 }
        );
    }

    #[test]
    fn test_negotiate_syncup_needs_restore_too_early() {
        // Replica needs VLSN 3 but peer only has [5, 20].
        assert_eq!(
            negotiate_syncup(Some((5, 20)), 3),
            SyncupResult::NeedsRestore
        );
    }

    #[test]
    fn test_negotiate_syncup_needs_restore_too_late() {
        // Replica needs VLSN 25 but peer only has [5, 20].
        assert_eq!(
            negotiate_syncup(Some((5, 20)), 25),
            SyncupResult::NeedsRestore
        );
    }

    #[test]
    fn test_negotiate_syncup_no_range() {
        // Peer has no log range (just joined or restoring).
        assert_eq!(negotiate_syncup(None, 10), SyncupResult::NeedsRestore);
    }
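
    // The four tests above pin down a simple range check. The sketch below is
    // one rule consistent with them; it is an assumption, not a copy of
    // `negotiate_syncup` (defined earlier in this file), and `SyncupSketch`
    // and `negotiate_sketch` are hypothetical names.
    /*
```rust
#[derive(Debug, PartialEq)]
enum SyncupSketch {
    CanServe { start_vlsn: u64 },
    NeedsRestore,
}

// Serve only when the requested VLSN lies inside the peer's inclusive
// [first, last] log range; anything else needs a restore.
fn negotiate_sketch(
    range: Option<(u64, u64)>,
    requested: u64,
) -> SyncupSketch {
    match range {
        Some((first, last)) if (first..=last).contains(&requested) => {
            SyncupSketch::CanServe { start_vlsn: requested }
        }
        _ => SyncupSketch::NeedsRestore,
    }
}
```
    */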

    // -----------------------------------------------------------------------
    // GroupService CBVLSN integration
    // -----------------------------------------------------------------------

    #[test]
    fn test_group_service_cbvlsn_tracks_minimum() {
        use crate::group_service::{GroupService, NodeInfo};
        use crate::node_type::NodeType;
        use std::time::Instant;

        let gs = GroupService::new("test_group".to_string());

        // Add 3 electable nodes.
        for (name, port) in [("n1", 5001u16), ("n2", 5002), ("n3", 5003)] {
            gs.add_node(NodeInfo {
                name: name.to_string(),
                node_type: NodeType::Electable,
                host: "localhost".to_string(),
                port,
                node_id: port as u32,
                joined_at: Instant::now(),
                last_seen: Instant::now(),
                is_active: true,
                known_vlsn: 0,
                log_range: None,
                read_capacity_pct: 100,
                write_capacity_pct: 100,
                latency_hint_ms: 1,
            })
            .unwrap();
        }

        // Initially all known_vlsn = 0 → CBVLSN = 0.
        assert_eq!(gs.get_cbvlsn(), 0);

        // Update n1 and n2 but not n3 → CBVLSN = min(50, 40, 0) = 0.
        gs.update_node_vlsn("n1", 50);
        gs.update_node_vlsn("n2", 40);
        assert_eq!(gs.get_cbvlsn(), 0, "n3 still at 0, CBVLSN must be 0");

        // Now n3 also updates → CBVLSN = min(50, 40, 30) = 30.
        gs.update_node_vlsn("n3", 30);
        assert_eq!(gs.get_cbvlsn(), 30);

        // n3 advances → min(50, 40, 45) = 40.
        gs.update_node_vlsn("n3", 45);
        assert_eq!(gs.get_cbvlsn(), 40);
    }
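
    // The arithmetic the test above walks through is just a minimum over the
    // nodes' known VLSNs. A tiny sketch, assuming every node counts toward the
    // minimum and an empty group yields 0; the real `GroupService::get_cbvlsn`
    // is not shown here and may apply further rules (for example node type or
    // liveness). `cbvlsn_sketch` is a hypothetical name.
    /*
```rust
// Returns the smallest known VLSN, or 0 when there are no nodes.
fn cbvlsn_sketch(known_vlsns: &[u64]) -> u64 {
    known_vlsns.iter().copied().min().unwrap_or(0)
}
```
    */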

    #[test]
    fn test_group_service_cbvlsn_monotone_nondecreasing() {
        use crate::group_service::{GroupService, NodeInfo};
        use crate::node_type::NodeType;
        use std::time::Instant;

        let gs = GroupService::new("cbvlsn_monotone".to_string());

        for (name, port) in [("a", 5001u16), ("b", 5002)] {
            gs.add_node(NodeInfo {
                name: name.to_string(),
                node_type: NodeType::Electable,
                host: "localhost".to_string(),
                port,
                node_id: port as u32,
                joined_at: Instant::now(),
                last_seen: Instant::now(),
                is_active: true,
                known_vlsn: 0,
                log_range: None,
                read_capacity_pct: 100,
                write_capacity_pct: 100,
                latency_hint_ms: 1,
            })
            .unwrap();
        }

        // CBVLSN must never decrease.
        let mut prev = 0u64;
        for (na, va, nb, vb) in [
            ("a", 10u64, "b", 5u64),
            ("a", 20, "b", 15),
            ("a", 25, "b", 22),
            ("a", 30, "b", 28),
        ] {
            gs.update_node_vlsn(na, va);
            gs.update_node_vlsn(nb, vb);
            let cbvlsn = gs.get_cbvlsn();
            assert!(
                cbvlsn >= prev,
                "CBVLSN must not decrease: was {prev}, now {cbvlsn}"
            );
            prev = cbvlsn;
        }
    }

    #[test]
    fn test_group_service_find_peers_with_vlsn() {
        use crate::group_service::{GroupService, NodeInfo};
        use crate::node_type::NodeType;
        use std::time::Instant;

        let gs = GroupService::new("peer_select".to_string());

        // Node a: holds [1, 100]
        // Node b: holds [50, 200]
        // Node c: no range
        for (name, port, range) in [
            ("a", 5001u16, Some((1u64, 100u64))),
            ("b", 5002, Some((50, 200))),
            ("c", 5003, None),
        ] {
            let mut info = NodeInfo {
                name: name.to_string(),
                node_type: NodeType::Electable,
                host: "localhost".to_string(),
                port,
                node_id: port as u32,
                joined_at: Instant::now(),
                last_seen: Instant::now(),
                is_active: true,
                known_vlsn: 0,
                log_range: range,
                read_capacity_pct: 100,
                write_capacity_pct: 100,
                latency_hint_ms: 1,
            };
            // Stagger last_seen per node. The assertions below check
            // membership only, not the order of the returned peers.
            info.last_seen = Instant::now()
                - std::time::Duration::from_millis(port as u64 * 10);
            gs.add_node(info).unwrap();
        }

        // VLSN 75: a and b hold it; c has no range.
        let peers = gs.find_peers_with_vlsn(75);
        assert!(peers.contains(&"a".to_string()));
        assert!(peers.contains(&"b".to_string()));
        assert!(!peers.contains(&"c".to_string()));

        // VLSN 150: only b holds it.
        let peers = gs.find_peers_with_vlsn(150);
        assert_eq!(peers, vec!["b".to_string()]);

        // VLSN 201: nobody holds it.
        assert!(gs.find_peers_with_vlsn(201).is_empty());
    }
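
    // The selection rule exercised above can be sketched as a filter over each
    // node's advertised log range. This is an illustration only: `PeerSketch`
    // and `peers_holding` are hypothetical, and the real
    // `GroupService::find_peers_with_vlsn` (not shown here) may also sort or
    // filter by other fields.
    /*
```rust
// Hypothetical stand-in for the parts of a node record used here.
struct PeerSketch {
    name: &'static str,
    log_range: Option<(u64, u64)>,
}

// Returns the names of peers whose inclusive log range contains `vlsn`,
// in input order. Peers without a range never match.
fn peers_holding(peers: &[PeerSketch], vlsn: u64) -> Vec<&'static str> {
    peers
        .iter()
        .filter(|p| {
            matches!(p.log_range, Some((lo, hi)) if lo <= vlsn && vlsn <= hi)
        })
        .map(|p| p.name)
        .collect()
}
```
    */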

    #[test]
    fn test_group_service_update_log_range() {
        use crate::group_service::{GroupService, NodeInfo};
        use crate::node_type::NodeType;
        use std::time::Instant;

        let gs = GroupService::new("log_range_test".to_string());
        gs.add_node(NodeInfo {
            name: "r1".to_string(),
            node_type: NodeType::Electable,
            host: "localhost".to_string(),
            port: 5001,
            node_id: 1,
            joined_at: Instant::now(),
            last_seen: Instant::now(),
            is_active: true,
            known_vlsn: 0,
            log_range: None,
            read_capacity_pct: 100,
            write_capacity_pct: 100,
            latency_hint_ms: 1,
        })
        .unwrap();

        // Initially no range.
        assert!(gs.get_node("r1").unwrap().log_range.is_none());

        // Update range.
        gs.update_node_log_range("r1", 100, 500);
        assert_eq!(gs.get_node("r1").unwrap().log_range, Some((100, 500)));

        // Extend range.
        gs.update_node_log_range("r1", 100, 800);
        assert_eq!(gs.get_node("r1").unwrap().log_range, Some((100, 800)));
    }

    // -----------------------------------------------------------------------
    // PeerFeederService::handle paths
    // -----------------------------------------------------------------------

    #[test]
    fn test_peer_feeder_service_can_serve() {
        use crate::net::channel::LocalChannelPair;

        // Source has range [10, 20]. Client requests start_vlsn=15
        // (in range). Service should respond with CAN_SERVE then
        // stream entries until close.
        let source = Arc::new(PeerLogScanner::new());
        for v in 10u64..=20 {
            source.push(v, 0, format!("e{}", v).into_bytes());
        }
        let svc = PeerFeederService::new(Arc::clone(&source));

        let pair = LocalChannelPair::new();
        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
        let client_ch = pair.channel_b;

        // Client sends start_vlsn=15.
        client_ch.send(&15u64.to_le_bytes()).unwrap();

        // Service runs in another thread so we can observe the
        // streaming side.
        let svc_handle = std::thread::spawn(move || svc.handle(server_ch));

        // Read the 1-byte response.
        let resp = client_ch.receive(Duration::from_secs(2)).unwrap().unwrap();
        assert_eq!(
            resp,
            vec![PEER_FEEDER_CAN_SERVE],
            "service should reply CAN_SERVE for in-range start_vlsn"
        );

        // Drain a few frames then close to terminate the runner.
        let mut frames = 0;
        while let Ok(Some(_)) = client_ch.receive(Duration::from_millis(80)) {
            frames += 1;
            if frames >= 3 {
                break;
            }
        }
        // Close client side so runner returns.
        client_ch.close().unwrap();
        let _ = svc_handle.join().unwrap();
        assert!(frames >= 1, "service must have streamed at least one frame");
    }

    #[test]
    fn test_peer_feeder_service_needs_restore() {
        use crate::net::channel::LocalChannelPair;

        // Source has range [10, 20]. Client requests start_vlsn=5
        // (too early). Service replies NEEDS_RESTORE and errors.
        let source = Arc::new(PeerLogScanner::new());
        for v in 10u64..=20 {
            source.push(v, 0, vec![]);
        }
        let svc = PeerFeederService::new(Arc::clone(&source));

        let pair = LocalChannelPair::new();
        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
        let client_ch = pair.channel_b;

        client_ch.send(&5u64.to_le_bytes()).unwrap();

        let r = svc.handle(server_ch);
        assert!(r.is_err(), "service must return Err on NEEDS_RESTORE");
        let resp = client_ch.receive(Duration::from_secs(2)).unwrap().unwrap();
        assert_eq!(
            resp,
            vec![PEER_FEEDER_NEEDS_RESTORE],
            "service should reply NEEDS_RESTORE for out-of-range start_vlsn"
        );
    }

    #[test]
    fn test_peer_feeder_service_short_handshake_errors() {
        use crate::net::channel::LocalChannelPair;

        let source = Arc::new(PeerLogScanner::new());
        let svc = PeerFeederService::new(Arc::clone(&source));

        let pair = LocalChannelPair::new();
        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
        let client_ch = pair.channel_b;

        // Send only 4 bytes, too short to be a valid u64
        // start_vlsn. Service must Err with "short handshake".
        client_ch.send(&[0u8; 4]).unwrap();

        let r = svc.handle(server_ch);
        assert!(r.is_err(), "short handshake must error");
        let msg = format!("{}", r.err().unwrap());
        assert!(
            msg.contains("short handshake"),
            "expected 'short handshake' in error, got: {msg}"
        );
    }

    #[test]
    fn test_peer_feeder_service_no_handshake_errors() {
        use crate::net::channel::LocalChannelPair;

        let source = Arc::new(PeerLogScanner::new());
        let svc = PeerFeederService::new(Arc::clone(&source));

        let pair = LocalChannelPair::new();
        let server_ch: Box<dyn Channel> = Box::new(pair.channel_a);
        // Drop client side so receive returns None (no message).
        drop(pair.channel_b);

        let r = svc.handle(server_ch);
        assert!(r.is_err(), "no-handshake must error");
    }

    #[test]
    fn test_peer_feeder_service_name() {
        let source = Arc::new(PeerLogScanner::new());
        let svc = PeerFeederService::new(source);
        assert_eq!(
            svc.service_name(),
            PEER_FEEDER_SERVICE_NAME,
            "service_name must match the protocol const"
        );
    }

    // -----------------------------------------------------------------------
    // PeerFeederSource and adapter constructors
    // -----------------------------------------------------------------------

    #[test]
    fn test_peer_feeder_source_default_and_clone_scanner() {
        let src1 = PeerFeederSource::new();
        // default() and new() produce the same shape.
        let src2 = PeerFeederSource::default();
        let s1 = src1.clone_scanner();
        let s2 = src2.clone_scanner();
        // Distinct underlying scanners (each PeerFeederSource owns
        // its own Arc).
        s1.push(1, 0, b"a".to_vec());
        assert_eq!(s1.len(), 1);
        assert_eq!(s2.len(), 0);
    }

    #[test]
    fn test_peer_log_scanner_default_is_empty() {
        let s = PeerLogScanner::default();
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
        assert!(s.log_range().is_none());
    }

    #[test]
    fn test_feeder_runner_known_replica_vlsn_initial_zero() {
        use crate::net::channel::LocalChannelPair;

        let pair = LocalChannelPair::new();
        let channel: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let runner = FeederRunner::new(channel, 1);
        assert_eq!(runner.known_replica_vlsn(), 0);
    }

    // -----------------------------------------------------------------------
    // PeerScannerAdapter: stale-entry skipping
    // -----------------------------------------------------------------------

    #[test]
    fn test_peer_scanner_adapter_skips_stale_via_pop_front() {
        // Entries 1 and 2 lie below the adapter's start VLSN of 3, so
        // the adapter should discard them via pop_front().
        let source = Arc::new(PeerLogScanner::new());
        for v in 1u64..=5 {
            source.push(v, 0, vec![]);
        }
        let mut adapter = PeerScannerAdapter::new(Arc::clone(&source), 3);
        // First call returns vlsn=3, skipping 1 and 2.
        let r = adapter.next_entry(3);
        assert!(r.is_some());
        let (vlsn, _, _) = r.unwrap();
        assert_eq!(vlsn, 3);
        // 4 next.
        let (vlsn, _, _) = adapter.next_entry(4).unwrap();
        assert_eq!(vlsn, 4);
    }
}