Skip to main content

noxu_rep/stream/
feeder.rs

1//! Feeder  -  master-side replication sender.
2//!
3//! Tracks the state of
4//! feeding replication data to a single replica, including the current
5//! VLSN position, acknowledged VLSN, output queue, and heartbeat tracking.
6//!
7//! The [`FeederRunner`] provides the active I/O loop that scans the log
8//! forward from a given VLSN, frames each entry, and sends it to the replica
9//! via a [`Channel`]. Acks are received on the same channel.
10//!
//! [`EnvironmentLogScanner`] is the live implementation of [`LogScanner`]:
//! it reads the environment's log files directly through its own
//! `FileManager` opened on the environment home directory. It corresponds
//! to `MasterFeederSource` (`rep.impl.node.Feeder`).
14
15use noxu_dbi::EnvironmentImpl;
16use noxu_log::MAX_ITEM_SIZE;
17use noxu_log::entry_header::{MAX_HEADER_SIZE, MIN_HEADER_SIZE};
18use noxu_log::file_header::FILE_HEADER_SIZE;
19use noxu_log::file_manager::FileManager;
20use noxu_sync::Mutex;
21use noxu_util::lsn::{Lsn, NULL_LSN};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use crate::error::{RepError, Result};
26use crate::net::channel::Channel;
27
28// CRC32 (Ethernet/zlib polynomial via PCLMULQDQ on x86-64 — ~18 GiB/s).
29// See docs/checksum-selection.md for the full benchmark rationale.
30use crc32fast;
31
32// ---------------------------------------------------------------------------
33// Log scanner trait
34// ---------------------------------------------------------------------------
35
/// Source of replication log entries, yielded in ascending VLSN order.
///
/// This plays the role of `FeederSource` / `MasterFeederSource`. Each item
/// is a `(vlsn, entry_type, payload)` tuple. A `None` result does not mean
/// end-of-stream: it only says nothing new is available right now, and the
/// caller is expected to call `next_entry` again after a short pause.
pub trait LogScanner: Send {
    /// Fetch the next entry whose VLSN is at least `from_vlsn`.
    ///
    /// Returns `None` when no such entry is available at the moment.
    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)>;
}
47
48// ---------------------------------------------------------------------------
49// EnvironmentLogScanner
50// ---------------------------------------------------------------------------
51
52/// `LogScanner` implementation backed by the live `EnvironmentImpl`.
53///
54/// Scans the log forward from an LSN cursor, returning entries that carry a
55/// VLSN >= `from_vlsn`. On each call to `next_entry` the scanner advances
56/// its internal file/offset position one entry at a time; when it reaches
57/// the current end of the log it returns `None` (the `FeederRunner` will
58/// call again after a brief poll interval).
59///
60///
61pub struct EnvironmentLogScanner {
62    /// The log `FileManager` used for raw byte-level reads.
63    file_manager: Arc<FileManager>,
64    /// Current scan position: next file number and byte offset to read.
65    cursor_file: u32,
66    cursor_offset: u64,
67    /// Highest VLSN returned so far (to avoid duplicates on re-entrant calls).
68    last_returned_vlsn: u64,
69}
70
71impl EnvironmentLogScanner {
72    /// Create a scanner that starts at `start_lsn`.
73    ///
74    /// If `start_lsn` is `NULL_LSN` the scanner begins at the very first log
75    /// entry (file 0, offset = FILE_HEADER_SIZE).
76    ///
77    /// Obtain the `FileManager` from `EnvironmentImpl::get_log_manager()` →
78    /// `LogManager` is not directly accessible, but `EnvironmentImpl` exposes
79    /// a `get_log_manager()` returning `Option<Arc<LogManager>>`.  For the
80    /// scanner we need the `FileManager` underneath it; the simplest approach
81    /// is to construct the scanner directly from a `FileManager` Arc.
82    pub fn new(env: &EnvironmentImpl, start_lsn: Option<Lsn>) -> Option<Self> {
83        // We need the FileManager to do raw byte reads.  It is not directly
84        // exposed on EnvironmentImpl, so we access it via the LogManager.
85        // LogManager::file_manager is private, so we carry it separately.
86        // For now, use the env_home path to construct a read-only FileManager.
87        //
88        // The master feeder reads the log files
89        // directly, starting at the replica's current VLSN position.
90        let env_home = env.get_env_home().to_path_buf();
91        let fm = Arc::new(
92            FileManager::new(&env_home, true, 256 * 1024 * 1024, 32).ok()?,
93        );
94
95        let (cursor_file, cursor_offset) = match start_lsn {
96            Some(lsn) if lsn != NULL_LSN => {
97                (lsn.file_number(), lsn.file_offset() as u64)
98            }
99            _ => {
100                // Start from file 0, first entry offset.
101                (0, FILE_HEADER_SIZE as u64)
102            }
103        };
104
105        Some(Self {
106            file_manager: fm,
107            cursor_file,
108            cursor_offset,
109            last_returned_vlsn: 0,
110        })
111    }
112
113    /// Read the raw header+payload at `(file_num, offset)`.
114    ///
115    /// Returns `(entry_size_bytes, vlsn_opt, entry_type_byte, payload)` or
116    /// `None` if the bytes don't form a valid entry (zero fill, truncation).
117    fn read_raw_entry(
118        &self,
119        file_num: u32,
120        offset: u64,
121    ) -> Option<(usize, Option<u64>, u8, Vec<u8>)> {
122        let mut hdr = [0u8; MIN_HEADER_SIZE];
123        let n = self
124            .file_manager
125            .read_from_file(file_num, offset, &mut hdr)
126            .ok()?;
127        if n < MIN_HEADER_SIZE {
128            return None;
129        }
130        // Zero-fill region past last written entry.
131        if hdr[4] == 0 {
132            return None;
133        }
134
135        let entry_type_byte = hdr[4];
136        let flags = hdr[5];
137        let item_size =
138            u32::from_le_bytes([hdr[10], hdr[11], hdr[12], hdr[13]]) as usize;
139
140        let vlsn_present = (flags & 0x08) != 0 || (flags & 0x20) != 0;
141        let header_size =
142            if vlsn_present { MAX_HEADER_SIZE } else { MIN_HEADER_SIZE };
143
144        // Sanity cap: same shared MAX_ITEM_SIZE used by every log reader
145        // so that an attacker who flips item_size cannot cause a 100 MiB
146        // allocation here while passing other readers' bounds.
147        if item_size > MAX_ITEM_SIZE {
148            return None;
149        }
150
151        let entry_size = header_size + item_size;
152        let mut full = vec![0u8; entry_size];
153        let n = self
154            .file_manager
155            .read_from_file(file_num, offset, &mut full)
156            .ok()?;
157        if n < entry_size {
158            return None;
159        }
160
161        // Extract VLSN from the header extension if present (8-byte LE i64).
162        let vlsn_opt = if vlsn_present && full.len() >= MAX_HEADER_SIZE {
163            let raw = i64::from_le_bytes(
164                full[MIN_HEADER_SIZE..MAX_HEADER_SIZE].try_into().ok()?,
165            );
166            if raw > 0 {
167                Some(raw as u64)
168            } else {
169                // Negative or zero i64 with vlsn_present flag set is a
170                // contradiction (NULL VLSN should not have the flag set).
171                // Surface it but keep the legacy "treat as missing" behaviour
172                // so a single corrupt entry does not stall the feeder.
173                // LOG-9.
174                log::warn!(
175                    "EnvironmentLogScanner: implausible VLSN value {} at \
176                     file {:08x} offset {:#x}; treating as no-VLSN",
177                    raw,
178                    file_num,
179                    offset,
180                );
181                None
182            }
183        } else {
184            None
185        };
186
187        let payload = full[header_size..].to_vec();
188        Some((entry_size, vlsn_opt, entry_type_byte, payload))
189    }
190}
191
192impl LogScanner for EnvironmentLogScanner {
193    /// Return the next entry with VLSN >= `from_vlsn`, advancing the cursor.
194    ///
195    /// Scans forward one entry at a time.  Returns `None` when the cursor
196    /// reaches the end of the currently-written log.  The feeder will sleep
197    /// briefly and call again.
198    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
199        // Collect file numbers once per call; cheap (directory listing).
200        let file_nums = self.file_manager.list_file_numbers().ok()?;
201        if file_nums.is_empty() {
202            return None;
203        }
204
205        loop {
206            // Skip files before the current cursor file.
207            if !file_nums.contains(&self.cursor_file) {
208                // Advance to the next known file.
209                let next =
210                    file_nums.iter().find(|&&n| n > self.cursor_file).copied();
211                match next {
212                    Some(n) => {
213                        self.cursor_file = n;
214                        self.cursor_offset = FILE_HEADER_SIZE as u64;
215                    }
216                    None => return None, // No more files.
217                }
218            }
219
220            let file_len =
221                self.file_manager.get_file_length(self.cursor_file).ok()?;
222
223            if self.cursor_offset >= file_len {
224                // End of current file: move to next file.
225                let next =
226                    file_nums.iter().find(|&&n| n > self.cursor_file).copied();
227                match next {
228                    Some(n) => {
229                        self.cursor_file = n;
230                        self.cursor_offset = FILE_HEADER_SIZE as u64;
231                        continue;
232                    }
233                    None => return None, // End of log.
234                }
235            }
236
237            match self.read_raw_entry(self.cursor_file, self.cursor_offset) {
238                None => {
239                    // End of written data in this file; move to next.
240                    let next = file_nums
241                        .iter()
242                        .find(|&&n| n > self.cursor_file)
243                        .copied();
244                    match next {
245                        Some(n) => {
246                            self.cursor_file = n;
247                            self.cursor_offset = FILE_HEADER_SIZE as u64;
248                            continue;
249                        }
250                        None => return None,
251                    }
252                }
253                Some((entry_size, vlsn_opt, entry_type_byte, payload)) => {
254                    self.cursor_offset += entry_size as u64;
255
256                    if let Some(vlsn) = vlsn_opt
257                        && vlsn >= from_vlsn
258                        && vlsn > self.last_returned_vlsn
259                    {
260                        self.last_returned_vlsn = vlsn;
261                        return Some((vlsn, entry_type_byte, payload));
262                    }
263                    // Entry has no VLSN, vlsn < from_vlsn, or already
264                    // returned: keep scanning.
265                }
266            }
267        }
268    }
269}
270
271// ---------------------------------------------------------------------------
272// FeederRunner
273// ---------------------------------------------------------------------------
274
/// Size in bytes of the fixed header that precedes every payload on the wire.
///
/// Each frame sent to a replica is laid out as follows (all integers
/// little-endian):
///
/// ```text
/// offset  size  field
/// ------  ----  ----------------------------------------------
///      0     8  vlsn        : u64
///      8     1  entry_type  : u8
///      9     4  payload_len : u32
///     13     4  crc32       : u32 — CRC32 of the payload bytes
///     17     n  payload     : [u8; payload_len]
/// ```
///
/// The receiving side applies an entry only if `crc32fast::hash(payload)`
/// equals `crc32`; a mismatch is returned as [`RepError::FrameCorrupted`].
const FRAME_HEADER_LEN: usize = std::mem::size_of::<u64>() // vlsn
    + std::mem::size_of::<u8>() // entry_type
    + std::mem::size_of::<u32>() // payload_len
    + std::mem::size_of::<u32>(); // crc32
