Skip to main content

noxu_rep/stream/
feeder.rs

1//! Feeder  -  master-side replication sender.
2//!
//! The [`Feeder`] struct tracks the state of feeding replication data to a
//! single replica, including the current VLSN position, acknowledged VLSN,
//! output queue, and heartbeat tracking.
6//!
7//! The [`FeederRunner`] provides the active I/O loop that scans the log
8//! forward from a given VLSN, frames each entry, and sends it to the replica
9//! via a [`Channel`]. Acks are received on the same channel.
10//!
11//! [`EnvironmentLogScanner`] is the live implementation of [`LogScanner`]
12//! backed by the real `LogManager` + `FileManager`.
//! It corresponds to `rep.impl.node.Feeder.MasterFeederSource`.
14
15use noxu_dbi::EnvironmentImpl;
16use noxu_log::MAX_ITEM_SIZE;
17use noxu_log::entry_header::{MAX_HEADER_SIZE, MIN_HEADER_SIZE};
18use noxu_log::file_header::LOG_VERSION as LOG_FILE_VERSION;
19use noxu_log::file_header::on_disk_size as file_header_on_disk_size;
20use noxu_log::file_manager::FileManager;
21use noxu_sync::Mutex;
22use noxu_util::lsn::{Lsn, NULL_LSN};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use crate::error::{RepError, Result};
27use crate::net::channel::Channel;
28
29// CRC32 (Ethernet/zlib polynomial via PCLMULQDQ on x86-64 — ~18 GiB/s).
30// See docs/checksum-selection.md for the full benchmark rationale.
31use crc32fast;
32
33// ---------------------------------------------------------------------------
34// Log scanner trait
35// ---------------------------------------------------------------------------
36
/// A forward cursor over log entries, addressed by VLSN.
///
/// Corresponds to `FeederSource` / `MasterFeederSource`. The scanner
/// returns `(vlsn, entry_type, payload)` tuples in VLSN order. Returning
/// `None` signals that there are no more entries *yet*; the caller will call
/// `next_entry` again after a short wait.
///
/// The `Send` bound allows a scanner to be moved onto the thread that runs
/// the feeder loop.
pub trait LogScanner: Send {
    /// Return the next available entry with VLSN >= `from_vlsn`, or `None` if
    /// no new entry is available at this moment.
    ///
    /// The returned tuple is `(vlsn, entry_type, payload)`. The method takes
    /// `&mut self` so that implementations can keep and advance an internal
    /// read position between calls.
    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)>;
}
48
49// ---------------------------------------------------------------------------
50// EnvironmentLogScanner
51// ---------------------------------------------------------------------------
52
/// `LogScanner` implementation backed by the live `EnvironmentImpl`.
///
/// Scans the log forward from an LSN cursor, returning entries that carry a
/// VLSN >= `from_vlsn`. On each call to `next_entry` the scanner advances
/// its internal file/offset position one entry at a time; when it reaches
/// the current end of the log it returns `None` (the `FeederRunner` will
/// call again after a brief poll interval).
///
/// The cursor is kept between calls, so a later call resumes where the
/// previous one stopped instead of rescanning from the start.
pub struct EnvironmentLogScanner {
    /// The log `FileManager` used for raw byte-level reads.
    file_manager: Arc<FileManager>,
    /// Current scan position: number of the log file holding the next entry
    /// to read.
    cursor_file: u32,
    /// Current scan position: byte offset within `cursor_file` of the next
    /// entry to read.
    cursor_offset: u64,
    /// Highest VLSN returned so far (to avoid duplicates on re-entrant calls).
    /// Starts at 0; only entries with a strictly greater VLSN are returned.
    last_returned_vlsn: u64,
}
71
72impl EnvironmentLogScanner {
73    /// Create a scanner that starts at `start_lsn`.
74    ///
75    /// If `start_lsn` is `NULL_LSN` the scanner begins at the very first log
76    /// entry in file 0.  The correct first-entry offset is resolved from
77    /// file 0's `log_version` (32 for v2, 36 for v3).  If file 0 does not
78    /// exist yet (empty log) the current-version default (36) is used.
79    ///
80    /// Obtain the `FileManager` from `EnvironmentImpl::get_log_manager()` →
81    /// `LogManager` is not directly accessible, but `EnvironmentImpl` exposes
82    /// a `get_log_manager()` returning `Option<Arc<LogManager>>`.  For the
83    /// scanner we need the `FileManager` underneath it; the simplest approach
84    /// is to construct the scanner directly from a `FileManager` Arc.
85    pub fn new(env: &EnvironmentImpl, start_lsn: Option<Lsn>) -> Option<Self> {
86        // We need the FileManager to do raw byte reads.  It is not directly
87        // exposed on EnvironmentImpl, so we access it via the LogManager.
88        // LogManager::file_manager is private, so we carry it separately.
89        // For now, use the env_home path to construct a read-only FileManager.
90        //
91        // The master feeder reads the log files
92        // directly, starting at the replica's current VLSN position.
93        let env_home = env.get_env_home().to_path_buf();
94        let fm = Arc::new(
95            FileManager::new(&env_home, true, 256 * 1024 * 1024, 32).ok()?,
96        );
97
98        let (cursor_file, cursor_offset) = match start_lsn {
99            Some(lsn) if lsn != NULL_LSN => {
100                (lsn.file_number(), lsn.file_offset() as u64)
101            }
102            _ => {
103                // Start from file 0, first entry offset.  Use the actual
104                // header size of file 0 if it exists (v2 → 32, v3 → 36);
105                // fall back to current-version default (36) if it does not.
106                let file0_offset =
107                    fm.file_header_size_for(0).unwrap_or_else(|_| {
108                        file_header_on_disk_size(LOG_FILE_VERSION)
109                    }) as u64;
110                (0, file0_offset)
111            }
112        };
113
114        Some(Self {
115            file_manager: fm,
116            cursor_file,
117            cursor_offset,
118            last_returned_vlsn: 0,
119        })
120    }
121
122    /// Read the raw header+payload at `(file_num, offset)`.
123    ///
124    /// Returns `(entry_size_bytes, vlsn_opt, entry_type_byte, payload)` or
125    /// `None` if the bytes don't form a valid entry (zero fill, truncation).
126    fn read_raw_entry(
127        &self,
128        file_num: u32,
129        offset: u64,
130    ) -> Option<(usize, Option<u64>, u8, Vec<u8>)> {
131        let mut hdr = [0u8; MIN_HEADER_SIZE];
132        let n = self
133            .file_manager
134            .read_from_file(file_num, offset, &mut hdr)
135            .ok()?;
136        if n < MIN_HEADER_SIZE {
137            return None;
138        }
139        // Zero-fill region past last written entry.
140        if hdr[4] == 0 {
141            return None;
142        }
143
144        let entry_type_byte = hdr[4];
145        let flags = hdr[5];
146        let item_size =
147            u32::from_le_bytes([hdr[10], hdr[11], hdr[12], hdr[13]]) as usize;
148
149        let vlsn_present = (flags & 0x08) != 0 || (flags & 0x20) != 0;
150        let header_size =
151            if vlsn_present { MAX_HEADER_SIZE } else { MIN_HEADER_SIZE };
152
153        // Sanity cap: same shared MAX_ITEM_SIZE used by every log reader
154        // so that an attacker who flips item_size cannot cause a 100 MiB
155        // allocation here while passing other readers' bounds.
156        if item_size > MAX_ITEM_SIZE {
157            return None;
158        }
159
160        let entry_size = header_size + item_size;
161        let mut full = vec![0u8; entry_size];
162        let n = self
163            .file_manager
164            .read_from_file(file_num, offset, &mut full)
165            .ok()?;
166        if n < entry_size {
167            return None;
168        }
169
170        // Extract VLSN from the header extension if present (8-byte LE i64).
171        let vlsn_opt = if vlsn_present && full.len() >= MAX_HEADER_SIZE {
172            let raw = i64::from_le_bytes(
173                full[MIN_HEADER_SIZE..MAX_HEADER_SIZE].try_into().ok()?,
174            );
175            if raw > 0 {
176                Some(raw as u64)
177            } else {
178                // Negative or zero i64 with vlsn_present flag set is a
179                // contradiction (NULL VLSN should not have the flag set).
180                // Surface it but keep the legacy "treat as missing" behaviour
181                // so a single corrupt entry does not stall the feeder.
182                // LOG-9.
183                log::warn!(
184                    "EnvironmentLogScanner: implausible VLSN value {} at \
185                     file {:08x} offset {:#x}; treating as no-VLSN",
186                    raw,
187                    file_num,
188                    offset,
189                );
190                None
191            }
192        } else {
193            None
194        };
195
196        let payload = full[header_size..].to_vec();
197        Some((entry_size, vlsn_opt, entry_type_byte, payload))
198    }
199}
200
201impl LogScanner for EnvironmentLogScanner {
202    /// Return the next entry with VLSN >= `from_vlsn`, advancing the cursor.
203    ///
204    /// Scans forward one entry at a time.  Returns `None` when the cursor
205    /// reaches the end of the currently-written log.  The feeder will sleep
206    /// briefly and call again.
207    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
208        // Collect file numbers once per call; cheap (directory listing).
209        let file_nums = self.file_manager.list_file_numbers().ok()?;
210        if file_nums.is_empty() {
211            return None;
212        }
213
214        loop {
215            // Skip files before the current cursor file.
216            if !file_nums.contains(&self.cursor_file) {
217                // Advance to the next known file.
218                let next =
219                    file_nums.iter().find(|&&n| n > self.cursor_file).copied();
220                match next {
221                    Some(n) => {
222                        self.cursor_file = n;
223                        self.cursor_offset =
224                            self.file_manager
225                                .file_header_size_for(n)
226                                .unwrap_or_else(|_| {
227                                    file_header_on_disk_size(LOG_FILE_VERSION)
228                                }) as u64;
229                    }
230                    None => return None, // No more files.
231                }
232            }
233
234            let file_len =
235                self.file_manager.get_file_length(self.cursor_file).ok()?;
236
237            if self.cursor_offset >= file_len {
238                // End of current file: move to next file.
239                let next =
240                    file_nums.iter().find(|&&n| n > self.cursor_file).copied();
241                match next {
242                    Some(n) => {
243                        self.cursor_file = n;
244                        self.cursor_offset =
245                            self.file_manager
246                                .file_header_size_for(n)
247                                .unwrap_or_else(|_| {
248                                    file_header_on_disk_size(LOG_FILE_VERSION)
249                                }) as u64;
250                        continue;
251                    }
252                    None => return None, // End of log.
253                }
254            }
255
256            match self.read_raw_entry(self.cursor_file, self.cursor_offset) {
257                None => {
258                    // End of written data in this file; move to next.
259                    let next = file_nums
260                        .iter()
261                        .find(|&&n| n > self.cursor_file)
262                        .copied();
263                    match next {
264                        Some(n) => {
265                            self.cursor_file = n;
266                            self.cursor_offset = self
267                                .file_manager
268                                .file_header_size_for(n)
269                                .unwrap_or_else(|_| {
270                                    file_header_on_disk_size(LOG_FILE_VERSION)
271                                })
272                                as u64;
273                            continue;
274                        }
275                        None => return None,
276                    }
277                }
278                Some((entry_size, vlsn_opt, entry_type_byte, payload)) => {
279                    self.cursor_offset += entry_size as u64;
280
281                    if let Some(vlsn) = vlsn_opt
282                        && vlsn >= from_vlsn
283                        && vlsn > self.last_returned_vlsn
284                    {
285                        self.last_returned_vlsn = vlsn;
286                        return Some((vlsn, entry_type_byte, payload));
287                    }
288                    // Entry has no VLSN, vlsn < from_vlsn, or already
289                    // returned: keep scanning.
290                }
291            }
292        }
293    }
294}
295
296// ---------------------------------------------------------------------------
297// FeederRunner
298// ---------------------------------------------------------------------------
299
/// Size in bytes of the fixed header that precedes every payload on the wire.
///
/// Wire frame format (all integers little-endian):
///
/// ```text
/// ┌──────────────────────────────────────────────────────────┐
/// │  vlsn        : u64  (8 bytes)                           │
/// │  entry_type  : u8   (1 byte)                            │
/// │  payload_len : u32  (4 bytes)                           │
/// │  crc32       : u32  (4 bytes) — CRC32 of payload bytes  │
/// ├──────────────────────────────────────────────────────────┤
/// │  payload     : [u8; payload_len]                        │
/// └──────────────────────────────────────────────────────────┘
/// ```
///
/// The receiver verifies `crc32fast::hash(payload) == crc32` before
/// applying the entry.  A mismatch is returned as [`RepError::FrameCorrupted`].
const FRAME_HEADER_LEN: usize = std::mem::size_of::<u64>() // vlsn
    + std::mem::size_of::<u8>() // entry_type
    + std::mem::size_of::<u32>() // payload_len
    + std::mem::size_of::<u32>(); // crc32
