noxu_rep/stream/replica_stream.rs
1//! Replica stream - replica-side replication receiver.
2//!
3//! Tracks the state of
4//! receiving replication data from the master, including pending entries,
5//! applied/received VLSNs, and the master's latest known VLSN.
6//!
7//! The [`ReplicaReceiver`] provides the active I/O loop that reads framed
8//! entries from the feeder channel, passes them to a [`LogWriter`], and sends
9//! acks back.
10//!
11//! [`EnvironmentLogWriter`] is the live implementation of [`LogWriter`] that
12//! writes replicated entries into the local `LogManager` and updates the
13//! VLSN index.
14
15use noxu_log::{LogEntryType, Provisional};
16use noxu_sync::Mutex;
17use std::sync::Arc;
18use std::time::Duration;
19
20use crate::error::{RepError, Result};
21use crate::net::channel::Channel;
22
23// CRC32 (Ethernet/zlib polynomial) for per-frame integrity verification.
24// Same polynomial used in feeder.rs; see docs/checksum-selection.md.
25use crc32fast::hash as crc32_hash;
26
27// ---------------------------------------------------------------------------
28// LogWriter trait
29// ---------------------------------------------------------------------------
30
31/// Sink for replicated log entries.
32///
33/// Corresponds to replay thread accepting log records and writing them
34/// to the local environment. The replica calls `write_entry` for every entry
35/// received from the master.
36pub trait LogWriter: Send {
37 /// Write a replicated log entry.
38 ///
39 /// # Arguments
40 /// * `vlsn` - The VLSN of this entry.
41 /// * `entry_type` - The log entry type byte.
42 /// * `payload` - The raw entry payload.
43 fn write_entry(
44 &mut self,
45 vlsn: u64,
46 entry_type: u8,
47 payload: &[u8],
48 ) -> Result<()>;
49}
50
51// ---------------------------------------------------------------------------
52// EnvironmentLogWriter
53// ---------------------------------------------------------------------------
54
55/// `LogWriter` implementation backed by the live `LogManager`.
56///
57/// Each `write_entry` call:
58/// 1. Resolves the `entry_type` byte to a `LogEntryType`.
59/// 2. Writes the payload to the local log via `LogManager::log()`.
60/// 3. Registers the returned LSN in the provided `vlsn_index` so that
61/// the VLSN→LSN mapping is kept up-to-date on the replica.
62///
63///
64pub struct EnvironmentLogWriter {
65 /// Shared log manager for appending replicated entries.
66 log_manager: Arc<noxu_log::LogManager>,
67 /// VLSN index: maps VLSN → (file_number, file_offset) on this replica.
68 vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
69 /// REP-7 (B): optional live replay-apply driver.
70 ///
71 /// When present, every entry written to the WAL is ALSO applied to the
72 /// replica's live in-memory tree (committed ops become visible; txn ops
73 /// are buffered until commit) so the replica can serve reads without a
74 /// restart. `None` for the legacy byte-shadow-only path (e.g. when no
75 /// live `EnvironmentImpl` is wired). Port of JE `Replay` driven from the
76 /// replica replay thread.
77 replay: Option<noxu_dbi::ReplicaReplay>,
78}
79
80impl EnvironmentLogWriter {
81 /// Create a new `EnvironmentLogWriter`.
82 ///
83 /// # Arguments
84 /// * `log_manager` — The live `LogManager` for this replica environment.
85 /// * `vlsn_index` — The VLSN index to update after each written entry.
86 pub fn new(
87 log_manager: Arc<noxu_log::LogManager>,
88 vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
89 ) -> Self {
90 Self { log_manager, vlsn_index, replay: None }
91 }
92
93 /// REP-7 (B): create a writer that ALSO live-applies each entry to the
94 /// replica's in-memory tree via `replay`, so reads on the replica see
95 /// replicated data without a restart.
96 pub fn with_replay(
97 log_manager: Arc<noxu_log::LogManager>,
98 vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
99 replay: noxu_dbi::ReplicaReplay,
100 ) -> Self {
101 Self { log_manager, vlsn_index, replay: Some(replay) }
102 }
103
104 /// REP-10 seam: shared handle to the replica's last-applied VLSN, when a
105 /// live replay driver is installed. A future consistency policy gates
106 /// reads on this; this method only exposes it.
107 pub fn last_applied_vlsn_handle(
108 &self,
109 ) -> Option<std::sync::Arc<std::sync::atomic::AtomicU64>> {
110 self.replay.as_ref().map(|r| r.last_applied_vlsn_handle())
111 }
112}
113
114impl LogWriter for EnvironmentLogWriter {
115 /// Write one replicated entry to the local log.
116 ///
117 /// Resolves `entry_type` → `LogEntryType`, appends the payload to the
118 /// WAL, and records the assigned LSN in the VLSN index. Returns an
119 /// error if the entry type is unknown or the write fails.
120 fn write_entry(
121 &mut self,
122 vlsn: u64,
123 entry_type: u8,
124 payload: &[u8],
125 ) -> crate::error::Result<()> {
126 // Resolve the wire entry-type byte to the typed enum.
127 let log_entry_type = LogEntryType::from_type_num(entry_type)
128 .ok_or_else(|| {
129 crate::error::RepError::ProtocolError(format!(
130 "replica: unknown entry_type byte {}",
131 entry_type
132 ))
133 })?;
134
135 // Write to the local WAL. Replicated entries are non-provisional and
136 // do not require an immediate fsync on every entry (the master already
137 // fsynced before sending).
138 //
139 // Chained replication: re-log the entry preserving the master's
140 // ORIGINAL VLSN via `log_with_vlsn`, so the replica's WAL carries a
141 // VLSN-tagged 22-byte header identical in shape to the master's.
142 // This makes the replica's own WAL a valid `FeederSource` for a
143 // downstream replica (an `EnvironmentLogScanner` reads the VLSN from
144 // the 22-byte header). Faithful to JE `Replay.logReplicatedEntry`,
145 // which logs the received record preserving its VLSN so the replica's
146 // VLSNIndex + log are a feeder source for a replica chain (see
147 // `FeederSource` javadoc: "a Replica in a Replica chain that is
148 // replaying log records it received from some other source"). Plain
149 // `log()` would write a 14-byte no-VLSN header that no downstream
150 // feeder could scan.
151 //
152 // vlsn=0 is reserved as NULL_VLSN; entries without a VLSN fall back to
153 // the plain `log()` path (these are never streamed downstream).
154 //
155 // Chained replication / flush-to-OS (NOT fsync): we write the received
156 // entry with `flush_required = true, fsync_required = false`. The
157 // flush pushes the buffered bytes out to the OS file so the entry's
158 // file length is visible to a DOWNSTREAM feeder's
159 // `EnvironmentLogScanner` (which reads `FileManager::get_file_length`)
160 // — without it the entry would sit in the in-memory `LogBuffer`, the
161 // on-disk file length would not advance, and a cascading replica would
162 // see nothing to feed. We deliberately do NOT fsync: durability of
163 // the forwarded stream is the master's responsibility (it fsynced
164 // before sending), so the replica only needs the OS to make the bytes
165 // readable, not crash-durable. Faithful to JE `Replay`, which flushes
166 // received entries to its log so its VLSNIndex + log are a valid
167 // `FeederSource` for a replica chain, without forcing a per-entry
168 // fsync on the replay path.
169 let lsn = if vlsn > 0 {
170 self.log_manager.log_with_vlsn(
171 log_entry_type,
172 payload,
173 vlsn,
174 /*flush_required=*/ true,
175 /*fsync_required=*/ false,
176 )
177 } else {
178 self.log_manager.log(
179 log_entry_type,
180 payload,
181 Provisional::No,
182 /*flush_required=*/ true,
183 /*fsync_required=*/ false,
184 )
185 }
186 .map_err(|e| {
187 crate::error::RepError::DatabaseError(format!(
188 "replica log write failed: {}",
189 e
190 ))
191 })?;
192
193 // Register VLSN → LSN in the replica's VLSN index so that
194 // FeederRunner/ack tracking can correlate positions. Dispatch the
195 // entry type so `lastSync`/`lastTxnEnd` advance (REP-5; JE
196 // VLSNRange.getUpdateForNewMapping).
197 // vlsn=0 is reserved as NULL_VLSN; skip it.
198 if vlsn > 0 {
199 self.vlsn_index.put_with_type(
200 vlsn,
201 lsn.file_number(),
202 lsn.file_offset(),
203 log_entry_type,
204 );
205 }
206
207 // REP-7 (B): live-apply to the replica's in-memory tree AFTER the WAL
208 // write + VLSN-index update, so the WAL stays the source of truth and
209 // the live tree is the read-serving optimization recovery re-derives.
210 // Port of JE: the replica writes the entry to its log, then
211 // `Replay.replayEntry` applies it to the tree.
212 if let Some(replay) = self.replay.as_mut() {
213 replay.apply_entry(vlsn, entry_type, payload, lsn);
214 }
215
216 log::trace!(
217 "replica: wrote entry vlsn={} type={} lsn=({},{})",
218 vlsn,
219 log_entry_type,
220 lsn.file_number(),
221 lsn.file_offset(),
222 );
223
224 Ok(())
225 }
226}
227
228// ---------------------------------------------------------------------------
229// ReplicaReceiver
230// ---------------------------------------------------------------------------
231
/// Size in bytes of the fixed header that precedes every frame payload
/// (matches FeederRunner::FRAME_HEADER_LEN).
///
/// Layout: `[vlsn:8][type:1][payload_len:4][crc32:4]` = 17 bytes.
const FRAME_HEADER_LEN: usize = std::mem::size_of::<u64>() // vlsn
    + std::mem::size_of::<u8>() // type
    + std::mem::size_of::<u32>() // payload_len
    + std::mem::size_of::<u32>(); // crc32