291
292/// Active feeder I/O loop.
293///
294/// `FeederRunner` owns a channel to a specific replica and a starting VLSN.
295/// `run()` is a blocking loop that:
296///   1. Scans the log for entries at `vlsn_start` and beyond.
297///   2. Frames each entry and sends it to the replica.
298///   3. Reads ack messages back from the replica and advances `acked_vlsn`.
299///   4. Returns when the channel is closed or an I/O error occurs.
300///
301/// The runner is single-threaded: log scanning, framing, sending, and ack
302/// polling all interleave inside the one `run()` loop on the caller's
303/// thread. There is no separate output/input thread pair; the same loop
304/// handles both directions.
305pub struct FeederRunner {
306    /// Channel to the replica.
307    channel: Arc<dyn Channel>,
308    /// First VLSN to send.
309    vlsn_start: u64,
310    /// Most recent VLSN acknowledged by the replica (tracked externally via
311    /// the owning [`Feeder`] state struct, but also tracked here for quick
312    /// access).
313    known_replica_vlsn: Mutex<u64>,
314}
315
316impl FeederRunner {
317    /// Create a new `FeederRunner`.
318    ///
319    /// # Arguments
320    /// * `channel` - The channel to the replica.
321    /// * `vlsn_start` - The VLSN from which to begin streaming.
322    pub fn new(channel: Arc<dyn Channel>, vlsn_start: u64) -> Self {
323        Self { channel, vlsn_start, known_replica_vlsn: Mutex::new(0) }
324    }
325
326    /// Return the last VLSN acknowledged by the replica.
327    pub fn known_replica_vlsn(&self) -> u64 {
328        *self.known_replica_vlsn.lock()
329    }
330
331    /// Run the feeder loop.
332    ///
333    /// Streams all log entries from `vlsn_start` to the replica, then polls
334    /// for new entries. Acks from the replica update `known_replica_vlsn`.
335    ///
336    /// Returns `Ok(())` when the channel is closed gracefully, or `Err` on
337    /// an I/O error.
338    pub fn run(&self, log_scanner: &mut dyn LogScanner) -> Result<()> {
339        let mut next_vlsn = self.vlsn_start;
340        let poll_interval = Duration::from_millis(5);
341        let ack_timeout = Duration::from_millis(1);
342
343        loop {
344            // ----------------------------------------------------------------
345            // 1. Send all available log entries.
346            // ----------------------------------------------------------------
347            while let Some((vlsn, entry_type, payload)) =
348                log_scanner.next_entry(next_vlsn)
349            {
350                self.send_entry(vlsn, entry_type, &payload)?;
351                next_vlsn = vlsn + 1;
352            }
353
354            // ----------------------------------------------------------------
355            // 2. Poll for acks from the replica (non-blocking style).
356            // ----------------------------------------------------------------
357            match self.channel.receive(ack_timeout) {
358                Ok(Some(ack_bytes)) => {
359                    if ack_bytes.len() >= 8 {
360                        let vlsn = u64::from_le_bytes(
361                            ack_bytes[..8].try_into().unwrap(),
362                        );
363                        let mut guard = self.known_replica_vlsn.lock();
364                        if vlsn > *guard {
365                            *guard = vlsn;
366                        }
367                    }
368                    // Continue without sleeping — more acks may be waiting.
369                    continue;
370                }
371                Ok(None) => {
372                    // Timeout: no ack yet, check for more log entries.
373                    std::thread::sleep(poll_interval);
374                    continue;
375                }
376                Err(RepError::ChannelClosed(_)) => {
377                    // Replica disconnected; clean shutdown.
378                    return Ok(());
379                }
380                Err(e) => return Err(e),
381            }
382        }
383    }
384
385    /// Frame and send a single log entry.
386    ///
387    /// Frame format (all LE):
388    ///   `[vlsn:8][type:1][payload_len:4][crc32:4][payload]`
389    fn send_entry(
390        &self,
391        vlsn: u64,
392        entry_type: u8,
393        payload: &[u8],
394    ) -> Result<()> {
395        let crc = crc32fast::hash(payload);
396        let mut frame = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
397        frame.extend_from_slice(&vlsn.to_le_bytes());
398        frame.push(entry_type);
399        frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
400        frame.extend_from_slice(&crc.to_le_bytes());
401        frame.extend_from_slice(payload);
402        self.channel.send(&frame)
403    }
404}
405
/// Lifecycle state of a feeder's connection to a replica.
///
/// The value is set explicitly (see `Feeder::set_state`); no particular
/// transition order is enforced by this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeederState {
    /// No replica is connected.
    Idle,
    /// The initial handshake (protocol negotiation) is in progress.
    Handshaking,
    /// Replication data is being streamed to the replica.
    Streaming,
    /// The feeder is shutting down.
    Shutdown,
}
420
421/// Tracks the state of feeding replication data to a single replica.
422///
423/// Each `Feeder` instance corresponds to one replica connection. The feeder
424/// maintains a queue of outbound messages and tracks which VLSNs have been
425/// sent vs. acknowledged.
426///
427///
428pub struct Feeder {
429    /// Name of the replica this feeder is serving.
430    replica_name: String,
431    /// Current connection state.
432    state: Mutex<FeederState>,
433    /// Next VLSN to send to the replica.
434    current_vlsn: Mutex<u64>,
435    /// Last VLSN acknowledged by the replica.
436    acked_vlsn: Mutex<u64>,
437    /// Timestamp of last activity (send or receive).
438    last_activity: Mutex<Instant>,
439    /// Pending messages queued for sending to the replica.
440    /// Each message is a serialized byte vector.
441    output_queue: Mutex<Vec<Vec<u8>>>,
442}
443
444impl Feeder {
445    /// Create a new feeder for the named replica.
446    pub fn new(replica_name: String) -> Self {
447        Feeder {
448            replica_name,
449            state: Mutex::new(FeederState::Idle),
450            current_vlsn: Mutex::new(0),
451            acked_vlsn: Mutex::new(0),
452            last_activity: Mutex::new(Instant::now()),
453            output_queue: Mutex::new(Vec::new()),
454        }
455    }
456
457    /// Return the name of the replica this feeder is serving.
458    pub fn get_replica_name(&self) -> String {
459        self.replica_name.clone()
460    }
461
462    /// Return the current feeder state.
463    pub fn get_state(&self) -> FeederState {
464        *self.state.lock()
465    }
466
467    /// Set the feeder state.
468    pub fn set_state(&self, state: FeederState) {
469        *self.state.lock() = state;
470    }
471
472    /// Return the next VLSN that will be sent.
473    pub fn get_current_vlsn(&self) -> u64 {
474        *self.current_vlsn.lock()
475    }
476
477    /// Return the last VLSN acknowledged by the replica.
478    pub fn get_acked_vlsn(&self) -> u64 {
479        *self.acked_vlsn.lock()
480    }
481
482    /// Queue a log entry for sending to the replica.
483    ///
484    /// The entry is serialized into a byte vector containing the VLSN,
485    /// entry type, and raw data. The current VLSN is advanced to one past
486    /// the queued VLSN **only when `vlsn >= current_vlsn`**; if `vlsn`
487    /// is older than the current position the entry is still queued but
488    /// `current_vlsn` is left unchanged.
489    pub fn queue_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
490        let mut msg = Vec::with_capacity(9 + data.len());
491        msg.extend_from_slice(&vlsn.to_le_bytes());
492        msg.push(entry_type);
493        msg.extend_from_slice(&data);
494
495        self.output_queue.lock().push(msg);
496
497        let mut current = self.current_vlsn.lock();
498        if vlsn >= *current {
499            *current = vlsn + 1;
500        }
501
502        *self.last_activity.lock() = Instant::now();
503    }
504
505    /// Record an acknowledgement from the replica.
506    ///
507    /// The acked VLSN is updated if the new value is greater than the
508    /// current acked VLSN (acks should arrive in order, but we are
509    /// defensive).
510    pub fn record_ack(&self, vlsn: u64) {
511        let mut acked = self.acked_vlsn.lock();
512        if vlsn > *acked {
513            *acked = vlsn;
514        }
515        *self.last_activity.lock() = Instant::now();
516    }
517
518    /// Return the replication lag: the difference between the current
519    /// VLSN (next to send) and the last acknowledged VLSN.
520    ///
521    /// A lag of 0 means the replica is fully caught up.
522    pub fn get_lag(&self) -> u64 {
523        let current = *self.current_vlsn.lock();
524        let acked = *self.acked_vlsn.lock();
525        current.saturating_sub(acked)
526    }
527
528    /// Take all queued messages (drain the output queue).
529    ///
530    /// Returns the messages in the order they were queued and leaves
531    /// the queue empty.
532    pub fn drain_queue(&self) -> Vec<Vec<u8>> {
533        let mut queue = self.output_queue.lock();
534        std::mem::take(&mut *queue)
535    }
536
537    /// Check if the feeder has timed out (no activity within the given
538    /// duration).
539    pub fn is_timed_out(&self, timeout: Duration) -> bool {
540        self.last_activity.lock().elapsed() > timeout
541    }
542
543    /// Update the activity timestamp to the current time.
544    pub fn touch(&self) {
545        *self.last_activity.lock() = Instant::now();
546    }
547}
548
549impl std::fmt::Debug for Feeder {
550    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551        f.debug_struct("Feeder")
552            .field("replica_name", &self.replica_name)
553            .field("state", &self.get_state())
554            .field("current_vlsn", &self.get_current_vlsn())
555            .field("acked_vlsn", &self.get_acked_vlsn())
556            .field("lag", &self.get_lag())
557            .finish()
558    }
559}
560
561#[cfg(test)]
562mod tests {
563    use super::*;
564    use crate::net::channel::LocalChannelPair;
565    use std::collections::VecDeque;
566
567    // -----------------------------------------------------------------------
568    // Helpers
569    // -----------------------------------------------------------------------
570
571    /// A simple in-memory log scanner backed by a queue.
572    struct VecLogScanner {
573        entries: VecDeque<(u64, u8, Vec<u8>)>,
574    }
575
576    impl VecLogScanner {
577        fn new(entries: Vec<(u64, u8, Vec<u8>)>) -> Self {
578            Self { entries: entries.into_iter().collect() }
579        }
580    }
581
582    impl LogScanner for VecLogScanner {
583        fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
584            if let Some(&(vlsn, _, _)) = self.entries.front()
585                && vlsn >= from_vlsn
586            {
587                return self.entries.pop_front();
588            }
589            None
590        }
591    }
592
593    // -----------------------------------------------------------------------
594    // FeederRunner tests
595    // -----------------------------------------------------------------------
596
597    #[test]
598    fn test_feeder_runner_sends_entries_via_local_channel() {
599        // Build a 3-entry log.
600        let entries = vec![
601            (1u64, 10u8, vec![0xAA]),
602            (2u64, 20u8, vec![0xBB, 0xCC]),
603            (3u64, 30u8, vec![]),
604        ];
605        let pair = LocalChannelPair::new();
606        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
607        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
608
609        // Receiver side: collect frames and send back acks.
610        let recv_handle = {
611            let receiver = Arc::clone(&receiver);
612            std::thread::spawn(move || {
613                let mut received: Vec<(u64, u8, Vec<u8>)> = Vec::new();
614                let timeout = Duration::from_secs(5);
615
616                for _ in 0..3 {
617                    let frame = receiver.receive(timeout).unwrap().unwrap();
618                    // Parse frame: [vlsn:8][type:1][len:4][crc32:4][payload]
619                    let vlsn =
620                        u64::from_le_bytes(frame[0..8].try_into().unwrap());
621                    let entry_type = frame[8];
622                    let payload_len =
623                        u32::from_le_bytes(frame[9..13].try_into().unwrap())
624                            as usize;
625                    let expected_crc =
626                        u32::from_le_bytes(frame[13..17].try_into().unwrap());
627                    let payload = frame[17..17 + payload_len].to_vec();
628                    let actual_crc = crc32fast::hash(&payload);
629                    assert_eq!(
630                        actual_crc, expected_crc,
631                        "CRC mismatch for vlsn={vlsn}"
632                    );
633                    received.push((vlsn, entry_type, payload));
634
635                    // Send ack.
636                    let mut ack = Vec::with_capacity(8);
637                    ack.extend_from_slice(&vlsn.to_le_bytes());
638                    receiver.send(&ack).unwrap();
639                }
640
641                received
642            })
643        };
644
645        let mut scanner = VecLogScanner::new(entries);
646        // FeederRunner polls for acks until the channel is closed.
647        // Close the sender side after the scanner drains so run() returns.
648        let runner = FeederRunner::new(Arc::clone(&sender), 1);
649
650        // Run in a separate thread so we can close the channel.
651        let runner_arc = Arc::new(runner);
652        let runner_ref = Arc::clone(&runner_arc);
653        let sender_ref = Arc::clone(&sender);
654        let run_handle =
655            std::thread::spawn(move || runner_ref.run(&mut scanner));
656
657        // Wait for receiver to collect all 3 entries.
658        let received = recv_handle.join().unwrap();
659        assert_eq!(received.len(), 3);
660        assert_eq!(received[0], (1, 10, vec![0xAA]));
661        assert_eq!(received[1], (2, 20, vec![0xBB, 0xCC]));
662        assert_eq!(received[2], (3, 30, vec![]));
663
664        // Verify ack was tracked.
665        //
666        // The receiver thread sends one ack per frame and exits as soon as
667        // the third ack is queued — but `FeederRunner::run()` is on a
668        // separate thread that polls the channel with a 1 ms timeout and
669        // then sleeps 5 ms between polls. By the time `recv_handle.join()`
670        // returns, the runner may not yet have drained all three acks
671        // from the queue. Poll briefly (≤ 100 ms) for the runner to catch
672        // up. 100 ms is ~6× the worst-case three-ack drain time on a
673        // healthy machine, so a real perf regression on the runner's
674        // ack-handling path will still surface as a test failure.
675        let deadline = Instant::now() + Duration::from_millis(100);
676        while runner_arc.known_replica_vlsn() < 3 && Instant::now() < deadline {
677            std::thread::sleep(Duration::from_millis(2));
678        }
679        assert!(
680            runner_arc.known_replica_vlsn() == 3,
681            "FeederRunner did not drain all 3 acks within 100 ms; \
682             known_replica_vlsn() == {}, expected 3 (the receiver thread \
683             sent 3 acks before exiting; the runner reads them one at a \
684             time with a 1 ms timeout + 5 ms sleep cycle)",
685            runner_arc.known_replica_vlsn()
686        );
687
688        // Close the channel to terminate the run loop.
689        sender_ref.close().unwrap();
690        run_handle.join().unwrap().unwrap();
691    }
692
693    #[test]
694    fn test_feeder_runner_empty_scanner_returns_on_close() {
695        let pair = LocalChannelPair::new();
696        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
697        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
698
699        let runner = FeederRunner::new(Arc::clone(&sender), 1);
700        let sender_clone = Arc::clone(&sender);
701
702        // Close the channel almost immediately.
703        let close_handle = std::thread::spawn(move || {
704            std::thread::sleep(Duration::from_millis(50));
705            receiver.close().unwrap();
706            sender_clone.close().unwrap();
707        });
708
709        let mut scanner = VecLogScanner::new(vec![]);
710        let result = runner.run(&mut scanner);
711        assert!(
712            result.is_ok(),
713            "expected Ok on channel close, got {:?}",
714            result
715        );
716        close_handle.join().unwrap();
717    }
718
719    // -----------------------------------------------------------------------
720    // Original Feeder state struct tests
721    // -----------------------------------------------------------------------
722
723    #[test]
724    fn test_new_feeder() {
725        let feeder = Feeder::new("replica1".to_string());
726        assert_eq!(feeder.get_replica_name(), "replica1");
727        assert_eq!(feeder.get_state(), FeederState::Idle);
728        assert_eq!(feeder.get_current_vlsn(), 0);
729        assert_eq!(feeder.get_acked_vlsn(), 0);
730        assert_eq!(feeder.get_lag(), 0);
731    }
732
733    #[test]
734    fn test_state_transitions() {
735        let feeder = Feeder::new("r1".to_string());
736        assert_eq!(feeder.get_state(), FeederState::Idle);
737
738        feeder.set_state(FeederState::Handshaking);
739        assert_eq!(feeder.get_state(), FeederState::Handshaking);
740
741        feeder.set_state(FeederState::Streaming);
742        assert_eq!(feeder.get_state(), FeederState::Streaming);
743
744        feeder.set_state(FeederState::Shutdown);
745        assert_eq!(feeder.get_state(), FeederState::Shutdown);
746    }
747
748    #[test]
749    fn test_queue_and_drain() {
750        let feeder = Feeder::new("r1".to_string());
751        feeder.queue_entry(1, 10, vec![0xAA, 0xBB]);
752        feeder.queue_entry(2, 20, vec![0xCC]);
753        feeder.queue_entry(3, 30, vec![]);
754
755        let messages = feeder.drain_queue();
756        assert_eq!(messages.len(), 3);
757
758        // Verify message format: 8 bytes VLSN + 1 byte type + data.
759        assert_eq!(messages[0].len(), 8 + 1 + 2);
760        assert_eq!(messages[1].len(), 8 + 1 + 1);
761        assert_eq!(messages[2].len(), (8 + 1));
762
763        // Verify VLSN encoding.
764        let vlsn_bytes: [u8; 8] = messages[0][0..8].try_into().unwrap();
765        assert_eq!(u64::from_le_bytes(vlsn_bytes), 1);
766        assert_eq!(messages[0][8], 10); // entry_type
767
768        // Queue should be empty now.
769        let messages2 = feeder.drain_queue();
770        assert!(messages2.is_empty());
771    }
772
773    #[test]
774    fn test_current_vlsn_advances() {
775        let feeder = Feeder::new("r1".to_string());
776        feeder.queue_entry(5, 1, vec![]);
777        assert_eq!(feeder.get_current_vlsn(), 6);
778
779        feeder.queue_entry(10, 1, vec![]);
780        assert_eq!(feeder.get_current_vlsn(), 11);
781
782        // Queueing a lower VLSN should not decrease current_vlsn.
783        feeder.queue_entry(3, 1, vec![]);
784        assert_eq!(feeder.get_current_vlsn(), 11);
785    }
786
787    #[test]
788    fn test_ack_recording() {
789        let feeder = Feeder::new("r1".to_string());
790        feeder.queue_entry(1, 1, vec![]);
791        feeder.queue_entry(2, 1, vec![]);
792        feeder.queue_entry(3, 1, vec![]);
793
794        feeder.record_ack(1);
795        assert_eq!(feeder.get_acked_vlsn(), 1);
796
797        feeder.record_ack(3);
798        assert_eq!(feeder.get_acked_vlsn(), 3);
799
800        // Ack should not go backwards.
801        feeder.record_ack(2);
802        assert_eq!(feeder.get_acked_vlsn(), 3);
803    }
804
805    #[test]
806    fn test_lag_calculation() {
807        let feeder = Feeder::new("r1".to_string());
808        assert_eq!(feeder.get_lag(), 0);
809
810        feeder.queue_entry(1, 1, vec![]);
811        feeder.queue_entry(2, 1, vec![]);
812        feeder.queue_entry(3, 1, vec![]);
813        // current_vlsn = 4, acked_vlsn = 0 -> lag = 4.
814        assert_eq!(feeder.get_lag(), 4);
815
816        feeder.record_ack(2);
817        // current_vlsn = 4, acked_vlsn = 2 -> lag = 2.
818        assert_eq!(feeder.get_lag(), 2);
819
820        feeder.record_ack(4);
821        // Acked caught up to current.
822        assert_eq!(feeder.get_lag(), 0);
823    }
824
825    #[test]
826    fn test_timeout() {
827        let feeder = Feeder::new("r1".to_string());
828        // Just created, should not be timed out with a reasonable timeout.
829        assert!(!feeder.is_timed_out(Duration::from_secs(60)));
830
831        // With a zero timeout, should be timed out immediately (or nearly).
832        // We can't guarantee sub-nanosecond timing, but a zero-duration
833        // timeout is a reasonable edge case.
834        assert!(feeder.is_timed_out(Duration::from_nanos(0)));
835    }
836
837    #[test]
838    fn test_touch_resets_activity() {
839        let feeder = Feeder::new("r1".to_string());
840        // Wait a tiny bit then touch.
841        std::thread::sleep(Duration::from_millis(5));
842        feeder.touch();
843        // After touch, the feeder should not be timed out for a reasonable
844        // duration.
845        assert!(!feeder.is_timed_out(Duration::from_secs(1)));
846    }
847
848    #[test]
849    fn test_debug_format() {
850        let feeder = Feeder::new("replica_debug".to_string());
851        feeder.set_state(FeederState::Streaming);
852        let debug = format!("{:?}", feeder);
853        assert!(debug.contains("replica_debug"));
854        assert!(debug.contains("Streaming"));
855    }
856
857    // -----------------------------------------------------------------------
858    // FeederRunner edge cases (acks, runner restart)
859    // -----------------------------------------------------------------------
860
861    /// Build a sender + receiver channel pair and a runner that sends
862    /// no entries (used by ack-handling edge cases that don't care about
863    /// the entry stream).
864    fn make_runner_with_pair(
865        vlsn_start: u64,
866    ) -> (Arc<dyn Channel>, Arc<dyn Channel>, FeederRunner) {
867        let pair = LocalChannelPair::new();
868        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
869        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
870        let runner = FeederRunner::new(Arc::clone(&sender), vlsn_start);
871        (sender, receiver, runner)
872    }
873
874    #[test]
875    fn test_feeder_runner_short_ack_is_ignored_then_close() {
876        // Send a 4-byte "ack" (less than 8 bytes) — the runner must
877        // not crash, must not advance known_replica_vlsn, and must
878        // continue until the channel closes.
879        let (sender, receiver, runner) = make_runner_with_pair(1);
880        let receiver_clone = Arc::clone(&receiver);
881
882        let close_handle = std::thread::spawn(move || {
883            // Send a malformed (too-short) ack first.
884            std::thread::sleep(Duration::from_millis(20));
885            receiver_clone.send(&[0xAA, 0xBB, 0xCC]).unwrap();
886            // Then close.
887            std::thread::sleep(Duration::from_millis(40));
888            sender.close().unwrap();
889            receiver_clone.close().unwrap();
890        });
891
892        let mut scanner = VecLogScanner::new(vec![]);
893        let r = runner.run(&mut scanner);
894        assert!(r.is_ok(), "short-ack path should not error: {:?}", r);
895        assert_eq!(
896            runner.known_replica_vlsn(),
897            0,
898            "short ack must NOT advance known_replica_vlsn"
899        );
900        close_handle.join().unwrap();
901    }
902
903    #[test]
904    fn test_feeder_runner_ack_advances_known_replica_vlsn() {
905        let (sender, receiver, runner) = make_runner_with_pair(1);
906        let receiver_clone = Arc::clone(&receiver);
907
908        let close_handle = std::thread::spawn(move || {
909            // Send ack vlsn=42, then ack vlsn=10 (must NOT regress
910            // the watermark), then close.
911            std::thread::sleep(Duration::from_millis(20));
912            receiver_clone.send(&42u64.to_le_bytes()).unwrap();
913            std::thread::sleep(Duration::from_millis(20));
914            receiver_clone.send(&10u64.to_le_bytes()).unwrap();
915            std::thread::sleep(Duration::from_millis(40));
916            sender.close().unwrap();
917            receiver_clone.close().unwrap();
918        });
919
920        let mut scanner = VecLogScanner::new(vec![]);
921        let r = runner.run(&mut scanner);
922        assert!(r.is_ok(), "ack path should not error: {:?}", r);
923        assert_eq!(
924            runner.known_replica_vlsn(),
925            42,
926            "ack must advance to highest, never regress"
927        );
928        close_handle.join().unwrap();
929    }
930
931    #[test]
932    fn test_feeder_runner_restart_resumes_from_provided_vlsn() {
933        // First run: send entries 1..=3. Stop. New runner starts at
934        // vlsn=4. Verify it sends 4..=5 and stops cleanly.
935        let entries: [(u64, u8, Vec<u8>); 5] = [
936            (1u64, 0u8, b"e1".to_vec()),
937            (2, 0, b"e2".to_vec()),
938            (3, 0, b"e3".to_vec()),
939            (4, 0, b"e4".to_vec()),
940            (5, 0, b"e5".to_vec()),
941        ];
942
943        // First runner: vlsn_start = 1
944        let (sender_a, receiver_a, runner_a) = make_runner_with_pair(1);
945        let close_a = {
946            let s = Arc::clone(&sender_a);
947            let r = Arc::clone(&receiver_a);
948            std::thread::spawn(move || {
949                std::thread::sleep(Duration::from_millis(60));
950                s.close().unwrap();
951                r.close().unwrap();
952            })
953        };
954        let received_a = {
955            let r = Arc::clone(&receiver_a);
956            std::thread::spawn(move || {
957                let mut got = Vec::new();
958                while let Ok(Some(frame)) =
959                    r.receive(Duration::from_millis(100))
960                {
961                    got.push(frame);
962                }
963                got
964            })
965        };
966        let mut scanner_a = VecLogScanner::new(entries[0..3].to_vec());
967        runner_a.run(&mut scanner_a).unwrap();
968        close_a.join().unwrap();
969        let frames_a = received_a.join().unwrap();
970        assert_eq!(
971            frames_a.len(),
972            3,
973            "first runner must send 3 entries, got {}",
974            frames_a.len()
975        );
976
977        // Second runner: vlsn_start = 4 — resumes where the first
978        // runner left off (with a fresh channel).
979        let (sender_b, receiver_b, runner_b) = make_runner_with_pair(4);
980        let close_b = {
981            let s = Arc::clone(&sender_b);
982            let r = Arc::clone(&receiver_b);
983            std::thread::spawn(move || {
984                std::thread::sleep(Duration::from_millis(60));
985                s.close().unwrap();
986                r.close().unwrap();
987            })
988        };
989        let received_b = {
990            let r = Arc::clone(&receiver_b);
991            std::thread::spawn(move || {
992                let mut got = Vec::new();
993                while let Ok(Some(frame)) =
994                    r.receive(Duration::from_millis(100))
995                {
996                    got.push(frame);
997                }
998                got
999            })
1000        };
1001        let mut scanner_b = VecLogScanner::new(entries[3..].to_vec());
1002        runner_b.run(&mut scanner_b).unwrap();
1003        close_b.join().unwrap();
1004        let frames_b = received_b.join().unwrap();
1005        assert_eq!(
1006            frames_b.len(),
1007            2,
1008            "second runner must send 2 entries (4 and 5), got {}",
1009            frames_b.len()
1010        );
1011    }
1012
1013    #[test]
1014    fn test_feeder_runner_known_replica_vlsn_initial_zero() {
1015        let (_sender, _receiver, runner) = make_runner_with_pair(1);
1016        assert_eq!(runner.known_replica_vlsn(), 0);
1017    }
1018
1019    // -----------------------------------------------------------------------
1020    // EnvironmentLogScanner — exercised against a real EnvironmentImpl
1021    // -----------------------------------------------------------------------
1022
1023    #[test]
1024    fn test_environment_log_scanner_new_with_empty_env() {
1025        // Open a fresh environment and construct a scanner. Because
1026        // no log entries have been written, next_entry should return
1027        // None on the first call.
1028        let dir = tempfile::tempdir().expect("tempdir");
1029        let env = EnvironmentImpl::new(dir.path(), false, true)
1030            .expect("EnvironmentImpl::new");
1031
1032        let scanner = EnvironmentLogScanner::new(&env, None);
1033        assert!(scanner.is_some(), "scanner construction should succeed");
1034        let mut scanner = scanner.unwrap();
1035
1036        // Empty log → no entries.
1037        let r = scanner.next_entry(0);
1038        assert!(
1039            r.is_none(),
1040            "next_entry on empty log must return None, got {:?}",
1041            r
1042        );
1043    }
1044
1045    #[test]
1046    fn test_environment_log_scanner_with_explicit_null_lsn() {
1047        let dir = tempfile::tempdir().expect("tempdir");
1048        let env = EnvironmentImpl::new(dir.path(), false, true)
1049            .expect("EnvironmentImpl::new");
1050
1051        // NULL_LSN should be treated the same as None (start from
1052        // file 0, FILE_HEADER_SIZE offset).
1053        let scanner = EnvironmentLogScanner::new(&env, Some(NULL_LSN));
1054        assert!(scanner.is_some());
1055    }
1056
1057    #[test]
1058    fn test_environment_log_scanner_with_explicit_start_lsn() {
1059        let dir = tempfile::tempdir().expect("tempdir");
1060        let env = EnvironmentImpl::new(dir.path(), false, true)
1061            .expect("EnvironmentImpl::new");
1062
1063        // Non-null start LSN — the scanner should record file=5,
1064        // offset=128 as its cursor (no actual file at that offset
1065        // exists, so subsequent reads return None — tests the
1066        // initialisation path).
1067        let lsn = Lsn::new(5, 128);
1068        let scanner = EnvironmentLogScanner::new(&env, Some(lsn));
1069        assert!(scanner.is_some());
1070        let mut scanner = scanner.unwrap();
1071        // No file exists at this LSN — next_entry returns None.
1072        assert!(scanner.next_entry(0).is_none());
1073    }
1074}