316
/// Active feeder I/O loop.
///
/// `FeederRunner` owns a channel to a specific replica and a starting VLSN.
/// `run()` is a blocking loop that:
///   1. Scans the log for entries at `vlsn_start` and beyond.
///   2. Frames each entry and sends it to the replica.
///   3. Reads ack messages back from the replica and advances
///      `known_replica_vlsn`.
///   4. Returns when the channel is closed or an I/O error occurs.
///
/// The runner is single-threaded: log scanning, framing, sending, and ack
/// polling all interleave inside the one `run()` loop on the caller's
/// thread. There is no separate output/input thread pair; the same loop
/// handles both directions.
pub struct FeederRunner {
    /// Channel to the replica; used both to send frames and to receive acks.
    channel: Arc<dyn Channel>,
    /// First VLSN to send.
    vlsn_start: u64,
    /// Most recent VLSN acknowledged by the replica (tracked externally via
    /// the owning [`Feeder`] state struct, but also tracked here for quick
    /// access).  Starts at 0 and only ever moves forward.
    known_replica_vlsn: Mutex<u64>,
    /// REP-9: name of the replica this runner serves.  Passed as the first
    /// argument to `ack_sink`; empty when the runner was built without a
    /// sink.
    replica_name: String,
    /// REP-9: sink that forwards each inbound ack
    /// `(replica_name, acked_vlsn)` to the owning environment's
    /// `record_ack`.  Without this, production acks reached only the private
    /// `known_replica_vlsn` and never the `AckTracker` (commit-blocking
    /// quorum) or `Feeder::acked_vlsn` (DTVLSN ranking).  Mirrors JE
    /// `FeederTxns.noteReplicaAck` being driven from the feeder input loop.
    /// `None` means acks are only recorded in `known_replica_vlsn`.
    ack_sink: Option<AckSink>,
}
349
/// REP-9: callback invoked by `FeederRunner::run` for each inbound ack,
/// forwarding `(replica_name, acked_vlsn)` to `env.record_ack`.
///
/// The first argument is the replica name, the second the acknowledged VLSN.
/// The `Send + Sync` bounds and the `Arc` wrapper let the same sink be shared
/// with a runner that executes on another thread.
pub type AckSink = Arc<dyn Fn(&str, u64) + Send + Sync>;
353
354impl FeederRunner {
355    /// Create a new `FeederRunner`.
356    ///
357    /// # Arguments
358    /// * `channel` - The channel to the replica.
359    /// * `vlsn_start` - The VLSN from which to begin streaming.
360    pub fn new(channel: Arc<dyn Channel>, vlsn_start: u64) -> Self {
361        Self {
362            channel,
363            vlsn_start,
364            known_replica_vlsn: Mutex::new(0),
365            replica_name: String::new(),
366            ack_sink: None,
367        }
368    }
369
370    /// REP-9: like [`Self::new`] but wires an ack sink so each inbound ack is
371    /// forwarded to the owning environment (`env.record_ack(vlsn, name)`),
372    /// bridging the production feeder path to the `AckTracker` and the
373    /// `Feeder::acked_vlsn` that the DTVLSN computation reads.
374    pub fn new_with_ack_sink(
375        channel: Arc<dyn Channel>,
376        vlsn_start: u64,
377        replica_name: String,
378        ack_sink: AckSink,
379    ) -> Self {
380        Self {
381            channel,
382            vlsn_start,
383            known_replica_vlsn: Mutex::new(0),
384            replica_name,
385            ack_sink: Some(ack_sink),
386        }
387    }
388
389    /// Return the last VLSN acknowledged by the replica.
390    pub fn known_replica_vlsn(&self) -> u64 {
391        *self.known_replica_vlsn.lock()
392    }
393
394    /// Run the feeder loop.
395    ///
396    /// Streams all log entries from `vlsn_start` to the replica, then polls
397    /// for new entries. Acks from the replica update `known_replica_vlsn`.
398    ///
399    /// Returns `Ok(())` when the channel is closed gracefully, or `Err` on
400    /// an I/O error.
401    pub fn run(&self, log_scanner: &mut dyn LogScanner) -> Result<()> {
402        let mut next_vlsn = self.vlsn_start;
403        let poll_interval = Duration::from_millis(5);
404        let ack_timeout = Duration::from_millis(1);
405
406        loop {
407            // ----------------------------------------------------------------
408            // 1. Send all available log entries.
409            // ----------------------------------------------------------------
410            while let Some((vlsn, entry_type, payload)) =
411                log_scanner.next_entry(next_vlsn)
412            {
413                self.send_entry(vlsn, entry_type, &payload)?;
414                next_vlsn = vlsn + 1;
415            }
416
417            // ----------------------------------------------------------------
418            // 2. Poll for acks from the replica (non-blocking style).
419            // ----------------------------------------------------------------
420            match self.channel.receive(ack_timeout) {
421                Ok(Some(ack_bytes)) => {
422                    if ack_bytes.len() >= 8 {
423                        let vlsn = u64::from_le_bytes(
424                            ack_bytes[..8].try_into().unwrap(),
425                        );
426                        {
427                            let mut guard = self.known_replica_vlsn.lock();
428                            if vlsn > *guard {
429                                *guard = vlsn;
430                            }
431                        }
432                        // REP-9 Part 1: forward the ack to the owning env so
433                        // it reaches the AckTracker (commit-blocking quorum)
434                        // AND Feeder::acked_vlsn (DTVLSN ranking). JE drives
435                        // FeederTxns.noteReplicaAck from this same loop.
436                        if let Some(sink) = &self.ack_sink {
437                            sink(&self.replica_name, vlsn);
438                        }
439                    }
440                    // Continue without sleeping — more acks may be waiting.
441                    continue;
442                }
443                Ok(None) => {
444                    // Timeout: no ack yet, check for more log entries.
445                    std::thread::sleep(poll_interval);
446                    continue;
447                }
448                Err(RepError::ChannelClosed(_)) => {
449                    // Replica disconnected; clean shutdown.
450                    return Ok(());
451                }
452                Err(e) => return Err(e),
453            }
454        }
455    }
456
457    /// Frame and send a single log entry.
458    ///
459    /// Frame format (all LE):
460    ///   `[vlsn:8][type:1][payload_len:4][crc32:4][payload]`
461    fn send_entry(
462        &self,
463        vlsn: u64,
464        entry_type: u8,
465        payload: &[u8],
466    ) -> Result<()> {
467        let crc = crc32fast::hash(payload);
468        let mut frame = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
469        frame.extend_from_slice(&vlsn.to_le_bytes());
470        frame.push(entry_type);
471        frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
472        frame.extend_from_slice(&crc.to_le_bytes());
473        frame.extend_from_slice(payload);
474        self.channel.send(&frame)
475    }
476}
477
/// The state of a feeder connection to a replica.
///
/// A [`Feeder`] starts in [`FeederState::Idle`].  The state is a plain value
/// set through `Feeder::set_state`; no transition rules are enforced in this
/// module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeederState {
    /// Not connected to any replica.
    Idle,
    /// Performing the initial handshake (protocol negotiation).
    Handshaking,
    /// Actively streaming replication data.
    Streaming,
    /// Shutting down.
    Shutdown,
}
492
/// Tracks the state of feeding replication data to a single replica.
///
/// Each `Feeder` instance corresponds to one replica connection. The feeder
/// maintains a queue of outbound messages and tracks which VLSNs have been
/// sent vs. acknowledged.
///
/// Every mutable field sits behind its own `Mutex`, so all methods take
/// `&self`.
pub struct Feeder {
    /// Name of the replica this feeder is serving.
    replica_name: String,
    /// Current connection state.
    state: Mutex<FeederState>,
    /// Next VLSN to send to the replica (one past the highest VLSN queued so
    /// far; 0 before anything is queued).
    current_vlsn: Mutex<u64>,
    /// Last VLSN acknowledged by the replica (0 before any ack).
    acked_vlsn: Mutex<u64>,
    /// Timestamp of last activity (send or receive).
    last_activity: Mutex<Instant>,
    /// Pending messages queued for sending to the replica.
    /// Each message is a serialized byte vector.
    output_queue: Mutex<Vec<Vec<u8>>>,
}
515
516impl Feeder {
517    /// Create a new feeder for the named replica.
518    pub fn new(replica_name: String) -> Self {
519        Feeder {
520            replica_name,
521            state: Mutex::new(FeederState::Idle),
522            current_vlsn: Mutex::new(0),
523            acked_vlsn: Mutex::new(0),
524            last_activity: Mutex::new(Instant::now()),
525            output_queue: Mutex::new(Vec::new()),
526        }
527    }
528
529    /// Return the name of the replica this feeder is serving.
530    pub fn get_replica_name(&self) -> String {
531        self.replica_name.clone()
532    }
533
534    /// Return the current feeder state.
535    pub fn get_state(&self) -> FeederState {
536        *self.state.lock()
537    }
538
539    /// Set the feeder state.
540    pub fn set_state(&self, state: FeederState) {
541        *self.state.lock() = state;
542    }
543
544    /// Return the next VLSN that will be sent.
545    pub fn get_current_vlsn(&self) -> u64 {
546        *self.current_vlsn.lock()
547    }
548
549    /// Return the last VLSN acknowledged by the replica.
550    pub fn get_acked_vlsn(&self) -> u64 {
551        *self.acked_vlsn.lock()
552    }
553
554    /// Queue a log entry for sending to the replica.
555    ///
556    /// The entry is serialized into a byte vector containing the VLSN,
557    /// entry type, and raw data. The current VLSN is advanced to one past
558    /// the queued VLSN **only when `vlsn >= current_vlsn`**; if `vlsn`
559    /// is older than the current position the entry is still queued but
560    /// `current_vlsn` is left unchanged.
561    pub fn queue_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
562        let mut msg = Vec::with_capacity(9 + data.len());
563        msg.extend_from_slice(&vlsn.to_le_bytes());
564        msg.push(entry_type);
565        msg.extend_from_slice(&data);
566
567        self.output_queue.lock().push(msg);
568
569        let mut current = self.current_vlsn.lock();
570        if vlsn >= *current {
571            *current = vlsn + 1;
572        }
573
574        *self.last_activity.lock() = Instant::now();
575    }
576
577    /// Record an acknowledgement from the replica.
578    ///
579    /// The acked VLSN is updated if the new value is greater than the
580    /// current acked VLSN (acks should arrive in order, but we are
581    /// defensive).
582    pub fn record_ack(&self, vlsn: u64) {
583        let mut acked = self.acked_vlsn.lock();
584        if vlsn > *acked {
585            *acked = vlsn;
586        }
587        *self.last_activity.lock() = Instant::now();
588    }
589
590    /// Return the replication lag: the difference between the current
591    /// VLSN (next to send) and the last acknowledged VLSN.
592    ///
593    /// A lag of 0 means the replica is fully caught up.
594    pub fn get_lag(&self) -> u64 {
595        let current = *self.current_vlsn.lock();
596        let acked = *self.acked_vlsn.lock();
597        current.saturating_sub(acked)
598    }
599
600    /// Take all queued messages (drain the output queue).
601    ///
602    /// Returns the messages in the order they were queued and leaves
603    /// the queue empty.
604    pub fn drain_queue(&self) -> Vec<Vec<u8>> {
605        let mut queue = self.output_queue.lock();
606        std::mem::take(&mut *queue)
607    }
608
609    /// Check if the feeder has timed out (no activity within the given
610    /// duration).
611    pub fn is_timed_out(&self, timeout: Duration) -> bool {
612        self.last_activity.lock().elapsed() > timeout
613    }
614
615    /// Update the activity timestamp to the current time.
616    pub fn touch(&self) {
617        *self.last_activity.lock() = Instant::now();
618    }
619}
620
621impl std::fmt::Debug for Feeder {
622    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
623        f.debug_struct("Feeder")
624            .field("replica_name", &self.replica_name)
625            .field("state", &self.get_state())
626            .field("current_vlsn", &self.get_current_vlsn())
627            .field("acked_vlsn", &self.get_acked_vlsn())
628            .field("lag", &self.get_lag())
629            .finish()
630    }
631}
632
633#[cfg(test)]
634mod tests {
635    use super::*;
636    use crate::net::channel::LocalChannelPair;
637    use std::collections::VecDeque;
638
639    // -----------------------------------------------------------------------
640    // Helpers
641    // -----------------------------------------------------------------------
642
    /// A simple in-memory log scanner backed by a queue.
    ///
    /// Entries are handed out from the front of the queue, so they must be
    /// supplied in the order the test expects to receive them.
    struct VecLogScanner {
        /// Remaining `(vlsn, entry_type, payload)` entries; the next one to
        /// return is at the front.
        entries: VecDeque<(u64, u8, Vec<u8>)>,
    }
647
648    impl VecLogScanner {
649        fn new(entries: Vec<(u64, u8, Vec<u8>)>) -> Self {
650            Self { entries: entries.into_iter().collect() }
651        }
652    }
653
654    impl LogScanner for VecLogScanner {
655        fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
656            if let Some(&(vlsn, _, _)) = self.entries.front()
657                && vlsn >= from_vlsn
658            {
659                return self.entries.pop_front();
660            }
661            None
662        }
663    }
664
665    // -----------------------------------------------------------------------
666    // FeederRunner tests
667    // -----------------------------------------------------------------------
668
    /// End-to-end check of `FeederRunner::run` over a `LocalChannelPair`:
    /// three entries are framed and sent, the receiving thread verifies each
    /// frame's layout and CRC and acks it, and the runner's
    /// `known_replica_vlsn` ends up at the last acked VLSN.
    #[test]
    fn test_feeder_runner_sends_entries_via_local_channel() {
        // Build a 3-entry log.
        let entries = vec![
            (1u64, 10u8, vec![0xAA]),
            (2u64, 20u8, vec![0xBB, 0xCC]),
            (3u64, 30u8, vec![]),
        ];
        let pair = LocalChannelPair::new();
        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // Receiver side: collect frames and send back acks.
        let recv_handle = {
            let receiver = Arc::clone(&receiver);
            std::thread::spawn(move || {
                let mut received: Vec<(u64, u8, Vec<u8>)> = Vec::new();
                let timeout = Duration::from_secs(5);

                for _ in 0..3 {
                    let frame = receiver.receive(timeout).unwrap().unwrap();
                    // Parse frame: [vlsn:8][type:1][len:4][crc32:4][payload]
                    let vlsn =
                        u64::from_le_bytes(frame[0..8].try_into().unwrap());
                    let entry_type = frame[8];
                    let payload_len =
                        u32::from_le_bytes(frame[9..13].try_into().unwrap())
                            as usize;
                    let expected_crc =
                        u32::from_le_bytes(frame[13..17].try_into().unwrap());
                    let payload = frame[17..17 + payload_len].to_vec();
                    let actual_crc = crc32fast::hash(&payload);
                    assert_eq!(
                        actual_crc, expected_crc,
                        "CRC mismatch for vlsn={vlsn}"
                    );
                    received.push((vlsn, entry_type, payload));

                    // Send ack: the VLSN just received, as 8 LE bytes.
                    let mut ack = Vec::with_capacity(8);
                    ack.extend_from_slice(&vlsn.to_le_bytes());
                    receiver.send(&ack).unwrap();
                }

                received
            })
        };

        let mut scanner = VecLogScanner::new(entries);
        // FeederRunner polls for acks until the channel is closed.
        // Close the sender side after the scanner drains so run() returns.
        let runner = FeederRunner::new(Arc::clone(&sender), 1);

        // Run in a separate thread so we can close the channel.
        let runner_arc = Arc::new(runner);
        let runner_ref = Arc::clone(&runner_arc);
        let sender_ref = Arc::clone(&sender);
        let run_handle =
            std::thread::spawn(move || runner_ref.run(&mut scanner));

        // Wait for receiver to collect all 3 entries.
        let received = recv_handle.join().unwrap();
        assert_eq!(received.len(), 3);
        assert_eq!(received[0], (1, 10, vec![0xAA]));
        assert_eq!(received[1], (2, 20, vec![0xBB, 0xCC]));
        assert_eq!(received[2], (3, 30, vec![]));

        // Verify ack was tracked.
        //
        // The receiver thread sends one ack per frame and exits as soon as
        // the third ack is queued — but `FeederRunner::run()` is on a
        // separate thread that polls the channel with a 1 ms timeout and
        // then sleeps 5 ms between polls. By the time `recv_handle.join()`
        // returns, the runner may not yet have drained all three acks
        // from the queue. Poll briefly (≤ 100 ms) for the runner to catch
        // up. 100 ms is ~6× the worst-case three-ack drain time on a
        // healthy machine, so a real perf regression on the runner's
        // ack-handling path will still surface as a test failure.
        let deadline = Instant::now() + Duration::from_millis(100);
        while runner_arc.known_replica_vlsn() < 3 && Instant::now() < deadline {
            std::thread::sleep(Duration::from_millis(2));
        }
        assert!(
            runner_arc.known_replica_vlsn() == 3,
            "FeederRunner did not drain all 3 acks within 100 ms; \
             known_replica_vlsn() == {}, expected 3 (the receiver thread \
             sent 3 acks before exiting; the runner reads them one at a \
             time with a 1 ms timeout + 5 ms sleep cycle)",
            runner_arc.known_replica_vlsn()
        );

        // Close the channel to terminate the run loop; run() must then
        // return Ok(()).
        sender_ref.close().unwrap();
        run_handle.join().unwrap().unwrap();
    }