236
237/// Active replica I/O loop.
238///
239/// `ReplicaReceiver` owns a channel to the master feeder. `run()` is a
240/// blocking loop that:
241/// 1. Reads framed entries from the feeder.
242/// 2. Deserializes each entry: `[vlsn:8][type:1][len:4][crc32:4][payload]`.
243/// 3. Verifies `crc32fast::hash(payload) == crc32` — returns
244/// [`RepError::FrameCorrupted`] on mismatch.
245/// 4. Passes the entry to `log_writer`.
246/// 5. Sends an 8-byte LE VLSN ack back to the master.
247/// 6. Returns when the channel is closed or an I/O error occurs.
248///
249/// Read thread and replay thread in the replica.
250pub struct ReplicaReceiver {
251 /// Channel to the master feeder.
252 channel: Arc<dyn Channel>,
253}
254
255impl ReplicaReceiver {
256 /// Create a new `ReplicaReceiver` on the given channel.
257 pub fn new(channel: Arc<dyn Channel>) -> Self {
258 Self { channel }
259 }
260
261 /// Run the replica receive loop.
262 ///
263 /// Blocks until the channel is closed or an unrecoverable error occurs.
264 /// Each successfully received entry is passed to `log_writer`; then an
265 /// ack `[vlsn: 8 bytes LE]` is sent back to the master.
266 ///
267 /// Equivalent to [`Self::run_until`] with no shutdown flag.
268 ///
269 /// # Anti-replay / VLSN-ordering enforcement (LOG-7)
270 ///
271 /// The replica MUST observe strictly-increasing VLSNs across the
272 /// connection. Without this check the master could (accidentally
273 /// or maliciously) replay an old frame, causing the replica to ack a
274 /// VLSN it had already applied and silently overwrite a more-recent
275 /// committed value. We track a `received_vlsn` high-water mark and
276 /// reject any incoming frame whose VLSN is `<= high-water` with a
277 /// [`RepError::ProtocolError`]. Gaps above the high-water mark are
278 /// allowed because the master is permitted to skip non-replicated
279 /// entries.
280 ///
281 /// # Entry-type validation (LOG-10)
282 ///
283 /// The wire-format entry-type byte is also validated against the set
284 /// of known [`LogEntryType`] variants before forwarding to
285 /// `log_writer`. An unknown byte indicates either a protocol-version
286 /// skew or an attacker who flipped a header bit, and is logged at
287 /// `error` level then skipped (the connection is kept open so the
288 /// stream can recover from a single bad frame; a repeated stream of
289 /// unknowns will surface via the operator alerting on the error log).
290 pub fn run(&self, log_writer: &mut dyn LogWriter) -> Result<()> {
291 self.run_until(log_writer, None)
292 }
293
294 /// Run the replica receive loop, polling an optional `shutdown` flag.
295 ///
296 /// Identical to [`Self::run`] but, when `shutdown` is `Some`, the loop
297 /// returns `Ok(())` as soon as the flag is observed `true` — including on
298 /// each receive timeout. This lets `ReplicatedEnvironment::close()`
299 /// break a replica receive thread whose upstream feeder stays connected
300 /// (e.g. a mid-tier replica in a chain closed before its upstream), which
301 /// would otherwise block the thread-join in `close()`. The receive
302 /// timeout is shortened to 1s so close is responsive.
303 pub fn run_until(
304 &self,
305 log_writer: &mut dyn LogWriter,
306 shutdown: Option<&std::sync::atomic::AtomicBool>,
307 ) -> Result<()> {
308 use std::sync::atomic::Ordering;
309 // A short timeout when a shutdown flag is wired keeps close()
310 // responsive; without one, keep the original 30s budget.
311 let recv_timeout = if shutdown.is_some() {
312 Duration::from_secs(1)
313 } else {
314 Duration::from_secs(30)
315 };
316 // LOG-7: strictly-increasing VLSN high-water mark. 0 == NULL_VLSN
317 // (never assigned by the master), so "<= high-water" rejects 0
318 // too once a real VLSN has arrived.
319 let mut received_vlsn_high_water: u64 = 0;
320
321 loop {
322 if let Some(flag) = shutdown
323 && flag.load(Ordering::SeqCst)
324 {
325 return Ok(());
326 }
327 // ----------------------------------------------------------------
328 // Receive the next framed entry from the feeder.
329 // ----------------------------------------------------------------
330 let frame = match self.channel.receive(recv_timeout) {
331 Ok(Some(f)) => f,
332 Ok(None) => {
333 // Timeout with no data — keep waiting (loop top re-checks
334 // the shutdown flag).
335 continue;
336 }
337 Err(RepError::ChannelClosed(_)) => {
338 // Master disconnected; clean shutdown.
339 return Ok(());
340 }
341 Err(e) => return Err(e),
342 };
343
344 // ----------------------------------------------------------------
345 // Parse frame: [vlsn:8 LE][entry_type:1][payload_len:4 LE]
346 // [crc32:4 LE][payload]
347 // ----------------------------------------------------------------
348 if frame.len() < FRAME_HEADER_LEN {
349 return Err(RepError::ProtocolError(format!(
350 "replica: short frame: {} bytes",
351 frame.len()
352 )));
353 }
354
355 let vlsn = u64::from_le_bytes(frame[0..8].try_into().unwrap());
356 let entry_type = frame[8];
357 let payload_len =
358 u32::from_le_bytes(frame[9..13].try_into().unwrap()) as usize;
359 let expected_crc =
360 u32::from_le_bytes(frame[13..17].try_into().unwrap());
361
362 if frame.len() < FRAME_HEADER_LEN + payload_len {
363 return Err(RepError::ProtocolError(format!(
364 "replica: frame payload truncated: expected {} bytes, got {}",
365 payload_len,
366 frame.len() - FRAME_HEADER_LEN,
367 )));
368 }
369
370 let payload =
371 &frame[FRAME_HEADER_LEN..FRAME_HEADER_LEN + payload_len];
372
373 // ----------------------------------------------------------------
374 // Verify CRC32 — reject corrupted frames before applying.
375 // ----------------------------------------------------------------
376 let actual_crc = crc32_hash(payload);
377 if actual_crc != expected_crc {
378 return Err(RepError::FrameCorrupted {
379 vlsn,
380 expected: expected_crc,
381 actual: actual_crc,
382 });
383 }
384
385 // ----------------------------------------------------------------
386 // LOG-7: enforce strictly-increasing VLSN order.
387 //
388 // VLSN 0 from the master is the NULL VLSN sentinel and is not
389 // checked against the high-water mark (the feeder sends 0 for
390 // entries that do not carry a VLSN). All non-zero VLSNs MUST
391 // strictly exceed the previous high-water; otherwise we have
392 // either a replayed frame or a master that re-used a sequence
393 // number — both are protocol-fatal.
394 // ----------------------------------------------------------------
395 if vlsn != 0 && vlsn <= received_vlsn_high_water {
396 return Err(RepError::ProtocolError(format!(
397 "replica: VLSN ordering violation: incoming vlsn={vlsn} \
398 <= received high-water {received_vlsn_high_water}; \
399 possible replay attack or master clock-skew"
400 )));
401 }
402
403 // ----------------------------------------------------------------
404 // LOG-10: validate the entry-type byte against the catalog
405 // before forwarding to the log writer. An unknown type is
406 // logged and the frame is skipped — but the connection stays
407 // open so a single corrupt frame does not stall replication.
408 // ----------------------------------------------------------------
409 if LogEntryType::from_type_num(entry_type).is_none() {
410 log::error!(
411 "replica: unknown entry_type byte {entry_type} on frame \
412 vlsn={vlsn}; skipping (LOG-10)"
413 );
414 // Skip this frame: do not advance high-water, do not ack.
415 // The master will retransmit if the replica disconnects;
416 // for now we just continue to the next frame.
417 continue;
418 }
419
420 // ----------------------------------------------------------------
421 // Apply the entry to the local log.
422 // ----------------------------------------------------------------
423 log_writer.write_entry(vlsn, entry_type, payload)?;
424
425 // Advance the high-water mark only after a successful apply.
426 if vlsn != 0 {
427 received_vlsn_high_water = vlsn;
428 }
429
430 // ----------------------------------------------------------------
431 // Send ack: [vlsn: 8 bytes LE]
432 // ----------------------------------------------------------------
433 let ack = vlsn.to_le_bytes();
434 match self.channel.send(&ack) {
435 Ok(()) => {}
436 Err(RepError::ChannelClosed(_)) => return Ok(()),
437 Err(e) => return Err(e),
438 }
439 }
440 }
441}
442
/// Phase of the replica's replication stream (the replica state machine).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ReplicaStreamState {
    /// No master connection exists.
    Idle,
    /// A connection to the master is being established.
    Connecting,
    /// Replication data is actively being received.
    Streaming,
    /// Log entries are being replayed to catch up with the master.
    CatchingUp,
    /// The stream is shutting down.
    Shutdown,
}
459
460/// Tracks the state of receiving replication data from the master.
461///
462/// The replica stream receives log entries from the master, buffers them
463/// in a pending queue, and tracks which VLSNs have been received vs.
464/// applied to the local database.
465///
466///
467pub struct ReplicaStream {
468 /// Name of the master we are connected to.
469 master_name: Mutex<Option<String>>,
470 /// Current connection state.
471 state: Mutex<ReplicaStreamState>,
472 /// Last VLSN applied to the local database.
473 applied_vlsn: Mutex<u64>,
474 /// Last VLSN received from the master.
475 received_vlsn: Mutex<u64>,
476 /// Master's latest known VLSN (from heartbeat messages).
477 master_vlsn: Mutex<u64>,
478 /// Pending entries waiting to be applied: (vlsn, entry_type, data).
479 pending_entries: Mutex<Vec<(u64, u8, Vec<u8>)>>,
480}
481
482impl Default for ReplicaStream {
483 fn default() -> Self {
484 Self::new()
485 }
486}
487
488impl ReplicaStream {
489 /// Create a new replica stream in the idle state.
490 pub fn new() -> Self {
491 ReplicaStream {
492 master_name: Mutex::new(None),
493 state: Mutex::new(ReplicaStreamState::Idle),
494 applied_vlsn: Mutex::new(0),
495 received_vlsn: Mutex::new(0),
496 master_vlsn: Mutex::new(0),
497 pending_entries: Mutex::new(Vec::new()),
498 }
499 }
500
501 /// Return the current stream state.
502 pub fn get_state(&self) -> ReplicaStreamState {
503 *self.state.lock()
504 }
505
506 /// Set the stream state.
507 pub fn set_state(&self, state: ReplicaStreamState) {
508 *self.state.lock() = state;
509 }
510
511 /// Return the last VLSN that has been applied to the local database.
512 pub fn get_applied_vlsn(&self) -> u64 {
513 *self.applied_vlsn.lock()
514 }
515
516 /// Return the last VLSN received from the master (may be ahead of
517 /// `applied_vlsn` if entries are buffered).
518 pub fn get_received_vlsn(&self) -> u64 {
519 *self.received_vlsn.lock()
520 }
521
522 /// Return the master's latest known VLSN (from heartbeat updates).
523 pub fn get_master_vlsn(&self) -> u64 {
524 *self.master_vlsn.lock()
525 }
526
527 /// Set the master node name.
528 pub fn set_master(&self, name: &str) {
529 *self.master_name.lock() = Some(name.to_string());
530 }
531
532 /// Return the master node name, if set.
533 pub fn get_master(&self) -> Option<String> {
534 self.master_name.lock().clone()
535 }
536
537 /// Receive a log entry from the master.
538 ///
539 /// The entry is added to the pending queue and `received_vlsn` is
540 /// updated if the new VLSN is greater than the current value.
541 pub fn receive_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
542 self.pending_entries.lock().push((vlsn, entry_type, data));
543 let mut received = self.received_vlsn.lock();
544 if vlsn > *received {
545 *received = vlsn;
546 }
547 }
548
549 /// Mark a VLSN as applied to the local database.
550 ///
551 /// The `applied_vlsn` is updated if the new VLSN is greater than the
552 /// current value (applied VLSNs should advance monotonically).
553 pub fn mark_applied(&self, vlsn: u64) {
554 let mut applied = self.applied_vlsn.lock();
555 if vlsn > *applied {
556 *applied = vlsn;
557 }
558 }
559
560 /// Update the master's known latest VLSN (typically from a heartbeat
561 /// message).
562 pub fn update_master_vlsn(&self, vlsn: u64) {
563 let mut master = self.master_vlsn.lock();
564 if vlsn > *master {
565 *master = vlsn;
566 }
567 }
568
569 /// Return the replication lag: the difference between the master's
570 /// latest VLSN and the last applied VLSN.
571 ///
572 /// A lag of 0 means the replica is fully caught up with the master.
573 pub fn get_lag(&self) -> u64 {
574 let master = *self.master_vlsn.lock();
575 let applied = *self.applied_vlsn.lock();
576 master.saturating_sub(applied)
577 }
578
579 /// Drain all pending entries for processing.
580 ///
581 /// Returns the entries in the order they were received and leaves
582 /// the pending queue empty.
583 pub fn drain_pending(&self) -> Vec<(u64, u8, Vec<u8>)> {
584 let mut pending = self.pending_entries.lock();
585 std::mem::take(&mut *pending)
586 }
587
588 /// Check if the replica is caught up with the master.
589 ///
590 /// Returns `true` if **all** of the following are true:
591 /// - the master VLSN is non-zero (i.e. at least one master VLSN has
592 /// been observed; a stream that has never received a master VLSN
593 /// reports `false`),
594 /// - the applied VLSN equals or exceeds the master's latest known
595 /// VLSN, and
596 /// - there are no pending entries.
597 pub fn is_caught_up(&self) -> bool {
598 let applied = *self.applied_vlsn.lock();
599 let master = *self.master_vlsn.lock();
600 let pending_empty = self.pending_entries.lock().is_empty();
601 applied >= master && master > 0 && pending_empty
602 }
603}
604
605impl std::fmt::Debug for ReplicaStream {
606 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607 f.debug_struct("ReplicaStream")
608 .field("master", &self.get_master())
609 .field("state", &self.get_state())
610 .field("applied_vlsn", &self.get_applied_vlsn())
611 .field("received_vlsn", &self.get_received_vlsn())
612 .field("master_vlsn", &self.get_master_vlsn())
613 .field("lag", &self.get_lag())
614 .finish()
615 }
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621 use crate::net::channel::LocalChannelPair;
622
623 // -----------------------------------------------------------------------
624 // LogWriter helper
625 // -----------------------------------------------------------------------
626
627 struct RecordingWriter {
628 entries: Vec<(u64, u8, Vec<u8>)>,
629 }
630
631 impl RecordingWriter {
632 fn new() -> Self {
633 Self { entries: Vec::new() }
634 }
635 }
636
637 impl LogWriter for RecordingWriter {
638 fn write_entry(
639 &mut self,
640 vlsn: u64,
641 entry_type: u8,
642 payload: &[u8],
643 ) -> Result<()> {
644 self.entries.push((vlsn, entry_type, payload.to_vec()));
645 Ok(())
646 }
647 }
648
649 // -----------------------------------------------------------------------
650 // ReplicaReceiver tests
651 // -----------------------------------------------------------------------
652
653 fn make_frame(vlsn: u64, entry_type: u8, payload: &[u8]) -> Vec<u8> {
654 let crc = crc32_hash(payload);
655 let mut f = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
656 f.extend_from_slice(&vlsn.to_le_bytes());
657 f.push(entry_type);
658 f.extend_from_slice(&(payload.len() as u32).to_le_bytes());
659 f.extend_from_slice(&crc.to_le_bytes());
660 f.extend_from_slice(payload);
661 f
662 }
663
#[test]
fn test_replica_receiver_receives_and_acks() {
    let pair = LocalChannelPair::new();
    let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
    let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

    // Three frames for the "master" to push; the last has an empty payload.
    let frames = vec![
        make_frame(1, 10, &[0xAA]),
        make_frame(2, 20, &[0xBB, 0xCC]),
        make_frame(3, 30, &[]),
    ];

    // Master thread: send every frame, read back one ack per frame, then
    // close the channel so the receiver loop terminates.
    let master = Arc::clone(&master_side);
    let master_thread = std::thread::spawn(move || {
        frames.iter().for_each(|frame| master.send(frame).unwrap());

        let ack_timeout = Duration::from_secs(5);
        let acked: Vec<u64> = (0..3)
            .map(|_| {
                let ack = master.receive(ack_timeout).unwrap().unwrap();
                u64::from_le_bytes(ack[..8].try_into().unwrap())
            })
            .collect();

        master.close().unwrap();
        acked
    });

    let mut writer = RecordingWriter::new();
    ReplicaReceiver::new(Arc::clone(&replica_side))
        .run(&mut writer)
        .unwrap();

    // Acks come back in send order, one per frame.
    let acked = master_thread.join().unwrap();
    assert_eq!(acked, vec![1, 2, 3]);

    // Every frame reached the writer unchanged and in order.
    let expected: Vec<(u64, u8, Vec<u8>)> = vec![
        (1, 10, vec![0xAA]),
        (2, 20, vec![0xBB, 0xCC]),
        (3, 30, vec![]),
    ];
    assert_eq!(writer.entries, expected);
}
707
#[test]
fn test_replica_receiver_stops_on_channel_close() {
    let pair = LocalChannelPair::new();
    let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
    let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

    // The master end goes away before the receiver even starts.
    master_side.close().unwrap();

    let mut writer = RecordingWriter::new();
    let outcome = ReplicaReceiver::new(replica_side).run(&mut writer);

    // The point of this test is only that `run` terminates.  Depending on
    // how the local channel surfaces the closed peer, that is either a
    // clean `Ok` or a `ChannelClosed` error; both are accepted.
    assert!(matches!(outcome, Ok(()) | Err(RepError::ChannelClosed(_))));
}
727
728 // -----------------------------------------------------------------------
729 // Feeder → Replica round-trip test using LocalChannel
730 // -----------------------------------------------------------------------
731
#[test]
fn test_feeder_to_replica_round_trip() {
    use crate::stream::feeder::{FeederRunner, LogScanner};
    use std::collections::VecDeque;

    /// Scanner over a fixed queue: yields the head entry once its VLSN is
    /// at or past the requested position.
    struct SimpleScanner {
        items: VecDeque<(u64, u8, Vec<u8>)>,
    }
    impl LogScanner for SimpleScanner {
        fn next_entry(
            &mut self,
            from_vlsn: u64,
        ) -> Option<(u64, u8, Vec<u8>)> {
            let head_ready = self
                .items
                .front()
                .is_some_and(|head| head.0 >= from_vlsn);
            if head_ready { self.items.pop_front() } else { None }
        }
    }

    let pair = LocalChannelPair::new();
    let master_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
    let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);

    // Entries to replicate: VLSN i carries i bytes, each equal to i.  The
    // entry-type bytes must be valid `LogEntryType` variants (LOG-10
    // enforcement); these are FileHeader/IN/BIN/BINDelta/InsertLN.
    let valid_types: [u8; 5] = [1, 2, 3, 4, 10];
    let entries: Vec<(u64, u8, Vec<u8>)> = valid_types
        .iter()
        .zip(1u64..)
        .map(|(&etype, vlsn)| (vlsn, etype, vec![vlsn as u8; vlsn as usize]))
        .collect();

    // Replica thread: run the receiver and return whatever it wrote.
    let replica_end = Arc::clone(&replica_ch);
    let replica_thread = std::thread::spawn(move || {
        let mut writer = RecordingWriter::new();
        ReplicaReceiver::new(replica_end).run(&mut writer).unwrap();
        writer.entries
    });

    // Feeder thread: stream the entries and report the last acked VLSN.
    let master_end = Arc::clone(&master_ch);
    let feeder_thread = std::thread::spawn(move || {
        let runner = FeederRunner::new(Arc::clone(&master_end), 1);
        let mut scanner = SimpleScanner { items: VecDeque::from(entries) };
        runner.run(&mut scanner).unwrap();
        runner.known_replica_vlsn()
    });

    // Give both sides a moment to exchange everything, then close the
    // channel so both loops return.
    std::thread::sleep(Duration::from_millis(200));
    master_ch.close().unwrap();
    replica_ch.close().unwrap();

    let last_acked = feeder_thread.join().unwrap();
    let written = replica_thread.join().unwrap();

    assert_eq!(written.len(), 5);
    for (idx, entry) in written.iter().enumerate() {
        let want_vlsn = (idx + 1) as u64;
        assert_eq!(entry.0, want_vlsn);
        assert_eq!(entry.1, valid_types[idx]);
        assert_eq!(entry.2.len(), want_vlsn as usize);
    }
    assert_eq!(last_acked, 5);
}
805
/// LOG-7: a frame whose VLSN is `<=` the high-water mark already received
/// (a replay, or an out-of-order frame) has to fail with
/// [`RepError::ProtocolError`].  Otherwise the master could re-send a
/// frame that was already acked and the replica would silently overwrite a
/// more-recent committed value.
#[test]
fn test_replica_rejects_replayed_vlsn() {
    let pair = LocalChannelPair::new();
    let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
    let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

    // VLSN=5 first, then VLSN=3 (the replay).  Entry-type 10 = InsertLN.
    let frames =
        vec![make_frame(5, 10, b"first"), make_frame(3, 10, b"replay")];

    let master = Arc::clone(&master_side);
    let _send_handle = std::thread::spawn(move || {
        frames.iter().for_each(|frame| {
            let _ = master.send(frame);
        });
        // Consume the ack for the first frame so the receiver can advance.
        let _ = master.receive(Duration::from_secs(2));
    });

    let mut writer = RecordingWriter::new();
    let res = ReplicaReceiver::new(replica_side).run(&mut writer);

    let Err(RepError::ProtocolError(msg)) = &res else {
        panic!("expected ProtocolError on replay, got {res:?}")
    };
    assert!(
        msg.contains("VLSN ordering violation"),
        "expected VLSN-ordering protocol error, got: {msg}"
    );

    // Only the in-order frame may have been applied; the replayed one
    // MUST NOT be.
    assert_eq!(writer.entries.len(), 1);
    assert_eq!(writer.entries[0].0, 5);
}
851
/// LOG-7: the ordering rule is *strictly* increasing, so a frame repeating
/// the previous VLSN is rejected as well.
#[test]
fn test_replica_rejects_duplicate_vlsn() {
    let pair = LocalChannelPair::new();
    let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
    let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

    // Same VLSN twice, different payloads.
    let frames = vec![make_frame(7, 10, b"a"), make_frame(7, 10, b"b")];

    let master = Arc::clone(&master_side);
    let _send_handle = std::thread::spawn(move || {
        frames.iter().for_each(|frame| {
            let _ = master.send(frame);
        });
        // Consume the ack for the first frame.
        let _ = master.receive(Duration::from_secs(2));
    });

    let mut writer = RecordingWriter::new();
    let res = ReplicaReceiver::new(replica_side).run(&mut writer);

    assert!(
        matches!(res, Err(RepError::ProtocolError(_))),
        "expected ProtocolError on duplicate VLSN, got {res:?}"
    );
    // Only the first of the two frames was applied.
    assert_eq!(writer.entries.len(), 1);
}
879
/// LOG-7: gaps are fine. A frame whose VLSN jumps past `high-water + 1`
/// is accepted, because the master may legitimately skip VLSNs
/// (non-replicated entries reuse the same connection).
#[test]
fn test_replica_allows_vlsn_gap() {
    let channels = LocalChannelPair::new();
    let master_end: Arc<dyn Channel> = Arc::new(channels.channel_a);
    let replica_end: Arc<dyn Channel> = Arc::new(channels.channel_b);

    // Increasing but non-contiguous VLSNs: 1 → 5 → 100.
    let gapped_frames = vec![
        make_frame(1, 10, b"a"),
        make_frame(5, 10, b"b"),
        make_frame(100, 10, b"c"),
    ];

    let feeder_channel = Arc::clone(&master_end);
    let feeder = std::thread::spawn(move || {
        gapped_frames.iter().for_each(|frame| {
            feeder_channel.send(frame).unwrap();
        });
        // One receive attempt per frame sent; the results are ignored.
        (0..3).for_each(|_| {
            let _ = feeder_channel.receive(Duration::from_secs(2));
        });
        feeder_channel.close().unwrap();
    });

    let receiver = ReplicaReceiver::new(replica_end);
    let mut sink = RecordingWriter::new();
    receiver.run(&mut sink).unwrap();
    feeder.join().unwrap();

    // All three frames were written, in the order they were sent.
    assert_eq!(sink.entries.len(), 3);
    for (entry, expected_vlsn) in sink.entries.iter().zip([1, 5, 100]) {
        assert_eq!(entry.0, expected_vlsn);
    }
}
917
/// LOG-10: a frame carrying an unknown entry-type byte is logged and
/// skipped. The connection stays open and later valid frames are applied
/// normally.
#[test]
fn test_replica_skips_unknown_entry_type() {
    let channels = LocalChannelPair::new();
    let master_end: Arc<dyn Channel> = Arc::new(channels.channel_a);
    let replica_end: Arc<dyn Channel> = Arc::new(channels.channel_b);

    // VLSN=1 uses the bogus entry_type=200 (no
    // `LogEntryType::from_type_num` mapping); VLSN=2 is a valid
    // InsertLN (10). The skipped frame must NOT advance the receiver's
    // high-water mark, so VLSN=2 still has to be accepted afterwards.
    let frames =
        vec![make_frame(1, 200, b"bogus"), make_frame(2, 10, b"good")];

    let feeder_channel = Arc::clone(&master_end);
    let feeder = std::thread::spawn(move || {
        frames.iter().for_each(|frame| {
            feeder_channel.send(frame).unwrap();
        });
        // Only the second (valid) frame produces an ack.
        let received =
            feeder_channel.receive(Duration::from_secs(2)).unwrap();
        feeder_channel.close().unwrap();
        received
    });

    let receiver = ReplicaReceiver::new(replica_end);
    let mut sink = RecordingWriter::new();
    receiver.run(&mut sink).unwrap();

    // The first eight bytes of the ack are read as a little-endian VLSN.
    let ack_bytes = feeder.join().unwrap().unwrap();
    let mut vlsn_le = [0u8; 8];
    vlsn_le.copy_from_slice(&ack_bytes[..8]);
    let acked_vlsn = u64::from_le_bytes(vlsn_le);

    assert_eq!(sink.entries.len(), 1, "bogus frame must be skipped");
    let applied = &sink.entries[0];
    assert_eq!(applied.0, 2);
    assert_eq!(applied.1, 10);
    assert_eq!(acked_vlsn, 2);
}
958
959 // -----------------------------------------------------------------------
960 // Original ReplicaStream state struct tests
961 // -----------------------------------------------------------------------
962
/// A freshly constructed stream is idle, has no master name, and reports
/// zero for every VLSN counter and for the lag.
#[test]
fn test_new_replica_stream() {
    let fresh = ReplicaStream::new();

    // Identity / lifecycle.
    assert_eq!(fresh.get_state(), ReplicaStreamState::Idle);
    assert_eq!(fresh.get_master(), None);

    // Counters.
    assert_eq!(fresh.get_applied_vlsn(), 0);
    assert_eq!(fresh.get_received_vlsn(), 0);
    assert_eq!(fresh.get_master_vlsn(), 0);
    assert_eq!(fresh.get_lag(), 0);
}
973
/// `Default` yields a stream in the `Idle` state.
#[test]
fn test_default() {
    let defaulted: ReplicaStream = Default::default();
    let state = defaulted.get_state();
    assert_eq!(state, ReplicaStreamState::Idle);
}
979
/// `set_state` followed by `get_state` round-trips every state, starting
/// from the initial `Idle`.
#[test]
fn test_state_transitions() {
    let stream = ReplicaStream::new();
    // Shared assertion: the stream currently reports `expected`.
    let expect_state = |expected: ReplicaStreamState| {
        assert_eq!(stream.get_state(), expected);
    };

    expect_state(ReplicaStreamState::Idle);

    stream.set_state(ReplicaStreamState::Connecting);
    expect_state(ReplicaStreamState::Connecting);

    stream.set_state(ReplicaStreamState::Streaming);
    expect_state(ReplicaStreamState::Streaming);

    stream.set_state(ReplicaStreamState::CatchingUp);
    expect_state(ReplicaStreamState::CatchingUp);

    stream.set_state(ReplicaStreamState::Shutdown);
    expect_state(ReplicaStreamState::Shutdown);
}
997
/// The master name starts out unset and each `set_master` call replaces
/// the previously stored name.
#[test]
fn test_master_name() {
    let stream = ReplicaStream::new();
    assert_eq!(stream.get_master(), None);

    for name in ["master-node-1", "master-node-2"] {
        stream.set_master(name);
        assert_eq!(stream.get_master(), Some(String::from(name)));
    }
}
1009
/// Received entries are handed back by `drain_pending` in arrival order,
/// and a second drain finds nothing left.
#[test]
fn test_receive_and_drain() {
    let stream = ReplicaStream::new();
    stream.receive_entry(1, 10, vec![0xAA]);
    stream.receive_entry(2, 20, vec![0xBB, 0xCC]);
    stream.receive_entry(3, 30, vec![]);

    assert_eq!(stream.get_received_vlsn(), 3);

    // What the first drain must return, in order.
    let expected =
        [(1, 10, vec![0xAA]), (2, 20, vec![0xBB, 0xCC]), (3, 30, vec![])];

    let drained = stream.drain_pending();
    assert_eq!(drained.len(), 3);
    for (got, want) in drained.iter().zip(expected.iter()) {
        assert_eq!(got, want);
    }

    // The pending queue is empty after the first drain.
    assert!(stream.drain_pending().is_empty());
}
1029
/// The received VLSN never decreases, even when an older VLSN arrives
/// after a newer one.
#[test]
fn test_received_vlsn_monotonic() {
    let stream = ReplicaStream::new();

    // (incoming VLSN, received VLSN expected afterwards); the middle
    // step is the out-of-order arrival.
    let steps = [(5, 5), (3, 5), (7, 7)];
    for (incoming, expected_received) in steps {
        stream.receive_entry(incoming, 1, vec![]);
        assert_eq!(stream.get_received_vlsn(), expected_received);
    }
}
1043
/// `mark_applied` raises the applied VLSN and ignores attempts to move it
/// backwards.
#[test]
fn test_mark_applied() {
    let stream = ReplicaStream::new();

    // (VLSN passed to mark_applied, applied VLSN expected afterwards);
    // the last step tries to go backwards and must be a no-op.
    let steps = [(5, 5), (10, 10), (7, 10)];
    for (marked, expected_applied) in steps {
        stream.mark_applied(marked);
        assert_eq!(stream.get_applied_vlsn(), expected_applied);
    }
}
1057
/// `update_master_vlsn` raises the known master VLSN and ignores values
/// lower than the current one.
#[test]
fn test_update_master_vlsn() {
    let stream = ReplicaStream::new();

    // (reported master VLSN, master VLSN expected afterwards); the last
    // report is stale and must not lower the stored value.
    let steps = [(100, 100), (150, 150), (120, 150)];
    for (reported, expected_master) in steps {
        stream.update_master_vlsn(reported);
        assert_eq!(stream.get_master_vlsn(), expected_master);
    }
}
1071
/// The lag is the distance from the applied VLSN to the master VLSN and
/// bottoms out at zero.
#[test]
fn test_lag_calculation() {
    let stream = ReplicaStream::new();
    stream.update_master_vlsn(100);
    assert_eq!(stream.get_lag(), 100);

    // (applied VLSN, lag expected afterwards). The final step applies
    // past the master VLSN — not expected in practice, but the lag must
    // still read zero.
    let steps = [(50, 50), (100, 0), (110, 0)];
    for (applied, expected_lag) in steps {
        stream.mark_applied(applied);
        assert_eq!(stream.get_lag(), expected_lag);
    }
}
1088
/// Walks `is_caught_up` through a fresh stream, a known-but-unapplied
/// master VLSN, a fully applied stream, and a stream with a pending entry.
#[test]
fn test_is_caught_up() {
    let stream = ReplicaStream::new();
    let caught_up = || stream.is_caught_up();

    // Fresh stream: master_vlsn is still 0.
    assert!(!caught_up());

    // Master VLSN known, nothing applied yet.
    stream.update_master_vlsn(10);
    assert!(!caught_up());

    // applied == master and nothing pending.
    stream.mark_applied(10);
    assert!(caught_up());

    // A pending entry breaks the caught-up condition again.
    stream.receive_entry(11, 1, vec![]);
    stream.update_master_vlsn(11);
    assert!(!caught_up());

    // Draining the queue and applying the entry restores it.
    let _ = stream.drain_pending();
    stream.mark_applied(11);
    assert!(caught_up());
}
1113
/// An applied VLSN beyond the master VLSN still counts as caught up when
/// nothing is pending.
#[test]
fn test_caught_up_with_excess_applied() {
    let stream = ReplicaStream::new();
    stream.update_master_vlsn(5);
    stream.mark_applied(10);

    // applied (10) > master (5), empty pending queue.
    let caught_up = stream.is_caught_up();
    assert!(caught_up);
}
1122
/// End-to-end cycle on the state struct: receive VLSNs 1 through 5, drain
/// them, mark each one applied, and end up caught up with zero lag.
#[test]
fn test_receive_apply_cycle() {
    let stream = ReplicaStream::new();
    stream.set_master("master1");
    stream.set_state(ReplicaStreamState::Streaming);
    stream.update_master_vlsn(5);

    // Receive entries 1..=5; each payload is the VLSN as a single byte.
    (1..=5).for_each(|vlsn| {
        stream.receive_entry(vlsn, 1, vec![vlsn as u8]);
    });
    assert_eq!(stream.get_received_vlsn(), 5);
    assert_eq!(stream.get_lag(), 5);

    // Drain the queue and apply every entry in order.
    let drained = stream.drain_pending();
    assert_eq!(drained.len(), 5);
    for entry in drained.iter() {
        stream.mark_applied(entry.0);
    }

    assert_eq!(stream.get_applied_vlsn(), 5);
    assert_eq!(stream.get_lag(), 0);
    assert!(stream.is_caught_up());
}
1148
/// The `Debug` rendering of a stream mentions both its master name and
/// its current state.
#[test]
fn test_debug_format() {
    let stream = ReplicaStream::new();
    stream.set_master("test-master");
    stream.set_state(ReplicaStreamState::Streaming);

    let rendered = format!("{:?}", stream);
    for needle in ["test-master", "Streaming"] {
        assert!(rendered.contains(needle));
    }
}
1158}