Skip to main content

noxu_rep/stream/
replica_stream.rs

//! Replica stream — the replica-side replication receiver.
//!
//! [`ReplicaStream`] tracks the state of receiving replication data from the
//! master: pending entries, the applied and received VLSNs, and the master's
//! latest known VLSN.
//!
//! [`ReplicaReceiver`] provides the active I/O loop that reads framed entries
//! from the feeder channel, passes them to a [`LogWriter`], and sends acks
//! back to the master.
//!
//! [`EnvironmentLogWriter`] is the live implementation of [`LogWriter`]. It
//! writes replicated entries into the local `LogManager` and updates the
//! VLSN index.
14
15use noxu_log::{LogEntryType, Provisional};
16use noxu_sync::Mutex;
17use std::sync::Arc;
18use std::time::Duration;
19
20use crate::error::{RepError, Result};
21use crate::net::channel::Channel;
22
23// CRC32 (Ethernet/zlib polynomial) for per-frame integrity verification.
24// Same polynomial used in feeder.rs; see docs/checksum-selection.md.
25use crc32fast::hash as crc32_hash;
26
27// ---------------------------------------------------------------------------
28// LogWriter trait
29// ---------------------------------------------------------------------------
30
31/// Sink for replicated log entries.
32///
33/// Corresponds to replay thread accepting log records and writing them
34/// to the local environment. The replica calls `write_entry` for every entry
35/// received from the master.
36pub trait LogWriter: Send {
37    /// Write a replicated log entry.
38    ///
39    /// # Arguments
40    /// * `vlsn` - The VLSN of this entry.
41    /// * `entry_type` - The log entry type byte.
42    /// * `payload` - The raw entry payload.
43    fn write_entry(
44        &mut self,
45        vlsn: u64,
46        entry_type: u8,
47        payload: &[u8],
48    ) -> Result<()>;
49}
50
51// ---------------------------------------------------------------------------
52// EnvironmentLogWriter
53// ---------------------------------------------------------------------------
54
55/// `LogWriter` implementation backed by the live `LogManager`.
56///
57/// Each `write_entry` call:
58///   1. Resolves the `entry_type` byte to a `LogEntryType`.
59///   2. Writes the payload to the local log via `LogManager::log()`.
60///   3. Registers the returned LSN in the provided `vlsn_index` so that
61///      the VLSN→LSN mapping is kept up-to-date on the replica.
62///
63///
64pub struct EnvironmentLogWriter {
65    /// Shared log manager for appending replicated entries.
66    log_manager: Arc<noxu_log::LogManager>,
67    /// VLSN index: maps VLSN → (file_number, file_offset) on this replica.
68    vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
69}
70
71impl EnvironmentLogWriter {
72    /// Create a new `EnvironmentLogWriter`.
73    ///
74    /// # Arguments
75    /// * `log_manager` — The live `LogManager` for this replica environment.
76    /// * `vlsn_index`  — The VLSN index to update after each written entry.
77    pub fn new(
78        log_manager: Arc<noxu_log::LogManager>,
79        vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
80    ) -> Self {
81        Self { log_manager, vlsn_index }
82    }
83}
84
85impl LogWriter for EnvironmentLogWriter {
86    /// Write one replicated entry to the local log.
87    ///
88    /// Resolves `entry_type` → `LogEntryType`, appends the payload to the
89    /// WAL, and records the assigned LSN in the VLSN index.  Returns an
90    /// error if the entry type is unknown or the write fails.
91    fn write_entry(
92        &mut self,
93        vlsn: u64,
94        entry_type: u8,
95        payload: &[u8],
96    ) -> crate::error::Result<()> {
97        // Resolve the wire entry-type byte to the typed enum.
98        let log_entry_type = LogEntryType::from_type_num(entry_type)
99            .ok_or_else(|| {
100                crate::error::RepError::ProtocolError(format!(
101                    "replica: unknown entry_type byte {}",
102                    entry_type
103                ))
104            })?;
105
106        // Write to the local WAL.  Replicated entries are non-provisional and
107        // do not require an immediate fsync on every entry (the master already
108        // fsynced before sending).
109        let lsn = self
110            .log_manager
111            .log(log_entry_type, payload, Provisional::No, false, false)
112            .map_err(|e| {
113                crate::error::RepError::DatabaseError(format!(
114                    "replica log write failed: {}",
115                    e
116                ))
117            })?;
118
119        // Register VLSN → LSN in the replica's VLSN index so that
120        // FeederRunner/ack tracking can correlate positions.
121        // vlsn=0 is reserved as NULL_VLSN; skip it.
122        if vlsn > 0 {
123            self.vlsn_index.put(vlsn, lsn.file_number(), lsn.file_offset());
124        }
125
126        log::trace!(
127            "replica: wrote entry vlsn={} type={} lsn=({},{})",
128            vlsn,
129            log_entry_type,
130            lsn.file_number(),
131            lsn.file_offset(),
132        );
133
134        Ok(())
135    }
136}
137
138// ---------------------------------------------------------------------------
139// ReplicaReceiver
140// ---------------------------------------------------------------------------
141
/// Size in bytes of the fixed header that precedes every replication frame.
///
/// Must stay in sync with `FeederRunner::FRAME_HEADER_LEN`. Layout, with all
/// integers little-endian:
///
/// | field         | bytes |
/// |---------------|-------|
/// | `vlsn`        | 8     |
/// | `entry_type`  | 1     |
/// | `payload_len` | 4     |
/// | `crc32`       | 4     |
///
/// Total: 17 bytes.
const FRAME_HEADER_LEN: usize = std::mem::size_of::<u64>() // vlsn
    + std::mem::size_of::<u8>() // entry_type
    + std::mem::size_of::<u32>() // payload_len
    + std::mem::size_of::<u32>(); // crc32
