Skip to main content

noxu_rep/stream/
replica_stream.rs

//! Replica stream — the replica-side replication receiver.
//!
//! [`ReplicaStream`] tracks the state of receiving replication data from the
//! master, including pending entries, applied/received VLSNs, and the
//! master's latest known VLSN.
//!
//! [`ReplicaReceiver`] provides the active I/O loop that reads framed
//! entries from the feeder channel, passes them to a [`LogWriter`], and sends
//! acks back.
//!
//! [`EnvironmentLogWriter`] is the live implementation of [`LogWriter`]: it
//! writes replicated entries into the local `LogManager`, updates the VLSN
//! index, and can optionally live-apply them to the replica's in-memory tree.
14
15use noxu_log::{LogEntryType, Provisional};
16use noxu_sync::Mutex;
17use std::sync::Arc;
18use std::time::Duration;
19
20use crate::error::{RepError, Result};
21use crate::net::channel::Channel;
22
23// CRC32 (Ethernet/zlib polynomial) for per-frame integrity verification.
24// Same polynomial used in feeder.rs; see docs/checksum-selection.md.
25use crc32fast::hash as crc32_hash;
26
27// ---------------------------------------------------------------------------
28// LogWriter trait
29// ---------------------------------------------------------------------------
30
31/// Sink for replicated log entries.
32///
33/// Corresponds to replay thread accepting log records and writing them
34/// to the local environment. The replica calls `write_entry` for every entry
35/// received from the master.
36pub trait LogWriter: Send {
37    /// Write a replicated log entry.
38    ///
39    /// # Arguments
40    /// * `vlsn` - The VLSN of this entry.
41    /// * `entry_type` - The log entry type byte.
42    /// * `payload` - The raw entry payload.
43    fn write_entry(
44        &mut self,
45        vlsn: u64,
46        entry_type: u8,
47        payload: &[u8],
48    ) -> Result<()>;
49}
50
51// ---------------------------------------------------------------------------
52// EnvironmentLogWriter
53// ---------------------------------------------------------------------------
54
55/// `LogWriter` implementation backed by the live `LogManager`.
56///
57/// Each `write_entry` call:
58///   1. Resolves the `entry_type` byte to a `LogEntryType`.
59///   2. Writes the payload to the local log via `LogManager::log()`.
60///   3. Registers the returned LSN in the provided `vlsn_index` so that
61///      the VLSN→LSN mapping is kept up-to-date on the replica.
62///
63///
64pub struct EnvironmentLogWriter {
65    /// Shared log manager for appending replicated entries.
66    log_manager: Arc<noxu_log::LogManager>,
67    /// VLSN index: maps VLSN → (file_number, file_offset) on this replica.
68    vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
69    /// REP-7 (B): optional live replay-apply driver.
70    ///
71    /// When present, every entry written to the WAL is ALSO applied to the
72    /// replica's live in-memory tree (committed ops become visible; txn ops
73    /// are buffered until commit) so the replica can serve reads without a
74    /// restart.  `None` for the legacy byte-shadow-only path (e.g. when no
75    /// live `EnvironmentImpl` is wired).  Port of JE `Replay` driven from the
76    /// replica replay thread.
77    replay: Option<noxu_dbi::ReplicaReplay>,
78}
79
80impl EnvironmentLogWriter {
81    /// Create a new `EnvironmentLogWriter`.
82    ///
83    /// # Arguments
84    /// * `log_manager` — The live `LogManager` for this replica environment.
85    /// * `vlsn_index`  — The VLSN index to update after each written entry.
86    pub fn new(
87        log_manager: Arc<noxu_log::LogManager>,
88        vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
89    ) -> Self {
90        Self { log_manager, vlsn_index, replay: None }
91    }
92
93    /// REP-7 (B): create a writer that ALSO live-applies each entry to the
94    /// replica's in-memory tree via `replay`, so reads on the replica see
95    /// replicated data without a restart.
96    pub fn with_replay(
97        log_manager: Arc<noxu_log::LogManager>,
98        vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
99        replay: noxu_dbi::ReplicaReplay,
100    ) -> Self {
101        Self { log_manager, vlsn_index, replay: Some(replay) }
102    }
103
104    /// REP-10 seam: shared handle to the replica's last-applied VLSN, when a
105    /// live replay driver is installed.  A future consistency policy gates
106    /// reads on this; this method only exposes it.
107    pub fn last_applied_vlsn_handle(
108        &self,
109    ) -> Option<std::sync::Arc<std::sync::atomic::AtomicU64>> {
110        self.replay.as_ref().map(|r| r.last_applied_vlsn_handle())
111    }
112}
113
114impl LogWriter for EnvironmentLogWriter {
115    /// Write one replicated entry to the local log.
116    ///
117    /// Resolves `entry_type` → `LogEntryType`, appends the payload to the
118    /// WAL, and records the assigned LSN in the VLSN index.  Returns an
119    /// error if the entry type is unknown or the write fails.
120    fn write_entry(
121        &mut self,
122        vlsn: u64,
123        entry_type: u8,
124        payload: &[u8],
125    ) -> crate::error::Result<()> {
126        // Resolve the wire entry-type byte to the typed enum.
127        let log_entry_type = LogEntryType::from_type_num(entry_type)
128            .ok_or_else(|| {
129                crate::error::RepError::ProtocolError(format!(
130                    "replica: unknown entry_type byte {}",
131                    entry_type
132                ))
133            })?;
134
135        // Write to the local WAL.  Replicated entries are non-provisional and
136        // do not require an immediate fsync on every entry (the master already
137        // fsynced before sending).
138        //
139        // Chained replication: re-log the entry preserving the master's
140        // ORIGINAL VLSN via `log_with_vlsn`, so the replica's WAL carries a
141        // VLSN-tagged 22-byte header identical in shape to the master's.
142        // This makes the replica's own WAL a valid `FeederSource` for a
143        // downstream replica (an `EnvironmentLogScanner` reads the VLSN from
144        // the 22-byte header).  Faithful to JE `Replay.logReplicatedEntry`,
145        // which logs the received record preserving its VLSN so the replica's
146        // VLSNIndex + log are a feeder source for a replica chain (see
147        // `FeederSource` javadoc: "a Replica in a Replica chain that is
148        // replaying log records it received from some other source").  Plain
149        // `log()` would write a 14-byte no-VLSN header that no downstream
150        // feeder could scan.
151        //
152        // vlsn=0 is reserved as NULL_VLSN; entries without a VLSN fall back to
153        // the plain `log()` path (these are never streamed downstream).
154        //
155        // Chained replication / flush-to-OS (NOT fsync): we write the received
156        // entry with `flush_required = true, fsync_required = false`.  The
157        // flush pushes the buffered bytes out to the OS file so the entry's
158        // file length is visible to a DOWNSTREAM feeder's
159        // `EnvironmentLogScanner` (which reads `FileManager::get_file_length`)
160        // — without it the entry would sit in the in-memory `LogBuffer`, the
161        // on-disk file length would not advance, and a cascading replica would
162        // see nothing to feed.  We deliberately do NOT fsync: durability of
163        // the forwarded stream is the master's responsibility (it fsynced
164        // before sending), so the replica only needs the OS to make the bytes
165        // readable, not crash-durable.  Faithful to JE `Replay`, which flushes
166        // received entries to its log so its VLSNIndex + log are a valid
167        // `FeederSource` for a replica chain, without forcing a per-entry
168        // fsync on the replay path.
169        let lsn = if vlsn > 0 {
170            self.log_manager.log_with_vlsn(
171                log_entry_type,
172                payload,
173                vlsn,
174                /*flush_required=*/ true,
175                /*fsync_required=*/ false,
176            )
177        } else {
178            self.log_manager.log(
179                log_entry_type,
180                payload,
181                Provisional::No,
182                /*flush_required=*/ true,
183                /*fsync_required=*/ false,
184            )
185        }
186        .map_err(|e| {
187            crate::error::RepError::DatabaseError(format!(
188                "replica log write failed: {}",
189                e
190            ))
191        })?;
192
193        // Register VLSN → LSN in the replica's VLSN index so that
194        // FeederRunner/ack tracking can correlate positions.  Dispatch the
195        // entry type so `lastSync`/`lastTxnEnd` advance (REP-5; JE
196        // VLSNRange.getUpdateForNewMapping).
197        // vlsn=0 is reserved as NULL_VLSN; skip it.
198        if vlsn > 0 {
199            self.vlsn_index.put_with_type(
200                vlsn,
201                lsn.file_number(),
202                lsn.file_offset(),
203                log_entry_type,
204            );
205        }
206
207        // REP-7 (B): live-apply to the replica's in-memory tree AFTER the WAL
208        // write + VLSN-index update, so the WAL stays the source of truth and
209        // the live tree is the read-serving optimization recovery re-derives.
210        // Port of JE: the replica writes the entry to its log, then
211        // `Replay.replayEntry` applies it to the tree.
212        if let Some(replay) = self.replay.as_mut() {
213            replay.apply_entry(vlsn, entry_type, payload, lsn);
214        }
215
216        log::trace!(
217            "replica: wrote entry vlsn={} type={} lsn=({},{})",
218            vlsn,
219            log_entry_type,
220            lsn.file_number(),
221            lsn.file_offset(),
222        );
223
224        Ok(())
225    }
226}
227
228// ---------------------------------------------------------------------------
229// ReplicaReceiver
230// ---------------------------------------------------------------------------
231
232/// Wire frame header size (matches FeederRunner::FRAME_HEADER_LEN).
233///
234/// Format: `[vlsn:8][type:1][payload_len:4][crc32:4]` = 17 bytes.
235const FRAME_HEADER_LEN: usize = 8 + 1 + 4 + 4; // vlsn + type + len + crc32
236
237/// Active replica I/O loop.
238///
239/// `ReplicaReceiver` owns a channel to the master feeder. `run()` is a
240/// blocking loop that:
241///   1. Reads framed entries from the feeder.
242///   2. Deserializes each entry: `[vlsn:8][type:1][len:4][crc32:4][payload]`.
243///   3. Verifies `crc32fast::hash(payload) == crc32` — returns
244///      [`RepError::FrameCorrupted`] on mismatch.
245///   4. Passes the entry to `log_writer`.
246///   5. Sends an 8-byte LE VLSN ack back to the master.
247///   6. Returns when the channel is closed or an I/O error occurs.
248///
249/// Read thread and replay thread in the replica.
250pub struct ReplicaReceiver {
251    /// Channel to the master feeder.
252    channel: Arc<dyn Channel>,
253}
254
255impl ReplicaReceiver {
256    /// Create a new `ReplicaReceiver` on the given channel.
257    pub fn new(channel: Arc<dyn Channel>) -> Self {
258        Self { channel }
259    }
260
261    /// Run the replica receive loop.
262    ///
263    /// Blocks until the channel is closed or an unrecoverable error occurs.
264    /// Each successfully received entry is passed to `log_writer`; then an
265    /// ack `[vlsn: 8 bytes LE]` is sent back to the master.
266    ///
267    /// Equivalent to [`Self::run_until`] with no shutdown flag.
268    ///
269    /// # Anti-replay / VLSN-ordering enforcement (LOG-7)
270    ///
271    /// The replica MUST observe strictly-increasing VLSNs across the
272    /// connection.  Without this check the master could (accidentally
273    /// or maliciously) replay an old frame, causing the replica to ack a
274    /// VLSN it had already applied and silently overwrite a more-recent
275    /// committed value.  We track a `received_vlsn` high-water mark and
276    /// reject any incoming frame whose VLSN is `<= high-water` with a
277    /// [`RepError::ProtocolError`].  Gaps above the high-water mark are
278    /// allowed because the master is permitted to skip non-replicated
279    /// entries.
280    ///
281    /// # Entry-type validation (LOG-10)
282    ///
283    /// The wire-format entry-type byte is also validated against the set
284    /// of known [`LogEntryType`] variants before forwarding to
285    /// `log_writer`.  An unknown byte indicates either a protocol-version
286    /// skew or an attacker who flipped a header bit, and is logged at
287    /// `error` level then skipped (the connection is kept open so the
288    /// stream can recover from a single bad frame; a repeated stream of
289    /// unknowns will surface via the operator alerting on the error log).
290    pub fn run(&self, log_writer: &mut dyn LogWriter) -> Result<()> {
291        self.run_until(log_writer, None)
292    }
293
294    /// Run the replica receive loop, polling an optional `shutdown` flag.
295    ///
296    /// Identical to [`Self::run`] but, when `shutdown` is `Some`, the loop
297    /// returns `Ok(())` as soon as the flag is observed `true` — including on
298    /// each receive timeout.  This lets `ReplicatedEnvironment::close()`
299    /// break a replica receive thread whose upstream feeder stays connected
300    /// (e.g. a mid-tier replica in a chain closed before its upstream), which
301    /// would otherwise block the thread-join in `close()`.  The receive
302    /// timeout is shortened to 1s so close is responsive.
303    pub fn run_until(
304        &self,
305        log_writer: &mut dyn LogWriter,
306        shutdown: Option<&std::sync::atomic::AtomicBool>,
307    ) -> Result<()> {
308        use std::sync::atomic::Ordering;
309        // A short timeout when a shutdown flag is wired keeps close()
310        // responsive; without one, keep the original 30s budget.
311        let recv_timeout = if shutdown.is_some() {
312            Duration::from_secs(1)
313        } else {
314            Duration::from_secs(30)
315        };
316        // LOG-7: strictly-increasing VLSN high-water mark.  0 == NULL_VLSN
317        // (never assigned by the master), so "<= high-water" rejects 0
318        // too once a real VLSN has arrived.
319        let mut received_vlsn_high_water: u64 = 0;
320
321        loop {
322            if let Some(flag) = shutdown
323                && flag.load(Ordering::SeqCst)
324            {
325                return Ok(());
326            }
327            // ----------------------------------------------------------------
328            // Receive the next framed entry from the feeder.
329            // ----------------------------------------------------------------
330            let frame = match self.channel.receive(recv_timeout) {
331                Ok(Some(f)) => f,
332                Ok(None) => {
333                    // Timeout with no data — keep waiting (loop top re-checks
334                    // the shutdown flag).
335                    continue;
336                }
337                Err(RepError::ChannelClosed(_)) => {
338                    // Master disconnected; clean shutdown.
339                    return Ok(());
340                }
341                Err(e) => return Err(e),
342            };
343
344            // ----------------------------------------------------------------
345            // Parse frame: [vlsn:8 LE][entry_type:1][payload_len:4 LE]
346            //              [crc32:4 LE][payload]
347            // ----------------------------------------------------------------
348            if frame.len() < FRAME_HEADER_LEN {
349                return Err(RepError::ProtocolError(format!(
350                    "replica: short frame: {} bytes",
351                    frame.len()
352                )));
353            }
354
355            let vlsn = u64::from_le_bytes(frame[0..8].try_into().unwrap());
356            let entry_type = frame[8];
357            let payload_len =
358                u32::from_le_bytes(frame[9..13].try_into().unwrap()) as usize;
359            let expected_crc =
360                u32::from_le_bytes(frame[13..17].try_into().unwrap());
361
362            if frame.len() < FRAME_HEADER_LEN + payload_len {
363                return Err(RepError::ProtocolError(format!(
364                    "replica: frame payload truncated: expected {} bytes, got {}",
365                    payload_len,
366                    frame.len() - FRAME_HEADER_LEN,
367                )));
368            }
369
370            let payload =
371                &frame[FRAME_HEADER_LEN..FRAME_HEADER_LEN + payload_len];
372
373            // ----------------------------------------------------------------
374            // Verify CRC32 — reject corrupted frames before applying.
375            // ----------------------------------------------------------------
376            let actual_crc = crc32_hash(payload);
377            if actual_crc != expected_crc {
378                return Err(RepError::FrameCorrupted {
379                    vlsn,
380                    expected: expected_crc,
381                    actual: actual_crc,
382                });
383            }
384
385            // ----------------------------------------------------------------
386            // LOG-7: enforce strictly-increasing VLSN order.
387            //
388            // VLSN 0 from the master is the NULL VLSN sentinel and is not
389            // checked against the high-water mark (the feeder sends 0 for
390            // entries that do not carry a VLSN).  All non-zero VLSNs MUST
391            // strictly exceed the previous high-water; otherwise we have
392            // either a replayed frame or a master that re-used a sequence
393            // number — both are protocol-fatal.
394            // ----------------------------------------------------------------
395            if vlsn != 0 && vlsn <= received_vlsn_high_water {
396                return Err(RepError::ProtocolError(format!(
397                    "replica: VLSN ordering violation: incoming vlsn={vlsn} \
398                     <= received high-water {received_vlsn_high_water}; \
399                     possible replay attack or master clock-skew"
400                )));
401            }
402
403            // ----------------------------------------------------------------
404            // LOG-10: validate the entry-type byte against the catalog
405            // before forwarding to the log writer.  An unknown type is
406            // logged and the frame is skipped — but the connection stays
407            // open so a single corrupt frame does not stall replication.
408            // ----------------------------------------------------------------
409            if LogEntryType::from_type_num(entry_type).is_none() {
410                log::error!(
411                    "replica: unknown entry_type byte {entry_type} on frame \
412                     vlsn={vlsn}; skipping (LOG-10)"
413                );
414                // Skip this frame: do not advance high-water, do not ack.
415                // The master will retransmit if the replica disconnects;
416                // for now we just continue to the next frame.
417                continue;
418            }
419
420            // ----------------------------------------------------------------
421            // Apply the entry to the local log.
422            // ----------------------------------------------------------------
423            log_writer.write_entry(vlsn, entry_type, payload)?;
424
425            // Advance the high-water mark only after a successful apply.
426            if vlsn != 0 {
427                received_vlsn_high_water = vlsn;
428            }
429
430            // ----------------------------------------------------------------
431            // Send ack: [vlsn: 8 bytes LE]
432            // ----------------------------------------------------------------
433            let ack = vlsn.to_le_bytes();
434            match self.channel.send(&ack) {
435                Ok(()) => {}
436                Err(RepError::ChannelClosed(_)) => return Ok(()),
437                Err(e) => return Err(e),
438            }
439        }
440    }
441}
442
443/// The state of the replica's replication stream.
444///
445/// Replica state machine.
446#[derive(Debug, Clone, Copy, PartialEq, Eq)]
447pub enum ReplicaStreamState {
448    /// Not connected to any master.
449    Idle,
450    /// Establishing connection to the master.
451    Connecting,
452    /// Actively receiving replication data.
453    Streaming,
454    /// Replaying log entries to catch up with the master.
455    CatchingUp,
456    /// Shutting down.
457    Shutdown,
458}
459
460/// Tracks the state of receiving replication data from the master.
461///
462/// The replica stream receives log entries from the master, buffers them
463/// in a pending queue, and tracks which VLSNs have been received vs.
464/// applied to the local database.
465///
466///
467pub struct ReplicaStream {
468    /// Name of the master we are connected to.
469    master_name: Mutex<Option<String>>,
470    /// Current connection state.
471    state: Mutex<ReplicaStreamState>,
472    /// Last VLSN applied to the local database.
473    applied_vlsn: Mutex<u64>,
474    /// Last VLSN received from the master.
475    received_vlsn: Mutex<u64>,
476    /// Master's latest known VLSN (from heartbeat messages).
477    master_vlsn: Mutex<u64>,
478    /// Pending entries waiting to be applied: (vlsn, entry_type, data).
479    pending_entries: Mutex<Vec<(u64, u8, Vec<u8>)>>,
480}
481
482impl Default for ReplicaStream {
483    fn default() -> Self {
484        Self::new()
485    }
486}
487
488impl ReplicaStream {
489    /// Create a new replica stream in the idle state.
490    pub fn new() -> Self {
491        ReplicaStream {
492            master_name: Mutex::new(None),
493            state: Mutex::new(ReplicaStreamState::Idle),
494            applied_vlsn: Mutex::new(0),
495            received_vlsn: Mutex::new(0),
496            master_vlsn: Mutex::new(0),
497            pending_entries: Mutex::new(Vec::new()),
498        }
499    }
500
501    /// Return the current stream state.
502    pub fn get_state(&self) -> ReplicaStreamState {
503        *self.state.lock()
504    }
505
506    /// Set the stream state.
507    pub fn set_state(&self, state: ReplicaStreamState) {
508        *self.state.lock() = state;
509    }
510
511    /// Return the last VLSN that has been applied to the local database.
512    pub fn get_applied_vlsn(&self) -> u64 {
513        *self.applied_vlsn.lock()
514    }
515
516    /// Return the last VLSN received from the master (may be ahead of
517    /// `applied_vlsn` if entries are buffered).
518    pub fn get_received_vlsn(&self) -> u64 {
519        *self.received_vlsn.lock()
520    }
521
522    /// Return the master's latest known VLSN (from heartbeat updates).
523    pub fn get_master_vlsn(&self) -> u64 {
524        *self.master_vlsn.lock()
525    }
526
527    /// Set the master node name.
528    pub fn set_master(&self, name: &str) {
529        *self.master_name.lock() = Some(name.to_string());
530    }
531
532    /// Return the master node name, if set.
533    pub fn get_master(&self) -> Option<String> {
534        self.master_name.lock().clone()
535    }
536
537    /// Receive a log entry from the master.
538    ///
539    /// The entry is added to the pending queue and `received_vlsn` is
540    /// updated if the new VLSN is greater than the current value.
541    pub fn receive_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
542        self.pending_entries.lock().push((vlsn, entry_type, data));
543        let mut received = self.received_vlsn.lock();
544        if vlsn > *received {
545            *received = vlsn;
546        }
547    }
548
549    /// Mark a VLSN as applied to the local database.
550    ///
551    /// The `applied_vlsn` is updated if the new VLSN is greater than the
552    /// current value (applied VLSNs should advance monotonically).
553    pub fn mark_applied(&self, vlsn: u64) {
554        let mut applied = self.applied_vlsn.lock();
555        if vlsn > *applied {
556            *applied = vlsn;
557        }
558    }
559
560    /// Update the master's known latest VLSN (typically from a heartbeat
561    /// message).
562    pub fn update_master_vlsn(&self, vlsn: u64) {
563        let mut master = self.master_vlsn.lock();
564        if vlsn > *master {
565            *master = vlsn;
566        }
567    }
568
569    /// Return the replication lag: the difference between the master's
570    /// latest VLSN and the last applied VLSN.
571    ///
572    /// A lag of 0 means the replica is fully caught up with the master.
573    pub fn get_lag(&self) -> u64 {
574        let master = *self.master_vlsn.lock();
575        let applied = *self.applied_vlsn.lock();
576        master.saturating_sub(applied)
577    }
578
579    /// Drain all pending entries for processing.
580    ///
581    /// Returns the entries in the order they were received and leaves
582    /// the pending queue empty.
583    pub fn drain_pending(&self) -> Vec<(u64, u8, Vec<u8>)> {
584        let mut pending = self.pending_entries.lock();
585        std::mem::take(&mut *pending)
586    }
587
588    /// Check if the replica is caught up with the master.
589    ///
590    /// Returns `true` if **all** of the following are true:
591    /// - the master VLSN is non-zero (i.e. at least one master VLSN has
592    ///   been observed; a stream that has never received a master VLSN
593    ///   reports `false`),
594    /// - the applied VLSN equals or exceeds the master's latest known
595    ///   VLSN, and
596    /// - there are no pending entries.
597    pub fn is_caught_up(&self) -> bool {
598        let applied = *self.applied_vlsn.lock();
599        let master = *self.master_vlsn.lock();
600        let pending_empty = self.pending_entries.lock().is_empty();
601        applied >= master && master > 0 && pending_empty
602    }
603}
604
605impl std::fmt::Debug for ReplicaStream {
606    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607        f.debug_struct("ReplicaStream")
608            .field("master", &self.get_master())
609            .field("state", &self.get_state())
610            .field("applied_vlsn", &self.get_applied_vlsn())
611            .field("received_vlsn", &self.get_received_vlsn())
612            .field("master_vlsn", &self.get_master_vlsn())
613            .field("lag", &self.get_lag())
614            .finish()
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621    use crate::net::channel::LocalChannelPair;
622
623    // -----------------------------------------------------------------------
624    // LogWriter helper
625    // -----------------------------------------------------------------------
626
627    struct RecordingWriter {
628        entries: Vec<(u64, u8, Vec<u8>)>,
629    }
630
631    impl RecordingWriter {
632        fn new() -> Self {
633            Self { entries: Vec::new() }
634        }
635    }
636
637    impl LogWriter for RecordingWriter {
638        fn write_entry(
639            &mut self,
640            vlsn: u64,
641            entry_type: u8,
642            payload: &[u8],
643        ) -> Result<()> {
644            self.entries.push((vlsn, entry_type, payload.to_vec()));
645            Ok(())
646        }
647    }
648
649    // -----------------------------------------------------------------------
650    // ReplicaReceiver tests
651    // -----------------------------------------------------------------------
652
653    fn make_frame(vlsn: u64, entry_type: u8, payload: &[u8]) -> Vec<u8> {
654        let crc = crc32_hash(payload);
655        let mut f = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
656        f.extend_from_slice(&vlsn.to_le_bytes());
657        f.push(entry_type);
658        f.extend_from_slice(&(payload.len() as u32).to_le_bytes());
659        f.extend_from_slice(&crc.to_le_bytes());
660        f.extend_from_slice(payload);
661        f
662    }
663
664    #[test]
665    fn test_replica_receiver_receives_and_acks() {
666        let pair = LocalChannelPair::new();
667        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
668        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
669
670        // Send 3 frames from the "master" side.
671        let frames = vec![
672            make_frame(1, 10, &[0xAA]),
673            make_frame(2, 20, &[0xBB, 0xCC]),
674            make_frame(3, 30, &[]),
675        ];
676
677        let master_clone = Arc::clone(&master_side);
678        let send_handle = std::thread::spawn(move || {
679            for f in &frames {
680                master_clone.send(f).unwrap();
681            }
682            // Collect 3 acks.
683            let mut acked = Vec::new();
684            let timeout = Duration::from_secs(5);
685            for _ in 0..3 {
686                let ack = master_clone.receive(timeout).unwrap().unwrap();
687                let vlsn = u64::from_le_bytes(ack[..8].try_into().unwrap());
688                acked.push(vlsn);
689            }
690            // Close the channel to terminate the receiver loop.
691            master_clone.close().unwrap();
692            acked
693        });
694
695        let receiver = ReplicaReceiver::new(Arc::clone(&replica_side));
696        let mut writer = RecordingWriter::new();
697        receiver.run(&mut writer).unwrap();
698
699        let acked = send_handle.join().unwrap();
700        assert_eq!(acked, vec![1, 2, 3]);
701
702        assert_eq!(writer.entries.len(), 3);
703        assert_eq!(writer.entries[0], (1, 10, vec![0xAA]));
704        assert_eq!(writer.entries[1], (2, 20, vec![0xBB, 0xCC]));
705        assert_eq!(writer.entries[2], (3, 30, vec![]));
706    }
707
708    #[test]
709    fn test_replica_receiver_stops_on_channel_close() {
710        let pair = LocalChannelPair::new();
711        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
712        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
713
714        // Close the master side immediately; receiver should return Ok.
715        master_side.close().unwrap();
716
717        let receiver = ReplicaReceiver::new(replica_side);
718        let mut writer = RecordingWriter::new();
719        // The replica_side's receive will fail with ChannelClosed since
720        // it is still "open" but the master closed its end. The LocalChannel
721        // implementation returns None on timeout (closed endpoint) then the
722        // RecvError causes ChannelClosed. Let's just verify it terminates.
723        // We accept either Ok or ChannelClosed here.
724        let res = receiver.run(&mut writer);
725        assert!(res.is_ok() || matches!(res, Err(RepError::ChannelClosed(_))));
726    }
727
728    // -----------------------------------------------------------------------
729    // Feeder → Replica round-trip test using LocalChannel
730    // -----------------------------------------------------------------------
731
732    #[test]
733    fn test_feeder_to_replica_round_trip() {
734        use crate::stream::feeder::{FeederRunner, LogScanner};
735        use std::collections::VecDeque;
736
737        struct SimpleScanner {
738            items: VecDeque<(u64, u8, Vec<u8>)>,
739        }
740        impl LogScanner for SimpleScanner {
741            fn next_entry(
742                &mut self,
743                from_vlsn: u64,
744            ) -> Option<(u64, u8, Vec<u8>)> {
745                if let Some(&(v, _, _)) = self.items.front()
746                    && v >= from_vlsn
747                {
748                    return self.items.pop_front();
749                }
750                None
751            }
752        }
753
754        let pair = LocalChannelPair::new();
755        let master_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
756        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);
757
758        // Entries to replicate.  Entry-type bytes must be valid
759        // `LogEntryType` variants (validated by LOG-10 enforcement); pick
760        // FileHeader/IN/BIN/BINDelta/InsertLN.
761        let valid_types: [u8; 5] = [1, 2, 3, 4, 10];
762        let entries: Vec<(u64, u8, Vec<u8>)> = (1..=5)
763            .map(|i| {
764                let etype = valid_types[(i - 1) as usize];
765                (i, etype, vec![i as u8; i as usize])
766            })
767            .collect();
768
769        // Replica thread.
770        let replica_ch_clone = Arc::clone(&replica_ch);
771        let replica_handle = std::thread::spawn(move || {
772            let receiver = ReplicaReceiver::new(replica_ch_clone);
773            let mut writer = RecordingWriter::new();
774            receiver.run(&mut writer).unwrap();
775            writer.entries
776        });
777
778        // Feeder thread.
779        let master_ch_clone = Arc::clone(&master_ch);
780        let feeder_handle = std::thread::spawn(move || {
781            let runner = FeederRunner::new(Arc::clone(&master_ch_clone), 1);
782            let mut scanner =
783                SimpleScanner { items: entries.into_iter().collect() };
784            runner.run(&mut scanner).unwrap();
785            runner.known_replica_vlsn()
786        });
787
788        // Let them run briefly, then close the channel.
789        std::thread::sleep(Duration::from_millis(200));
790        master_ch.close().unwrap();
791        replica_ch.close().unwrap();
792
793        let last_acked = feeder_handle.join().unwrap();
794        let written = replica_handle.join().unwrap();
795
796        assert_eq!(written.len(), 5);
797        for (i, (vlsn, etype, payload)) in written.iter().enumerate() {
798            let expected_vlsn = (i + 1) as u64;
799            assert_eq!(*vlsn, expected_vlsn);
800            assert_eq!(*etype, valid_types[i]);
801            assert_eq!(payload.len(), expected_vlsn as usize);
802        }
803        assert_eq!(last_acked, 5);
804    }
805
806    /// LOG-7: a replayed (or out-of-order) frame whose VLSN is `<=` the
807    /// already-received high-water mark must be rejected with
808    /// [`RepError::ProtocolError`].  Without this check the master could
809    /// re-send a previously-acked frame and the replica would silently
810    /// overwrite a more-recent committed value.
811    #[test]
812    fn test_replica_rejects_replayed_vlsn() {
813        let pair = LocalChannelPair::new();
814        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
815        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
816
817        // Send VLSN=5 then VLSN=3 (replay).  Entry-type 10 = InsertLN.
818        let frames =
819            vec![make_frame(5, 10, b"first"), make_frame(3, 10, b"replay")];
820
821        let master_clone = Arc::clone(&master_side);
822        let _send_handle = std::thread::spawn(move || {
823            for f in &frames {
824                let _ = master_clone.send(f);
825            }
826            // Drain the ack for the first frame so the receiver can advance.
827            let _ = master_clone.receive(Duration::from_secs(2));
828        });
829
830        let receiver = ReplicaReceiver::new(replica_side);
831        let mut writer = RecordingWriter::new();
832        let res = receiver.run(&mut writer);
833
834        match res {
835            Err(RepError::ProtocolError(msg)) => {
836                assert!(
837                    msg.contains("VLSN ordering violation"),
838                    "expected VLSN-ordering protocol error, got: {msg}"
839                );
840            }
841            other => {
842                panic!("expected ProtocolError on replay, got {other:?}")
843            }
844        }
845
846        // The first (in-order) frame should have been applied; the
847        // replayed one MUST NOT be.
848        assert_eq!(writer.entries.len(), 1);
849        assert_eq!(writer.entries[0].0, 5);
850    }
851
852    /// LOG-7: equal VLSNs are also rejected (the rule is *strictly*
853    /// increasing).
854    #[test]
855    fn test_replica_rejects_duplicate_vlsn() {
856        let pair = LocalChannelPair::new();
857        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
858        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
859
860        let frames = vec![make_frame(7, 10, b"a"), make_frame(7, 10, b"b")];
861
862        let master_clone = Arc::clone(&master_side);
863        let _send_handle = std::thread::spawn(move || {
864            for f in &frames {
865                let _ = master_clone.send(f);
866            }
867            let _ = master_clone.receive(Duration::from_secs(2));
868        });
869
870        let receiver = ReplicaReceiver::new(replica_side);
871        let mut writer = RecordingWriter::new();
872        let res = receiver.run(&mut writer);
873        assert!(
874            matches!(res, Err(RepError::ProtocolError(_))),
875            "expected ProtocolError on duplicate VLSN, got {res:?}"
876        );
877        assert_eq!(writer.entries.len(), 1);
878    }
879
880    /// LOG-7: a gap in the VLSN sequence (`vlsn > high-water + 1`) is
881    /// allowed — the master may legitimately skip VLSNs (non-replicated
882    /// entries reuse the same connection).
883    #[test]
884    fn test_replica_allows_vlsn_gap() {
885        let pair = LocalChannelPair::new();
886        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
887        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
888
889        // VLSN gap: 1 → 5 → 100.
890        let frames = vec![
891            make_frame(1, 10, b"a"),
892            make_frame(5, 10, b"b"),
893            make_frame(100, 10, b"c"),
894        ];
895
896        let master_clone = Arc::clone(&master_side);
897        let send_handle = std::thread::spawn(move || {
898            for f in &frames {
899                master_clone.send(f).unwrap();
900            }
901            for _ in 0..3 {
902                let _ = master_clone.receive(Duration::from_secs(2));
903            }
904            master_clone.close().unwrap();
905        });
906
907        let receiver = ReplicaReceiver::new(replica_side);
908        let mut writer = RecordingWriter::new();
909        receiver.run(&mut writer).unwrap();
910        send_handle.join().unwrap();
911
912        assert_eq!(writer.entries.len(), 3);
913        assert_eq!(writer.entries[0].0, 1);
914        assert_eq!(writer.entries[1].0, 5);
915        assert_eq!(writer.entries[2].0, 100);
916    }
917
918    /// LOG-10: an unknown entry-type byte is logged and the frame is
919    /// skipped.  The connection stays open; subsequent valid frames are
920    /// applied normally.
921    #[test]
922    fn test_replica_skips_unknown_entry_type() {
923        let pair = LocalChannelPair::new();
924        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
925        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
926
927        // VLSN=1: bogus entry_type=200 (no LogEntryType::from_type_num).
928        // VLSN=2: valid InsertLN (10).  After my high-water bookkeeping,
929        // VLSN=2 must still be accepted because the bogus frame did NOT
930        // advance the high-water.
931        let frames =
932            vec![make_frame(1, 200, b"bogus"), make_frame(2, 10, b"good")];
933
934        let master_clone = Arc::clone(&master_side);
935        let send_handle = std::thread::spawn(move || {
936            for f in &frames {
937                master_clone.send(f).unwrap();
938            }
939            // Only the second (valid) frame produces an ack.
940            let ack = master_clone.receive(Duration::from_secs(2)).unwrap();
941            master_clone.close().unwrap();
942            ack
943        });
944
945        let receiver = ReplicaReceiver::new(replica_side);
946        let mut writer = RecordingWriter::new();
947        receiver.run(&mut writer).unwrap();
948
949        let ack = send_handle.join().unwrap();
950        let acked_vlsn =
951            u64::from_le_bytes(ack.unwrap()[..8].try_into().unwrap());
952
953        assert_eq!(writer.entries.len(), 1, "bogus frame must be skipped");
954        assert_eq!(writer.entries[0].0, 2);
955        assert_eq!(writer.entries[0].1, 10);
956        assert_eq!(acked_vlsn, 2);
957    }
958
959    // -----------------------------------------------------------------------
960    // Original ReplicaStream state struct tests
961    // -----------------------------------------------------------------------
962
963    #[test]
964    fn test_new_replica_stream() {
965        let stream = ReplicaStream::new();
966        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
967        assert_eq!(stream.get_applied_vlsn(), 0);
968        assert_eq!(stream.get_received_vlsn(), 0);
969        assert_eq!(stream.get_master_vlsn(), 0);
970        assert!(stream.get_master().is_none());
971        assert_eq!(stream.get_lag(), 0);
972    }
973
974    #[test]
975    fn test_default() {
976        let stream = ReplicaStream::default();
977        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
978    }
979
980    #[test]
981    fn test_state_transitions() {
982        let stream = ReplicaStream::new();
983        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
984
985        stream.set_state(ReplicaStreamState::Connecting);
986        assert_eq!(stream.get_state(), ReplicaStreamState::Connecting);
987
988        stream.set_state(ReplicaStreamState::Streaming);
989        assert_eq!(stream.get_state(), ReplicaStreamState::Streaming);
990
991        stream.set_state(ReplicaStreamState::CatchingUp);
992        assert_eq!(stream.get_state(), ReplicaStreamState::CatchingUp);
993
994        stream.set_state(ReplicaStreamState::Shutdown);
995        assert_eq!(stream.get_state(), ReplicaStreamState::Shutdown);
996    }
997
998    #[test]
999    fn test_master_name() {
1000        let stream = ReplicaStream::new();
1001        assert!(stream.get_master().is_none());
1002
1003        stream.set_master("master-node-1");
1004        assert_eq!(stream.get_master(), Some("master-node-1".to_string()));
1005
1006        stream.set_master("master-node-2");
1007        assert_eq!(stream.get_master(), Some("master-node-2".to_string()));
1008    }
1009
1010    #[test]
1011    fn test_receive_and_drain() {
1012        let stream = ReplicaStream::new();
1013        stream.receive_entry(1, 10, vec![0xAA]);
1014        stream.receive_entry(2, 20, vec![0xBB, 0xCC]);
1015        stream.receive_entry(3, 30, vec![]);
1016
1017        assert_eq!(stream.get_received_vlsn(), 3);
1018
1019        let entries = stream.drain_pending();
1020        assert_eq!(entries.len(), 3);
1021        assert_eq!(entries[0], (1, 10, vec![0xAA]));
1022        assert_eq!(entries[1], (2, 20, vec![0xBB, 0xCC]));
1023        assert_eq!(entries[2], (3, 30, vec![]));
1024
1025        // Pending should be empty now.
1026        let entries2 = stream.drain_pending();
1027        assert!(entries2.is_empty());
1028    }
1029
1030    #[test]
1031    fn test_received_vlsn_monotonic() {
1032        let stream = ReplicaStream::new();
1033        stream.receive_entry(5, 1, vec![]);
1034        assert_eq!(stream.get_received_vlsn(), 5);
1035
1036        // Out-of-order receive should not decrease received_vlsn.
1037        stream.receive_entry(3, 1, vec![]);
1038        assert_eq!(stream.get_received_vlsn(), 5);
1039
1040        stream.receive_entry(7, 1, vec![]);
1041        assert_eq!(stream.get_received_vlsn(), 7);
1042    }
1043
1044    #[test]
1045    fn test_mark_applied() {
1046        let stream = ReplicaStream::new();
1047        stream.mark_applied(5);
1048        assert_eq!(stream.get_applied_vlsn(), 5);
1049
1050        stream.mark_applied(10);
1051        assert_eq!(stream.get_applied_vlsn(), 10);
1052
1053        // Applied VLSN should not go backwards.
1054        stream.mark_applied(7);
1055        assert_eq!(stream.get_applied_vlsn(), 10);
1056    }
1057
1058    #[test]
1059    fn test_update_master_vlsn() {
1060        let stream = ReplicaStream::new();
1061        stream.update_master_vlsn(100);
1062        assert_eq!(stream.get_master_vlsn(), 100);
1063
1064        stream.update_master_vlsn(150);
1065        assert_eq!(stream.get_master_vlsn(), 150);
1066
1067        // Should not go backwards.
1068        stream.update_master_vlsn(120);
1069        assert_eq!(stream.get_master_vlsn(), 150);
1070    }
1071
1072    #[test]
1073    fn test_lag_calculation() {
1074        let stream = ReplicaStream::new();
1075        stream.update_master_vlsn(100);
1076        assert_eq!(stream.get_lag(), 100);
1077
1078        stream.mark_applied(50);
1079        assert_eq!(stream.get_lag(), 50);
1080
1081        stream.mark_applied(100);
1082        assert_eq!(stream.get_lag(), 0);
1083
1084        // Applied exceeds master (shouldn't normally happen, but be safe).
1085        stream.mark_applied(110);
1086        assert_eq!(stream.get_lag(), 0);
1087    }
1088
1089    #[test]
1090    fn test_is_caught_up() {
1091        let stream = ReplicaStream::new();
1092        // Not caught up: master_vlsn is 0.
1093        assert!(!stream.is_caught_up());
1094
1095        stream.update_master_vlsn(10);
1096        // Not caught up: applied_vlsn is 0.
1097        assert!(!stream.is_caught_up());
1098
1099        stream.mark_applied(10);
1100        // Caught up: applied == master, no pending.
1101        assert!(stream.is_caught_up());
1102
1103        // Add a pending entry -> not caught up.
1104        stream.receive_entry(11, 1, vec![]);
1105        stream.update_master_vlsn(11);
1106        assert!(!stream.is_caught_up());
1107
1108        // Drain and apply.
1109        stream.drain_pending();
1110        stream.mark_applied(11);
1111        assert!(stream.is_caught_up());
1112    }
1113
1114    #[test]
1115    fn test_caught_up_with_excess_applied() {
1116        let stream = ReplicaStream::new();
1117        stream.update_master_vlsn(5);
1118        stream.mark_applied(10);
1119        // applied > master, no pending -> caught up.
1120        assert!(stream.is_caught_up());
1121    }
1122
1123    #[test]
1124    fn test_receive_apply_cycle() {
1125        let stream = ReplicaStream::new();
1126        stream.set_master("master1");
1127        stream.set_state(ReplicaStreamState::Streaming);
1128        stream.update_master_vlsn(5);
1129
1130        // Simulate receiving entries 1-5.
1131        for i in 1..=5 {
1132            stream.receive_entry(i, 1, vec![i as u8]);
1133        }
1134        assert_eq!(stream.get_received_vlsn(), 5);
1135        assert_eq!(stream.get_lag(), 5);
1136
1137        // Drain and apply.
1138        let entries = stream.drain_pending();
1139        assert_eq!(entries.len(), 5);
1140        for (vlsn, _, _) in &entries {
1141            stream.mark_applied(*vlsn);
1142        }
1143
1144        assert_eq!(stream.get_applied_vlsn(), 5);
1145        assert_eq!(stream.get_lag(), 0);
1146        assert!(stream.is_caught_up());
1147    }
1148
1149    #[test]
1150    fn test_debug_format() {
1151        let stream = ReplicaStream::new();
1152        stream.set_master("test-master");
1153        stream.set_state(ReplicaStreamState::Streaming);
1154        let debug = format!("{:?}", stream);
1155        assert!(debug.contains("test-master"));
1156        assert!(debug.contains("Streaming"));
1157    }
1158}