764
765    #[test]
766    fn test_feeder_runner_empty_scanner_returns_on_close() {
767        let pair = LocalChannelPair::new();
768        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
769        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
770
771        let runner = FeederRunner::new(Arc::clone(&sender), 1);
772        let sender_clone = Arc::clone(&sender);
773
774        // Close the channel almost immediately.
775        let close_handle = std::thread::spawn(move || {
776            std::thread::sleep(Duration::from_millis(50));
777            receiver.close().unwrap();
778            sender_clone.close().unwrap();
779        });
780
781        let mut scanner = VecLogScanner::new(vec![]);
782        let result = runner.run(&mut scanner);
783        assert!(
784            result.is_ok(),
785            "expected Ok on channel close, got {:?}",
786            result
787        );
788        close_handle.join().unwrap();
789    }
790
791    // -----------------------------------------------------------------------
792    // Original Feeder state struct tests
793    // -----------------------------------------------------------------------
794
795    #[test]
796    fn test_new_feeder() {
797        let feeder = Feeder::new("replica1".to_string());
798        assert_eq!(feeder.get_replica_name(), "replica1");
799        assert_eq!(feeder.get_state(), FeederState::Idle);
800        assert_eq!(feeder.get_current_vlsn(), 0);
801        assert_eq!(feeder.get_acked_vlsn(), 0);
802        assert_eq!(feeder.get_lag(), 0);
803    }
804
805    #[test]
806    fn test_state_transitions() {
807        let feeder = Feeder::new("r1".to_string());
808        assert_eq!(feeder.get_state(), FeederState::Idle);
809
810        feeder.set_state(FeederState::Handshaking);
811        assert_eq!(feeder.get_state(), FeederState::Handshaking);
812
813        feeder.set_state(FeederState::Streaming);
814        assert_eq!(feeder.get_state(), FeederState::Streaming);
815
816        feeder.set_state(FeederState::Shutdown);
817        assert_eq!(feeder.get_state(), FeederState::Shutdown);
818    }
819
820    #[test]
821    fn test_queue_and_drain() {
822        let feeder = Feeder::new("r1".to_string());
823        feeder.queue_entry(1, 10, vec![0xAA, 0xBB]);
824        feeder.queue_entry(2, 20, vec![0xCC]);
825        feeder.queue_entry(3, 30, vec![]);
826
827        let messages = feeder.drain_queue();
828        assert_eq!(messages.len(), 3);
829
830        // Verify message format: 8 bytes VLSN + 1 byte type + data.
831        assert_eq!(messages[0].len(), 8 + 1 + 2);
832        assert_eq!(messages[1].len(), 8 + 1 + 1);
833        assert_eq!(messages[2].len(), (8 + 1));
834
835        // Verify VLSN encoding.
836        let vlsn_bytes: [u8; 8] = messages[0][0..8].try_into().unwrap();
837        assert_eq!(u64::from_le_bytes(vlsn_bytes), 1);
838        assert_eq!(messages[0][8], 10); // entry_type
839
840        // Queue should be empty now.
841        let messages2 = feeder.drain_queue();
842        assert!(messages2.is_empty());
843    }
844
845    #[test]
846    fn test_current_vlsn_advances() {
847        let feeder = Feeder::new("r1".to_string());
848        feeder.queue_entry(5, 1, vec![]);
849        assert_eq!(feeder.get_current_vlsn(), 6);
850
851        feeder.queue_entry(10, 1, vec![]);
852        assert_eq!(feeder.get_current_vlsn(), 11);
853
854        // Queueing a lower VLSN should not decrease current_vlsn.
855        feeder.queue_entry(3, 1, vec![]);
856        assert_eq!(feeder.get_current_vlsn(), 11);
857    }
858
859    #[test]
860    fn test_ack_recording() {
861        let feeder = Feeder::new("r1".to_string());
862        feeder.queue_entry(1, 1, vec![]);
863        feeder.queue_entry(2, 1, vec![]);
864        feeder.queue_entry(3, 1, vec![]);
865
866        feeder.record_ack(1);
867        assert_eq!(feeder.get_acked_vlsn(), 1);
868
869        feeder.record_ack(3);
870        assert_eq!(feeder.get_acked_vlsn(), 3);
871
872        // Ack should not go backwards.
873        feeder.record_ack(2);
874        assert_eq!(feeder.get_acked_vlsn(), 3);
875    }
876
877    #[test]
878    fn test_lag_calculation() {
879        let feeder = Feeder::new("r1".to_string());
880        assert_eq!(feeder.get_lag(), 0);
881
882        feeder.queue_entry(1, 1, vec![]);
883        feeder.queue_entry(2, 1, vec![]);
884        feeder.queue_entry(3, 1, vec![]);
885        // current_vlsn = 4, acked_vlsn = 0 -> lag = 4.
886        assert_eq!(feeder.get_lag(), 4);
887
888        feeder.record_ack(2);
889        // current_vlsn = 4, acked_vlsn = 2 -> lag = 2.
890        assert_eq!(feeder.get_lag(), 2);
891
892        feeder.record_ack(4);
893        // Acked caught up to current.
894        assert_eq!(feeder.get_lag(), 0);
895    }
896
897    #[test]
898    fn test_timeout() {
899        let feeder = Feeder::new("r1".to_string());
900        // Just created, should not be timed out with a reasonable timeout.
901        assert!(!feeder.is_timed_out(Duration::from_secs(60)));
902
903        // With a zero timeout, should be timed out immediately (or nearly).
904        // We can't guarantee sub-nanosecond timing, but a zero-duration
905        // timeout is a reasonable edge case.
906        assert!(feeder.is_timed_out(Duration::from_nanos(0)));
907    }
908
909    #[test]
910    fn test_touch_resets_activity() {
911        let feeder = Feeder::new("r1".to_string());
912        // Wait a tiny bit then touch.
913        std::thread::sleep(Duration::from_millis(5));
914        feeder.touch();
915        // After touch, the feeder should not be timed out for a reasonable
916        // duration.
917        assert!(!feeder.is_timed_out(Duration::from_secs(1)));
918    }
919
920    #[test]
921    fn test_debug_format() {
922        let feeder = Feeder::new("replica_debug".to_string());
923        feeder.set_state(FeederState::Streaming);
924        let debug = format!("{:?}", feeder);
925        assert!(debug.contains("replica_debug"));
926        assert!(debug.contains("Streaming"));
927    }
928
929    // -----------------------------------------------------------------------
930    // FeederRunner edge cases (acks, runner restart)
931    // -----------------------------------------------------------------------
932
933    /// Build a sender + receiver channel pair and a runner that sends
934    /// no entries (used by ack-handling edge cases that don't care about
935    /// the entry stream).
936    fn make_runner_with_pair(
937        vlsn_start: u64,
938    ) -> (Arc<dyn Channel>, Arc<dyn Channel>, FeederRunner) {
939        let pair = LocalChannelPair::new();
940        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
941        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
942        let runner = FeederRunner::new(Arc::clone(&sender), vlsn_start);
943        (sender, receiver, runner)
944    }
945
946    #[test]
947    fn test_feeder_runner_short_ack_is_ignored_then_close() {
948        // Send a 4-byte "ack" (less than 8 bytes) — the runner must
949        // not crash, must not advance known_replica_vlsn, and must
950        // continue until the channel closes.
951        let (sender, receiver, runner) = make_runner_with_pair(1);
952        let receiver_clone = Arc::clone(&receiver);
953
954        let close_handle = std::thread::spawn(move || {
955            // Send a malformed (too-short) ack first.
956            std::thread::sleep(Duration::from_millis(20));
957            receiver_clone.send(&[0xAA, 0xBB, 0xCC]).unwrap();
958            // Then close.
959            std::thread::sleep(Duration::from_millis(40));
960            sender.close().unwrap();
961            receiver_clone.close().unwrap();
962        });
963
964        let mut scanner = VecLogScanner::new(vec![]);
965        let r = runner.run(&mut scanner);
966        assert!(r.is_ok(), "short-ack path should not error: {:?}", r);
967        assert_eq!(
968            runner.known_replica_vlsn(),
969            0,
970            "short ack must NOT advance known_replica_vlsn"
971        );
972        close_handle.join().unwrap();
973    }
974
975    #[test]
976    fn test_feeder_runner_ack_advances_known_replica_vlsn() {
977        let (sender, receiver, runner) = make_runner_with_pair(1);
978        let receiver_clone = Arc::clone(&receiver);
979
980        let close_handle = std::thread::spawn(move || {
981            // Send ack vlsn=42, then ack vlsn=10 (must NOT regress
982            // the watermark), then close.
983            std::thread::sleep(Duration::from_millis(20));
984            receiver_clone.send(&42u64.to_le_bytes()).unwrap();
985            std::thread::sleep(Duration::from_millis(20));
986            receiver_clone.send(&10u64.to_le_bytes()).unwrap();
987            std::thread::sleep(Duration::from_millis(40));
988            sender.close().unwrap();
989            receiver_clone.close().unwrap();
990        });
991
992        let mut scanner = VecLogScanner::new(vec![]);
993        let r = runner.run(&mut scanner);
994        assert!(r.is_ok(), "ack path should not error: {:?}", r);
995        assert_eq!(
996            runner.known_replica_vlsn(),
997            42,
998            "ack must advance to highest, never regress"
999        );
1000        close_handle.join().unwrap();
1001    }
1002
1003    #[test]
1004    fn test_feeder_runner_restart_resumes_from_provided_vlsn() {
1005        // First run: send entries 1..=3. Stop. New runner starts at
1006        // vlsn=4. Verify it sends 4..=5 and stops cleanly.
1007        let entries: [(u64, u8, Vec<u8>); 5] = [
1008            (1u64, 0u8, b"e1".to_vec()),
1009            (2, 0, b"e2".to_vec()),
1010            (3, 0, b"e3".to_vec()),
1011            (4, 0, b"e4".to_vec()),
1012            (5, 0, b"e5".to_vec()),
1013        ];
1014
1015        // First runner: vlsn_start = 1
1016        let (sender_a, receiver_a, runner_a) = make_runner_with_pair(1);
1017        let close_a = {
1018            let s = Arc::clone(&sender_a);
1019            let r = Arc::clone(&receiver_a);
1020            std::thread::spawn(move || {
1021                std::thread::sleep(Duration::from_millis(60));
1022                s.close().unwrap();
1023                r.close().unwrap();
1024            })
1025        };
1026        let received_a = {
1027            let r = Arc::clone(&receiver_a);
1028            std::thread::spawn(move || {
1029                let mut got = Vec::new();
1030                while let Ok(Some(frame)) =
1031                    r.receive(Duration::from_millis(100))
1032                {
1033                    got.push(frame);
1034                }
1035                got
1036            })
1037        };
1038        let mut scanner_a = VecLogScanner::new(entries[0..3].to_vec());
1039        runner_a.run(&mut scanner_a).unwrap();
1040        close_a.join().unwrap();
1041        let frames_a = received_a.join().unwrap();
1042        assert_eq!(
1043            frames_a.len(),
1044            3,
1045            "first runner must send 3 entries, got {}",
1046            frames_a.len()
1047        );
1048
1049        // Second runner: vlsn_start = 4 — resumes where the first
1050        // runner left off (with a fresh channel).
1051        let (sender_b, receiver_b, runner_b) = make_runner_with_pair(4);
1052        let close_b = {
1053            let s = Arc::clone(&sender_b);
1054            let r = Arc::clone(&receiver_b);
1055            std::thread::spawn(move || {
1056                std::thread::sleep(Duration::from_millis(60));
1057                s.close().unwrap();
1058                r.close().unwrap();
1059            })
1060        };
1061        let received_b = {
1062            let r = Arc::clone(&receiver_b);
1063            std::thread::spawn(move || {
1064                let mut got = Vec::new();
1065                while let Ok(Some(frame)) =
1066                    r.receive(Duration::from_millis(100))
1067                {
1068                    got.push(frame);
1069                }
1070                got
1071            })
1072        };
1073        let mut scanner_b = VecLogScanner::new(entries[3..].to_vec());
1074        runner_b.run(&mut scanner_b).unwrap();
1075        close_b.join().unwrap();
1076        let frames_b = received_b.join().unwrap();
1077        assert_eq!(
1078            frames_b.len(),
1079            2,
1080            "second runner must send 2 entries (4 and 5), got {}",
1081            frames_b.len()
1082        );
1083    }
1084
1085    #[test]
1086    fn test_feeder_runner_known_replica_vlsn_initial_zero() {
1087        let (_sender, _receiver, runner) = make_runner_with_pair(1);
1088        assert_eq!(runner.known_replica_vlsn(), 0);
1089    }
1090
1091    // -----------------------------------------------------------------------
1092    // EnvironmentLogScanner — exercised against a real EnvironmentImpl
1093    // -----------------------------------------------------------------------
1094
1095    #[test]
1096    fn test_environment_log_scanner_new_with_empty_env() {
1097        // Open a fresh environment and construct a scanner. Because
1098        // no log entries have been written, next_entry should return
1099        // None on the first call.
1100        let dir = tempfile::tempdir().expect("tempdir");
1101        let env = EnvironmentImpl::new(dir.path(), false, true)
1102            .expect("EnvironmentImpl::new");
1103
1104        let scanner = EnvironmentLogScanner::new(&env, None);
1105        assert!(scanner.is_some(), "scanner construction should succeed");
1106        let mut scanner = scanner.unwrap();
1107
1108        // Empty log → no entries.
1109        let r = scanner.next_entry(0);
1110        assert!(
1111            r.is_none(),
1112            "next_entry on empty log must return None, got {:?}",
1113            r
1114        );
1115    }
1116
1117    #[test]
1118    fn test_environment_log_scanner_with_explicit_null_lsn() {
1119        let dir = tempfile::tempdir().expect("tempdir");
1120        let env = EnvironmentImpl::new(dir.path(), false, true)
1121            .expect("EnvironmentImpl::new");
1122
1123        // NULL_LSN should be treated the same as None (start from
1124        // file 0, FILE_HEADER_SIZE offset).
1125        let scanner = EnvironmentLogScanner::new(&env, Some(NULL_LSN));
1126        assert!(scanner.is_some());
1127    }
1128
1129    #[test]
1130    fn test_environment_log_scanner_with_explicit_start_lsn() {
1131        let dir = tempfile::tempdir().expect("tempdir");
1132        let env = EnvironmentImpl::new(dir.path(), false, true)
1133            .expect("EnvironmentImpl::new");
1134
1135        // Non-null start LSN — the scanner should record file=5,
1136        // offset=128 as its cursor (no actual file at that offset
1137        // exists, so subsequent reads return None — tests the
1138        // initialisation path).
1139        let lsn = Lsn::new(5, 128);
1140        let scanner = EnvironmentLogScanner::new(&env, Some(lsn));
1141        assert!(scanner.is_some());
1142        let mut scanner = scanner.unwrap();
1143        // No file exists at this LSN — next_entry returns None.
1144        assert!(scanner.next_entry(0).is_none());
1145    }
1146}