146
147/// Active replica I/O loop.
148///
149/// `ReplicaReceiver` owns a channel to the master feeder. `run()` is a
150/// blocking loop that:
151///   1. Reads framed entries from the feeder.
152///   2. Deserializes each entry: `[vlsn:8][type:1][len:4][crc32:4][payload]`.
153///   3. Verifies `crc32fast::hash(payload) == crc32` — returns
154///      [`RepError::FrameCorrupted`] on mismatch.
155///   4. Passes the entry to `log_writer`.
156///   5. Sends an 8-byte LE VLSN ack back to the master.
157///   6. Returns when the channel is closed or an I/O error occurs.
158///
159/// Read thread and replay thread in the replica.
160pub struct ReplicaReceiver {
161    /// Channel to the master feeder.
162    channel: Arc<dyn Channel>,
163}
164
165impl ReplicaReceiver {
166    /// Create a new `ReplicaReceiver` on the given channel.
167    pub fn new(channel: Arc<dyn Channel>) -> Self {
168        Self { channel }
169    }
170
171    /// Run the replica receive loop.
172    ///
173    /// Blocks until the channel is closed or an unrecoverable error occurs.
174    /// Each successfully received entry is passed to `log_writer`; then an
175    /// ack `[vlsn: 8 bytes LE]` is sent back to the master.
176    ///
177    /// # Anti-replay / VLSN-ordering enforcement (LOG-7)
178    ///
179    /// The replica MUST observe strictly-increasing VLSNs across the
180    /// connection.  Without this check the master could (accidentally
181    /// or maliciously) replay an old frame, causing the replica to ack a
182    /// VLSN it had already applied and silently overwrite a more-recent
183    /// committed value.  We track a `received_vlsn` high-water mark and
184    /// reject any incoming frame whose VLSN is `<= high-water` with a
185    /// [`RepError::ProtocolError`].  Gaps above the high-water mark are
186    /// allowed because the master is permitted to skip non-replicated
187    /// entries.
188    ///
189    /// # Entry-type validation (LOG-10)
190    ///
191    /// The wire-format entry-type byte is also validated against the set
192    /// of known [`LogEntryType`] variants before forwarding to
193    /// `log_writer`.  An unknown byte indicates either a protocol-version
194    /// skew or an attacker who flipped a header bit, and is logged at
195    /// `error` level then skipped (the connection is kept open so the
196    /// stream can recover from a single bad frame; a repeated stream of
197    /// unknowns will surface via the operator alerting on the error log).
198    pub fn run(&self, log_writer: &mut dyn LogWriter) -> Result<()> {
199        let recv_timeout = Duration::from_secs(30);
200        // LOG-7: strictly-increasing VLSN high-water mark.  0 == NULL_VLSN
201        // (never assigned by the master), so "<= high-water" rejects 0
202        // too once a real VLSN has arrived.
203        let mut received_vlsn_high_water: u64 = 0;
204
205        loop {
206            // ----------------------------------------------------------------
207            // Receive the next framed entry from the feeder.
208            // ----------------------------------------------------------------
209            let frame = match self.channel.receive(recv_timeout) {
210                Ok(Some(f)) => f,
211                Ok(None) => {
212                    // Timeout with no data — keep waiting.
213                    continue;
214                }
215                Err(RepError::ChannelClosed(_)) => {
216                    // Master disconnected; clean shutdown.
217                    return Ok(());
218                }
219                Err(e) => return Err(e),
220            };
221
222            // ----------------------------------------------------------------
223            // Parse frame: [vlsn:8 LE][entry_type:1][payload_len:4 LE]
224            //              [crc32:4 LE][payload]
225            // ----------------------------------------------------------------
226            if frame.len() < FRAME_HEADER_LEN {
227                return Err(RepError::ProtocolError(format!(
228                    "replica: short frame: {} bytes",
229                    frame.len()
230                )));
231            }
232
233            let vlsn = u64::from_le_bytes(frame[0..8].try_into().unwrap());
234            let entry_type = frame[8];
235            let payload_len =
236                u32::from_le_bytes(frame[9..13].try_into().unwrap()) as usize;
237            let expected_crc =
238                u32::from_le_bytes(frame[13..17].try_into().unwrap());
239
240            if frame.len() < FRAME_HEADER_LEN + payload_len {
241                return Err(RepError::ProtocolError(format!(
242                    "replica: frame payload truncated: expected {} bytes, got {}",
243                    payload_len,
244                    frame.len() - FRAME_HEADER_LEN,
245                )));
246            }
247
248            let payload =
249                &frame[FRAME_HEADER_LEN..FRAME_HEADER_LEN + payload_len];
250
251            // ----------------------------------------------------------------
252            // Verify CRC32 — reject corrupted frames before applying.
253            // ----------------------------------------------------------------
254            let actual_crc = crc32_hash(payload);
255            if actual_crc != expected_crc {
256                return Err(RepError::FrameCorrupted {
257                    vlsn,
258                    expected: expected_crc,
259                    actual: actual_crc,
260                });
261            }
262
263            // ----------------------------------------------------------------
264            // LOG-7: enforce strictly-increasing VLSN order.
265            //
266            // VLSN 0 from the master is the NULL VLSN sentinel and is not
267            // checked against the high-water mark (the feeder sends 0 for
268            // entries that do not carry a VLSN).  All non-zero VLSNs MUST
269            // strictly exceed the previous high-water; otherwise we have
270            // either a replayed frame or a master that re-used a sequence
271            // number — both are protocol-fatal.
272            // ----------------------------------------------------------------
273            if vlsn != 0 && vlsn <= received_vlsn_high_water {
274                return Err(RepError::ProtocolError(format!(
275                    "replica: VLSN ordering violation: incoming vlsn={vlsn} \
276                     <= received high-water {received_vlsn_high_water}; \
277                     possible replay attack or master clock-skew"
278                )));
279            }
280
281            // ----------------------------------------------------------------
282            // LOG-10: validate the entry-type byte against the catalog
283            // before forwarding to the log writer.  An unknown type is
284            // logged and the frame is skipped — but the connection stays
285            // open so a single corrupt frame does not stall replication.
286            // ----------------------------------------------------------------
287            if LogEntryType::from_type_num(entry_type).is_none() {
288                log::error!(
289                    "replica: unknown entry_type byte {entry_type} on frame \
290                     vlsn={vlsn}; skipping (LOG-10)"
291                );
292                // Skip this frame: do not advance high-water, do not ack.
293                // The master will retransmit if the replica disconnects;
294                // for now we just continue to the next frame.
295                continue;
296            }
297
298            // ----------------------------------------------------------------
299            // Apply the entry to the local log.
300            // ----------------------------------------------------------------
301            log_writer.write_entry(vlsn, entry_type, payload)?;
302
303            // Advance the high-water mark only after a successful apply.
304            if vlsn != 0 {
305                received_vlsn_high_water = vlsn;
306            }
307
308            // ----------------------------------------------------------------
309            // Send ack: [vlsn: 8 bytes LE]
310            // ----------------------------------------------------------------
311            let ack = vlsn.to_le_bytes();
312            match self.channel.send(&ack) {
313                Ok(()) => {}
314                Err(RepError::ChannelClosed(_)) => return Ok(()),
315                Err(e) => return Err(e),
316            }
317        }
318    }
319}
320
/// Lifecycle state of a replica's replication stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicaStreamState {
    /// No master connection exists.
    Idle,
    /// A connection to the master is being set up.
    Connecting,
    /// Replication data is actively flowing from the master.
    Streaming,
    /// Log entries are being replayed to catch up with the master.
    CatchingUp,
    /// The stream is shutting down.
    Shutdown,
}
337
338/// Tracks the state of receiving replication data from the master.
339///
340/// The replica stream receives log entries from the master, buffers them
341/// in a pending queue, and tracks which VLSNs have been received vs.
342/// applied to the local database.
343///
344///
345pub struct ReplicaStream {
346    /// Name of the master we are connected to.
347    master_name: Mutex<Option<String>>,
348    /// Current connection state.
349    state: Mutex<ReplicaStreamState>,
350    /// Last VLSN applied to the local database.
351    applied_vlsn: Mutex<u64>,
352    /// Last VLSN received from the master.
353    received_vlsn: Mutex<u64>,
354    /// Master's latest known VLSN (from heartbeat messages).
355    master_vlsn: Mutex<u64>,
356    /// Pending entries waiting to be applied: (vlsn, entry_type, data).
357    pending_entries: Mutex<Vec<(u64, u8, Vec<u8>)>>,
358}
359
360impl Default for ReplicaStream {
361    fn default() -> Self {
362        Self::new()
363    }
364}
365
366impl ReplicaStream {
367    /// Create a new replica stream in the idle state.
368    pub fn new() -> Self {
369        ReplicaStream {
370            master_name: Mutex::new(None),
371            state: Mutex::new(ReplicaStreamState::Idle),
372            applied_vlsn: Mutex::new(0),
373            received_vlsn: Mutex::new(0),
374            master_vlsn: Mutex::new(0),
375            pending_entries: Mutex::new(Vec::new()),
376        }
377    }
378
379    /// Return the current stream state.
380    pub fn get_state(&self) -> ReplicaStreamState {
381        *self.state.lock()
382    }
383
384    /// Set the stream state.
385    pub fn set_state(&self, state: ReplicaStreamState) {
386        *self.state.lock() = state;
387    }
388
389    /// Return the last VLSN that has been applied to the local database.
390    pub fn get_applied_vlsn(&self) -> u64 {
391        *self.applied_vlsn.lock()
392    }
393
394    /// Return the last VLSN received from the master (may be ahead of
395    /// `applied_vlsn` if entries are buffered).
396    pub fn get_received_vlsn(&self) -> u64 {
397        *self.received_vlsn.lock()
398    }
399
400    /// Return the master's latest known VLSN (from heartbeat updates).
401    pub fn get_master_vlsn(&self) -> u64 {
402        *self.master_vlsn.lock()
403    }
404
405    /// Set the master node name.
406    pub fn set_master(&self, name: &str) {
407        *self.master_name.lock() = Some(name.to_string());
408    }
409
410    /// Return the master node name, if set.
411    pub fn get_master(&self) -> Option<String> {
412        self.master_name.lock().clone()
413    }
414
415    /// Receive a log entry from the master.
416    ///
417    /// The entry is added to the pending queue and `received_vlsn` is
418    /// updated if the new VLSN is greater than the current value.
419    pub fn receive_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
420        self.pending_entries.lock().push((vlsn, entry_type, data));
421        let mut received = self.received_vlsn.lock();
422        if vlsn > *received {
423            *received = vlsn;
424        }
425    }
426
427    /// Mark a VLSN as applied to the local database.
428    ///
429    /// The `applied_vlsn` is updated if the new VLSN is greater than the
430    /// current value (applied VLSNs should advance monotonically).
431    pub fn mark_applied(&self, vlsn: u64) {
432        let mut applied = self.applied_vlsn.lock();
433        if vlsn > *applied {
434            *applied = vlsn;
435        }
436    }
437
438    /// Update the master's known latest VLSN (typically from a heartbeat
439    /// message).
440    pub fn update_master_vlsn(&self, vlsn: u64) {
441        let mut master = self.master_vlsn.lock();
442        if vlsn > *master {
443            *master = vlsn;
444        }
445    }
446
447    /// Return the replication lag: the difference between the master's
448    /// latest VLSN and the last applied VLSN.
449    ///
450    /// A lag of 0 means the replica is fully caught up with the master.
451    pub fn get_lag(&self) -> u64 {
452        let master = *self.master_vlsn.lock();
453        let applied = *self.applied_vlsn.lock();
454        master.saturating_sub(applied)
455    }
456
457    /// Drain all pending entries for processing.
458    ///
459    /// Returns the entries in the order they were received and leaves
460    /// the pending queue empty.
461    pub fn drain_pending(&self) -> Vec<(u64, u8, Vec<u8>)> {
462        let mut pending = self.pending_entries.lock();
463        std::mem::take(&mut *pending)
464    }
465
466    /// Check if the replica is caught up with the master.
467    ///
468    /// Returns `true` if **all** of the following are true:
469    /// - the master VLSN is non-zero (i.e. at least one master VLSN has
470    ///   been observed; a stream that has never received a master VLSN
471    ///   reports `false`),
472    /// - the applied VLSN equals or exceeds the master's latest known
473    ///   VLSN, and
474    /// - there are no pending entries.
475    pub fn is_caught_up(&self) -> bool {
476        let applied = *self.applied_vlsn.lock();
477        let master = *self.master_vlsn.lock();
478        let pending_empty = self.pending_entries.lock().is_empty();
479        applied >= master && master > 0 && pending_empty
480    }
481}
482
483impl std::fmt::Debug for ReplicaStream {
484    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
485        f.debug_struct("ReplicaStream")
486            .field("master", &self.get_master())
487            .field("state", &self.get_state())
488            .field("applied_vlsn", &self.get_applied_vlsn())
489            .field("received_vlsn", &self.get_received_vlsn())
490            .field("master_vlsn", &self.get_master_vlsn())
491            .field("lag", &self.get_lag())
492            .finish()
493    }
494}
495
496#[cfg(test)]
497mod tests {
498    use super::*;
499    use crate::net::channel::LocalChannelPair;
500
501    // -----------------------------------------------------------------------
502    // LogWriter helper
503    // -----------------------------------------------------------------------
504
505    struct RecordingWriter {
506        entries: Vec<(u64, u8, Vec<u8>)>,
507    }
508
509    impl RecordingWriter {
510        fn new() -> Self {
511            Self { entries: Vec::new() }
512        }
513    }
514
515    impl LogWriter for RecordingWriter {
516        fn write_entry(
517            &mut self,
518            vlsn: u64,
519            entry_type: u8,
520            payload: &[u8],
521        ) -> Result<()> {
522            self.entries.push((vlsn, entry_type, payload.to_vec()));
523            Ok(())
524        }
525    }
526
527    // -----------------------------------------------------------------------
528    // ReplicaReceiver tests
529    // -----------------------------------------------------------------------
530
531    fn make_frame(vlsn: u64, entry_type: u8, payload: &[u8]) -> Vec<u8> {
532        let crc = crc32_hash(payload);
533        let mut f = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
534        f.extend_from_slice(&vlsn.to_le_bytes());
535        f.push(entry_type);
536        f.extend_from_slice(&(payload.len() as u32).to_le_bytes());
537        f.extend_from_slice(&crc.to_le_bytes());
538        f.extend_from_slice(payload);
539        f
540    }
541
542    #[test]
543    fn test_replica_receiver_receives_and_acks() {
544        let pair = LocalChannelPair::new();
545        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
546        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
547
548        // Send 3 frames from the "master" side.
549        let frames = vec![
550            make_frame(1, 10, &[0xAA]),
551            make_frame(2, 20, &[0xBB, 0xCC]),
552            make_frame(3, 30, &[]),
553        ];
554
555        let master_clone = Arc::clone(&master_side);
556        let send_handle = std::thread::spawn(move || {
557            for f in &frames {
558                master_clone.send(f).unwrap();
559            }
560            // Collect 3 acks.
561            let mut acked = Vec::new();
562            let timeout = Duration::from_secs(5);
563            for _ in 0..3 {
564                let ack = master_clone.receive(timeout).unwrap().unwrap();
565                let vlsn = u64::from_le_bytes(ack[..8].try_into().unwrap());
566                acked.push(vlsn);
567            }
568            // Close the channel to terminate the receiver loop.
569            master_clone.close().unwrap();
570            acked
571        });
572
573        let receiver = ReplicaReceiver::new(Arc::clone(&replica_side));
574        let mut writer = RecordingWriter::new();
575        receiver.run(&mut writer).unwrap();
576
577        let acked = send_handle.join().unwrap();
578        assert_eq!(acked, vec![1, 2, 3]);
579
580        assert_eq!(writer.entries.len(), 3);
581        assert_eq!(writer.entries[0], (1, 10, vec![0xAA]));
582        assert_eq!(writer.entries[1], (2, 20, vec![0xBB, 0xCC]));
583        assert_eq!(writer.entries[2], (3, 30, vec![]));
584    }
585
586    #[test]
587    fn test_replica_receiver_stops_on_channel_close() {
588        let pair = LocalChannelPair::new();
589        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
590        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
591
592        // Close the master side immediately; receiver should return Ok.
593        master_side.close().unwrap();
594
595        let receiver = ReplicaReceiver::new(replica_side);
596        let mut writer = RecordingWriter::new();
597        // The replica_side's receive will fail with ChannelClosed since
598        // it is still "open" but the master closed its end. The LocalChannel
599        // implementation returns None on timeout (closed endpoint) then the
600        // RecvError causes ChannelClosed. Let's just verify it terminates.
601        // We accept either Ok or ChannelClosed here.
602        let res = receiver.run(&mut writer);
603        assert!(res.is_ok() || matches!(res, Err(RepError::ChannelClosed(_))));
604    }
605
606    // -----------------------------------------------------------------------
607    // Feeder → Replica round-trip test using LocalChannel
608    // -----------------------------------------------------------------------
609
610    #[test]
611    fn test_feeder_to_replica_round_trip() {
612        use crate::stream::feeder::{FeederRunner, LogScanner};
613        use std::collections::VecDeque;
614
615        struct SimpleScanner {
616            items: VecDeque<(u64, u8, Vec<u8>)>,
617        }
618        impl LogScanner for SimpleScanner {
619            fn next_entry(
620                &mut self,
621                from_vlsn: u64,
622            ) -> Option<(u64, u8, Vec<u8>)> {
623                if let Some(&(v, _, _)) = self.items.front()
624                    && v >= from_vlsn
625                {
626                    return self.items.pop_front();
627                }
628                None
629            }
630        }
631
632        let pair = LocalChannelPair::new();
633        let master_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
634        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);
635
636        // Entries to replicate.  Entry-type bytes must be valid
637        // `LogEntryType` variants (validated by LOG-10 enforcement); pick
638        // FileHeader/IN/BIN/BINDelta/InsertLN.
639        let valid_types: [u8; 5] = [1, 2, 3, 4, 10];
640        let entries: Vec<(u64, u8, Vec<u8>)> = (1..=5)
641            .map(|i| {
642                let etype = valid_types[(i - 1) as usize];
643                (i, etype, vec![i as u8; i as usize])
644            })
645            .collect();
646
647        // Replica thread.
648        let replica_ch_clone = Arc::clone(&replica_ch);
649        let replica_handle = std::thread::spawn(move || {
650            let receiver = ReplicaReceiver::new(replica_ch_clone);
651            let mut writer = RecordingWriter::new();
652            receiver.run(&mut writer).unwrap();
653            writer.entries
654        });
655
656        // Feeder thread.
657        let master_ch_clone = Arc::clone(&master_ch);
658        let feeder_handle = std::thread::spawn(move || {
659            let runner = FeederRunner::new(Arc::clone(&master_ch_clone), 1);
660            let mut scanner =
661                SimpleScanner { items: entries.into_iter().collect() };
662            runner.run(&mut scanner).unwrap();
663            runner.known_replica_vlsn()
664        });
665
666        // Let them run briefly, then close the channel.
667        std::thread::sleep(Duration::from_millis(200));
668        master_ch.close().unwrap();
669        replica_ch.close().unwrap();
670
671        let last_acked = feeder_handle.join().unwrap();
672        let written = replica_handle.join().unwrap();
673
674        assert_eq!(written.len(), 5);
675        for (i, (vlsn, etype, payload)) in written.iter().enumerate() {
676            let expected_vlsn = (i + 1) as u64;
677            assert_eq!(*vlsn, expected_vlsn);
678            assert_eq!(*etype, valid_types[i]);
679            assert_eq!(payload.len(), expected_vlsn as usize);
680        }
681        assert_eq!(last_acked, 5);
682    }
683
684    /// LOG-7: a replayed (or out-of-order) frame whose VLSN is `<=` the
685    /// already-received high-water mark must be rejected with
686    /// [`RepError::ProtocolError`].  Without this check the master could
687    /// re-send a previously-acked frame and the replica would silently
688    /// overwrite a more-recent committed value.
689    #[test]
690    fn test_replica_rejects_replayed_vlsn() {
691        let pair = LocalChannelPair::new();
692        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
693        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
694
695        // Send VLSN=5 then VLSN=3 (replay).  Entry-type 10 = InsertLN.
696        let frames =
697            vec![make_frame(5, 10, b"first"), make_frame(3, 10, b"replay")];
698
699        let master_clone = Arc::clone(&master_side);
700        let _send_handle = std::thread::spawn(move || {
701            for f in &frames {
702                let _ = master_clone.send(f);
703            }
704            // Drain the ack for the first frame so the receiver can advance.
705            let _ = master_clone.receive(Duration::from_secs(2));
706        });
707
708        let receiver = ReplicaReceiver::new(replica_side);
709        let mut writer = RecordingWriter::new();
710        let res = receiver.run(&mut writer);
711
712        match res {
713            Err(RepError::ProtocolError(msg)) => {
714                assert!(
715                    msg.contains("VLSN ordering violation"),
716                    "expected VLSN-ordering protocol error, got: {msg}"
717                );
718            }
719            other => {
720                panic!("expected ProtocolError on replay, got {other:?}")
721            }
722        }
723
724        // The first (in-order) frame should have been applied; the
725        // replayed one MUST NOT be.
726        assert_eq!(writer.entries.len(), 1);
727        assert_eq!(writer.entries[0].0, 5);
728    }
729
730    /// LOG-7: equal VLSNs are also rejected (the rule is *strictly*
731    /// increasing).
732    #[test]
733    fn test_replica_rejects_duplicate_vlsn() {
734        let pair = LocalChannelPair::new();
735        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
736        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
737
738        let frames = vec![make_frame(7, 10, b"a"), make_frame(7, 10, b"b")];
739
740        let master_clone = Arc::clone(&master_side);
741        let _send_handle = std::thread::spawn(move || {
742            for f in &frames {
743                let _ = master_clone.send(f);
744            }
745            let _ = master_clone.receive(Duration::from_secs(2));
746        });
747
748        let receiver = ReplicaReceiver::new(replica_side);
749        let mut writer = RecordingWriter::new();
750        let res = receiver.run(&mut writer);
751        assert!(
752            matches!(res, Err(RepError::ProtocolError(_))),
753            "expected ProtocolError on duplicate VLSN, got {res:?}"
754        );
755        assert_eq!(writer.entries.len(), 1);
756    }
757
758    /// LOG-7: a gap in the VLSN sequence (`vlsn > high-water + 1`) is
759    /// allowed — the master may legitimately skip VLSNs (non-replicated
760    /// entries reuse the same connection).
761    #[test]
762    fn test_replica_allows_vlsn_gap() {
763        let pair = LocalChannelPair::new();
764        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
765        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
766
767        // VLSN gap: 1 → 5 → 100.
768        let frames = vec![
769            make_frame(1, 10, b"a"),
770            make_frame(5, 10, b"b"),
771            make_frame(100, 10, b"c"),
772        ];
773
774        let master_clone = Arc::clone(&master_side);
775        let send_handle = std::thread::spawn(move || {
776            for f in &frames {
777                master_clone.send(f).unwrap();
778            }
779            for _ in 0..3 {
780                let _ = master_clone.receive(Duration::from_secs(2));
781            }
782            master_clone.close().unwrap();
783        });
784
785        let receiver = ReplicaReceiver::new(replica_side);
786        let mut writer = RecordingWriter::new();
787        receiver.run(&mut writer).unwrap();
788        send_handle.join().unwrap();
789
790        assert_eq!(writer.entries.len(), 3);
791        assert_eq!(writer.entries[0].0, 1);
792        assert_eq!(writer.entries[1].0, 5);
793        assert_eq!(writer.entries[2].0, 100);
794    }
795
796    /// LOG-10: an unknown entry-type byte is logged and the frame is
797    /// skipped.  The connection stays open; subsequent valid frames are
798    /// applied normally.
799    #[test]
800    fn test_replica_skips_unknown_entry_type() {
801        let pair = LocalChannelPair::new();
802        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
803        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);
804
805        // VLSN=1: bogus entry_type=200 (no LogEntryType::from_type_num).
806        // VLSN=2: valid InsertLN (10).  After my high-water bookkeeping,
807        // VLSN=2 must still be accepted because the bogus frame did NOT
808        // advance the high-water.
809        let frames =
810            vec![make_frame(1, 200, b"bogus"), make_frame(2, 10, b"good")];
811
812        let master_clone = Arc::clone(&master_side);
813        let send_handle = std::thread::spawn(move || {
814            for f in &frames {
815                master_clone.send(f).unwrap();
816            }
817            // Only the second (valid) frame produces an ack.
818            let ack = master_clone.receive(Duration::from_secs(2)).unwrap();
819            master_clone.close().unwrap();
820            ack
821        });
822
823        let receiver = ReplicaReceiver::new(replica_side);
824        let mut writer = RecordingWriter::new();
825        receiver.run(&mut writer).unwrap();
826
827        let ack = send_handle.join().unwrap();
828        let acked_vlsn =
829            u64::from_le_bytes(ack.unwrap()[..8].try_into().unwrap());
830
831        assert_eq!(writer.entries.len(), 1, "bogus frame must be skipped");
832        assert_eq!(writer.entries[0].0, 2);
833        assert_eq!(writer.entries[0].1, 10);
834        assert_eq!(acked_vlsn, 2);
835    }
836
837    // -----------------------------------------------------------------------
838    // Original ReplicaStream state struct tests
839    // -----------------------------------------------------------------------
840
841    #[test]
842    fn test_new_replica_stream() {
843        let stream = ReplicaStream::new();
844        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
845        assert_eq!(stream.get_applied_vlsn(), 0);
846        assert_eq!(stream.get_received_vlsn(), 0);
847        assert_eq!(stream.get_master_vlsn(), 0);
848        assert!(stream.get_master().is_none());
849        assert_eq!(stream.get_lag(), 0);
850    }
851
852    #[test]
853    fn test_default() {
854        let stream = ReplicaStream::default();
855        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
856    }
857
858    #[test]
859    fn test_state_transitions() {
860        let stream = ReplicaStream::new();
861        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
862
863        stream.set_state(ReplicaStreamState::Connecting);
864        assert_eq!(stream.get_state(), ReplicaStreamState::Connecting);
865
866        stream.set_state(ReplicaStreamState::Streaming);
867        assert_eq!(stream.get_state(), ReplicaStreamState::Streaming);
868
869        stream.set_state(ReplicaStreamState::CatchingUp);
870        assert_eq!(stream.get_state(), ReplicaStreamState::CatchingUp);
871
872        stream.set_state(ReplicaStreamState::Shutdown);
873        assert_eq!(stream.get_state(), ReplicaStreamState::Shutdown);
874    }
875
876    #[test]
877    fn test_master_name() {
878        let stream = ReplicaStream::new();
879        assert!(stream.get_master().is_none());
880
881        stream.set_master("master-node-1");
882        assert_eq!(stream.get_master(), Some("master-node-1".to_string()));
883
884        stream.set_master("master-node-2");
885        assert_eq!(stream.get_master(), Some("master-node-2".to_string()));
886    }
887
888    #[test]
889    fn test_receive_and_drain() {
890        let stream = ReplicaStream::new();
891        stream.receive_entry(1, 10, vec![0xAA]);
892        stream.receive_entry(2, 20, vec![0xBB, 0xCC]);
893        stream.receive_entry(3, 30, vec![]);
894
895        assert_eq!(stream.get_received_vlsn(), 3);
896
897        let entries = stream.drain_pending();
898        assert_eq!(entries.len(), 3);
899        assert_eq!(entries[0], (1, 10, vec![0xAA]));
900        assert_eq!(entries[1], (2, 20, vec![0xBB, 0xCC]));
901        assert_eq!(entries[2], (3, 30, vec![]));
902
903        // Pending should be empty now.
904        let entries2 = stream.drain_pending();
905        assert!(entries2.is_empty());
906    }
907
908    #[test]
909    fn test_received_vlsn_monotonic() {
910        let stream = ReplicaStream::new();
911        stream.receive_entry(5, 1, vec![]);
912        assert_eq!(stream.get_received_vlsn(), 5);
913
914        // Out-of-order receive should not decrease received_vlsn.
915        stream.receive_entry(3, 1, vec![]);
916        assert_eq!(stream.get_received_vlsn(), 5);
917
918        stream.receive_entry(7, 1, vec![]);
919        assert_eq!(stream.get_received_vlsn(), 7);
920    }
921
922    #[test]
923    fn test_mark_applied() {
924        let stream = ReplicaStream::new();
925        stream.mark_applied(5);
926        assert_eq!(stream.get_applied_vlsn(), 5);
927
928        stream.mark_applied(10);
929        assert_eq!(stream.get_applied_vlsn(), 10);
930
931        // Applied VLSN should not go backwards.
932        stream.mark_applied(7);
933        assert_eq!(stream.get_applied_vlsn(), 10);
934    }
935
936    #[test]
937    fn test_update_master_vlsn() {
938        let stream = ReplicaStream::new();
939        stream.update_master_vlsn(100);
940        assert_eq!(stream.get_master_vlsn(), 100);
941
942        stream.update_master_vlsn(150);
943        assert_eq!(stream.get_master_vlsn(), 150);
944
945        // Should not go backwards.
946        stream.update_master_vlsn(120);
947        assert_eq!(stream.get_master_vlsn(), 150);
948    }
949
950    #[test]
951    fn test_lag_calculation() {
952        let stream = ReplicaStream::new();
953        stream.update_master_vlsn(100);
954        assert_eq!(stream.get_lag(), 100);
955
956        stream.mark_applied(50);
957        assert_eq!(stream.get_lag(), 50);
958
959        stream.mark_applied(100);
960        assert_eq!(stream.get_lag(), 0);
961
962        // Applied exceeds master (shouldn't normally happen, but be safe).
963        stream.mark_applied(110);
964        assert_eq!(stream.get_lag(), 0);
965    }
966
967    #[test]
968    fn test_is_caught_up() {
969        let stream = ReplicaStream::new();
970        // Not caught up: master_vlsn is 0.
971        assert!(!stream.is_caught_up());
972
973        stream.update_master_vlsn(10);
974        // Not caught up: applied_vlsn is 0.
975        assert!(!stream.is_caught_up());
976
977        stream.mark_applied(10);
978        // Caught up: applied == master, no pending.
979        assert!(stream.is_caught_up());
980
981        // Add a pending entry -> not caught up.
982        stream.receive_entry(11, 1, vec![]);
983        stream.update_master_vlsn(11);
984        assert!(!stream.is_caught_up());
985
986        // Drain and apply.
987        stream.drain_pending();
988        stream.mark_applied(11);
989        assert!(stream.is_caught_up());
990    }
991
992    #[test]
993    fn test_caught_up_with_excess_applied() {
994        let stream = ReplicaStream::new();
995        stream.update_master_vlsn(5);
996        stream.mark_applied(10);
997        // applied > master, no pending -> caught up.
998        assert!(stream.is_caught_up());
999    }
1000
1001    #[test]
1002    fn test_receive_apply_cycle() {
1003        let stream = ReplicaStream::new();
1004        stream.set_master("master1");
1005        stream.set_state(ReplicaStreamState::Streaming);
1006        stream.update_master_vlsn(5);
1007
1008        // Simulate receiving entries 1-5.
1009        for i in 1..=5 {
1010            stream.receive_entry(i, 1, vec![i as u8]);
1011        }
1012        assert_eq!(stream.get_received_vlsn(), 5);
1013        assert_eq!(stream.get_lag(), 5);
1014
1015        // Drain and apply.
1016        let entries = stream.drain_pending();
1017        assert_eq!(entries.len(), 5);
1018        for (vlsn, _, _) in &entries {
1019            stream.mark_applied(*vlsn);
1020        }
1021
1022        assert_eq!(stream.get_applied_vlsn(), 5);
1023        assert_eq!(stream.get_lag(), 0);
1024        assert!(stream.is_caught_up());
1025    }
1026
1027    #[test]
1028    fn test_debug_format() {
1029        let stream = ReplicaStream::new();
1030        stream.set_master("test-master");
1031        stream.set_state(ReplicaStreamState::Streaming);
1032        let debug = format!("{:?}", stream);
1033        assert!(debug.contains("test-master"));
1034        assert!(debug.contains("Streaming"));
1035    }
1036}