noxu_rep/stream/feeder.rs
1//! Feeder - master-side replication sender.
2//!
3//! Tracks the state of
4//! feeding replication data to a single replica, including the current
5//! VLSN position, acknowledged VLSN, output queue, and heartbeat tracking.
6//!
7//! The [`FeederRunner`] provides the active I/O loop that scans the log
8//! forward from a given VLSN, frames each entry, and sends it to the replica
9//! via a [`Channel`]. Acks are received on the same channel.
10//!
11//! [`EnvironmentLogScanner`] is the live implementation of [`LogScanner`]
12//! backed by the real `LogManager` + `FileManager`.
//! It corresponds to `Rep.impl.node.Feeder.MasterFeederSource`.
14
15use noxu_dbi::EnvironmentImpl;
16use noxu_log::MAX_ITEM_SIZE;
17use noxu_log::entry_header::{MAX_HEADER_SIZE, MIN_HEADER_SIZE};
18use noxu_log::file_header::LOG_VERSION as LOG_FILE_VERSION;
19use noxu_log::file_header::on_disk_size as file_header_on_disk_size;
20use noxu_log::file_manager::FileManager;
21use noxu_sync::Mutex;
22use noxu_util::lsn::{Lsn, NULL_LSN};
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use crate::error::{RepError, Result};
27use crate::net::channel::Channel;
28
29// CRC32 (Ethernet/zlib polynomial via PCLMULQDQ on x86-64 — ~18 GiB/s).
30// See docs/checksum-selection.md for the full benchmark rationale.
31use crc32fast;
32
33// ---------------------------------------------------------------------------
34// Log scanner trait
35// ---------------------------------------------------------------------------
36
/// A pollable, forward-only source of log entries for a feeder.
///
/// Corresponds to `FeederSource` / `MasterFeederSource`. The scanner
/// returns `(vlsn, entry_type, payload)` tuples in VLSN order. Returning
/// `None` signals that there are no more entries *yet*; the caller will call
/// `next_entry` again after a short wait.
///
/// The `Send` supertrait means every implementation can be moved to another
/// thread.
pub trait LogScanner: Send {
    /// Return the next available entry with VLSN >= `from_vlsn`, or `None` if
    /// no new entry is available at this moment.
    ///
    /// The returned tuple is `(vlsn, entry_type, payload)`.
    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)>;
}
48
49// ---------------------------------------------------------------------------
50// EnvironmentLogScanner
51// ---------------------------------------------------------------------------
52
/// `LogScanner` implementation that reads entries from the environment's
/// on-disk log files through a `FileManager`.
///
/// Scans the log forward from an LSN cursor, returning entries that carry a
/// VLSN >= `from_vlsn`. On each call to `next_entry` the scanner advances
/// its internal file/offset position one entry at a time; when it reaches
/// the current end of the log it returns `None` (the `FeederRunner` will
/// call again after a brief poll interval).
pub struct EnvironmentLogScanner {
    /// The log `FileManager` used for raw byte-level reads.
    file_manager: Arc<FileManager>,
    /// Current scan position: number of the log file being read.
    cursor_file: u32,
    /// Current scan position: byte offset within `cursor_file` of the next
    /// entry to read.
    cursor_offset: u64,
    /// Highest VLSN returned so far (to avoid duplicates on re-entrant calls).
    last_returned_vlsn: u64,
}
71
72impl EnvironmentLogScanner {
73 /// Create a scanner that starts at `start_lsn`.
74 ///
75 /// If `start_lsn` is `NULL_LSN` the scanner begins at the very first log
76 /// entry in file 0. The correct first-entry offset is resolved from
77 /// file 0's `log_version` (32 for v2, 36 for v3). If file 0 does not
78 /// exist yet (empty log) the current-version default (36) is used.
79 ///
80 /// Obtain the `FileManager` from `EnvironmentImpl::get_log_manager()` →
81 /// `LogManager` is not directly accessible, but `EnvironmentImpl` exposes
82 /// a `get_log_manager()` returning `Option<Arc<LogManager>>`. For the
83 /// scanner we need the `FileManager` underneath it; the simplest approach
84 /// is to construct the scanner directly from a `FileManager` Arc.
85 pub fn new(env: &EnvironmentImpl, start_lsn: Option<Lsn>) -> Option<Self> {
86 // We need the FileManager to do raw byte reads. It is not directly
87 // exposed on EnvironmentImpl, so we access it via the LogManager.
88 // LogManager::file_manager is private, so we carry it separately.
89 // For now, use the env_home path to construct a read-only FileManager.
90 //
91 // The master feeder reads the log files
92 // directly, starting at the replica's current VLSN position.
93 let env_home = env.get_env_home().to_path_buf();
94 let fm = Arc::new(
95 FileManager::new(&env_home, true, 256 * 1024 * 1024, 32).ok()?,
96 );
97
98 let (cursor_file, cursor_offset) = match start_lsn {
99 Some(lsn) if lsn != NULL_LSN => {
100 (lsn.file_number(), lsn.file_offset() as u64)
101 }
102 _ => {
103 // Start from file 0, first entry offset. Use the actual
104 // header size of file 0 if it exists (v2 → 32, v3 → 36);
105 // fall back to current-version default (36) if it does not.
106 let file0_offset =
107 fm.file_header_size_for(0).unwrap_or_else(|_| {
108 file_header_on_disk_size(LOG_FILE_VERSION)
109 }) as u64;
110 (0, file0_offset)
111 }
112 };
113
114 Some(Self {
115 file_manager: fm,
116 cursor_file,
117 cursor_offset,
118 last_returned_vlsn: 0,
119 })
120 }
121
122 /// Read the raw header+payload at `(file_num, offset)`.
123 ///
124 /// Returns `(entry_size_bytes, vlsn_opt, entry_type_byte, payload)` or
125 /// `None` if the bytes don't form a valid entry (zero fill, truncation).
126 fn read_raw_entry(
127 &self,
128 file_num: u32,
129 offset: u64,
130 ) -> Option<(usize, Option<u64>, u8, Vec<u8>)> {
131 let mut hdr = [0u8; MIN_HEADER_SIZE];
132 let n = self
133 .file_manager
134 .read_from_file(file_num, offset, &mut hdr)
135 .ok()?;
136 if n < MIN_HEADER_SIZE {
137 return None;
138 }
139 // Zero-fill region past last written entry.
140 if hdr[4] == 0 {
141 return None;
142 }
143
144 let entry_type_byte = hdr[4];
145 let flags = hdr[5];
146 let item_size =
147 u32::from_le_bytes([hdr[10], hdr[11], hdr[12], hdr[13]]) as usize;
148
149 let vlsn_present = (flags & 0x08) != 0 || (flags & 0x20) != 0;
150 let header_size =
151 if vlsn_present { MAX_HEADER_SIZE } else { MIN_HEADER_SIZE };
152
153 // Sanity cap: same shared MAX_ITEM_SIZE used by every log reader
154 // so that an attacker who flips item_size cannot cause a 100 MiB
155 // allocation here while passing other readers' bounds.
156 if item_size > MAX_ITEM_SIZE {
157 return None;
158 }
159
160 let entry_size = header_size + item_size;
161 let mut full = vec![0u8; entry_size];
162 let n = self
163 .file_manager
164 .read_from_file(file_num, offset, &mut full)
165 .ok()?;
166 if n < entry_size {
167 return None;
168 }
169
170 // Extract VLSN from the header extension if present (8-byte LE i64).
171 let vlsn_opt = if vlsn_present && full.len() >= MAX_HEADER_SIZE {
172 let raw = i64::from_le_bytes(
173 full[MIN_HEADER_SIZE..MAX_HEADER_SIZE].try_into().ok()?,
174 );
175 if raw > 0 {
176 Some(raw as u64)
177 } else {
178 // Negative or zero i64 with vlsn_present flag set is a
179 // contradiction (NULL VLSN should not have the flag set).
180 // Surface it but keep the legacy "treat as missing" behaviour
181 // so a single corrupt entry does not stall the feeder.
182 // LOG-9.
183 log::warn!(
184 "EnvironmentLogScanner: implausible VLSN value {} at \
185 file {:08x} offset {:#x}; treating as no-VLSN",
186 raw,
187 file_num,
188 offset,
189 );
190 None
191 }
192 } else {
193 None
194 };
195
196 let payload = full[header_size..].to_vec();
197 Some((entry_size, vlsn_opt, entry_type_byte, payload))
198 }
199}
200
201impl LogScanner for EnvironmentLogScanner {
202 /// Return the next entry with VLSN >= `from_vlsn`, advancing the cursor.
203 ///
204 /// Scans forward one entry at a time. Returns `None` when the cursor
205 /// reaches the end of the currently-written log. The feeder will sleep
206 /// briefly and call again.
207 fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
208 // Collect file numbers once per call; cheap (directory listing).
209 let file_nums = self.file_manager.list_file_numbers().ok()?;
210 if file_nums.is_empty() {
211 return None;
212 }
213
214 loop {
215 // Skip files before the current cursor file.
216 if !file_nums.contains(&self.cursor_file) {
217 // Advance to the next known file.
218 let next =
219 file_nums.iter().find(|&&n| n > self.cursor_file).copied();
220 match next {
221 Some(n) => {
222 self.cursor_file = n;
223 self.cursor_offset =
224 self.file_manager
225 .file_header_size_for(n)
226 .unwrap_or_else(|_| {
227 file_header_on_disk_size(LOG_FILE_VERSION)
228 }) as u64;
229 }
230 None => return None, // No more files.
231 }
232 }
233
234 let file_len =
235 self.file_manager.get_file_length(self.cursor_file).ok()?;
236
237 if self.cursor_offset >= file_len {
238 // End of current file: move to next file.
239 let next =
240 file_nums.iter().find(|&&n| n > self.cursor_file).copied();
241 match next {
242 Some(n) => {
243 self.cursor_file = n;
244 self.cursor_offset =
245 self.file_manager
246 .file_header_size_for(n)
247 .unwrap_or_else(|_| {
248 file_header_on_disk_size(LOG_FILE_VERSION)
249 }) as u64;
250 continue;
251 }
252 None => return None, // End of log.
253 }
254 }
255
256 match self.read_raw_entry(self.cursor_file, self.cursor_offset) {
257 None => {
258 // End of written data in this file; move to next.
259 let next = file_nums
260 .iter()
261 .find(|&&n| n > self.cursor_file)
262 .copied();
263 match next {
264 Some(n) => {
265 self.cursor_file = n;
266 self.cursor_offset = self
267 .file_manager
268 .file_header_size_for(n)
269 .unwrap_or_else(|_| {
270 file_header_on_disk_size(LOG_FILE_VERSION)
271 })
272 as u64;
273 continue;
274 }
275 None => return None,
276 }
277 }
278 Some((entry_size, vlsn_opt, entry_type_byte, payload)) => {
279 self.cursor_offset += entry_size as u64;
280
281 if let Some(vlsn) = vlsn_opt
282 && vlsn >= from_vlsn
283 && vlsn > self.last_returned_vlsn
284 {
285 self.last_returned_vlsn = vlsn;
286 return Some((vlsn, entry_type_byte, payload));
287 }
288 // Entry has no VLSN, vlsn < from_vlsn, or already
289 // returned: keep scanning.
290 }
291 }
292 }
293 }
294}
295
296// ---------------------------------------------------------------------------
297// FeederRunner
298// ---------------------------------------------------------------------------
299
/// Length in bytes of the fixed header that precedes every frame payload.
///
/// Wire frame layout (all integers little-endian):
///
/// ```text
/// offset  size  field
/// ------  ----  ------------------------------------------------
///      0     8  vlsn        (u64)
///      8     1  entry_type  (u8)
///      9     4  payload_len (u32)
///     13     4  crc32       (u32) — CRC32 of the payload bytes
///     17     *  payload     ([u8; payload_len])
/// ```
///
/// The receiver verifies `crc32fast::hash(payload) == crc32` before
/// applying the entry. A mismatch is returned as [`RepError::FrameCorrupted`].
const FRAME_HEADER_LEN: usize = std::mem::size_of::<u64>() // vlsn
    + std::mem::size_of::<u8>() // entry_type
    + std::mem::size_of::<u32>() // payload_len
    + std::mem::size_of::<u32>(); // crc32
316
/// Active feeder I/O loop.
///
/// `FeederRunner` owns a channel to a specific replica and a starting VLSN.
/// `run()` is a blocking loop that:
///   1. Scans the log for entries at `vlsn_start` and beyond.
///   2. Frames each entry and sends it to the replica.
///   3. Reads ack messages back from the replica and advances
///      `known_replica_vlsn`.
///   4. Returns when the channel is closed or an I/O error occurs.
///
/// The runner is single-threaded: log scanning, framing, sending, and ack
/// polling all interleave inside the one `run()` loop on the caller's
/// thread. There is no separate output/input thread pair; the same loop
/// handles both directions.
pub struct FeederRunner {
    /// Channel to the replica; used both to send frames and to receive acks.
    channel: Arc<dyn Channel>,
    /// First VLSN to send.
    vlsn_start: u64,
    /// Most recent VLSN acknowledged by the replica (tracked externally via
    /// the owning [`Feeder`] state struct, but also tracked here for quick
    /// access). Only ever moves forward.
    known_replica_vlsn: Mutex<u64>,
    /// REP-9: name of the replica this runner serves, passed as the first
    /// argument to `ack_sink`. Empty when the runner was built with
    /// [`FeederRunner::new`].
    replica_name: String,
    /// REP-9: sink that forwards each inbound ack `(replica_name,
    /// acked_vlsn)` to the owning environment's `record_ack`. Without this,
    /// production acks reached only the private `known_replica_vlsn` and
    /// never the `AckTracker` (commit-blocking quorum) or
    /// `Feeder::acked_vlsn` (DTVLSN ranking). Mirrors JE
    /// `FeederTxns.noteReplicaAck` being driven from the feeder input loop.
    /// `None` when the runner was built with [`FeederRunner::new`].
    ack_sink: Option<AckSink>,
}
349
/// REP-9: callback invoked by `FeederRunner::run` for each inbound ack of at
/// least 8 bytes, forwarding `(replica_name, acked_vlsn)` to
/// `env.record_ack`.
///
/// The first argument is the runner's replica name; the second is the VLSN
/// decoded from the ack's first 8 bytes (little-endian).
pub type AckSink = Arc<dyn Fn(&str, u64) + Send + Sync>;
353
354impl FeederRunner {
355 /// Create a new `FeederRunner`.
356 ///
357 /// # Arguments
358 /// * `channel` - The channel to the replica.
359 /// * `vlsn_start` - The VLSN from which to begin streaming.
360 pub fn new(channel: Arc<dyn Channel>, vlsn_start: u64) -> Self {
361 Self {
362 channel,
363 vlsn_start,
364 known_replica_vlsn: Mutex::new(0),
365 replica_name: String::new(),
366 ack_sink: None,
367 }
368 }
369
370 /// REP-9: like [`Self::new`] but wires an ack sink so each inbound ack is
371 /// forwarded to the owning environment (`env.record_ack(vlsn, name)`),
372 /// bridging the production feeder path to the `AckTracker` and the
373 /// `Feeder::acked_vlsn` that the DTVLSN computation reads.
374 pub fn new_with_ack_sink(
375 channel: Arc<dyn Channel>,
376 vlsn_start: u64,
377 replica_name: String,
378 ack_sink: AckSink,
379 ) -> Self {
380 Self {
381 channel,
382 vlsn_start,
383 known_replica_vlsn: Mutex::new(0),
384 replica_name,
385 ack_sink: Some(ack_sink),
386 }
387 }
388
389 /// Return the last VLSN acknowledged by the replica.
390 pub fn known_replica_vlsn(&self) -> u64 {
391 *self.known_replica_vlsn.lock()
392 }
393
394 /// Run the feeder loop.
395 ///
396 /// Streams all log entries from `vlsn_start` to the replica, then polls
397 /// for new entries. Acks from the replica update `known_replica_vlsn`.
398 ///
399 /// Returns `Ok(())` when the channel is closed gracefully, or `Err` on
400 /// an I/O error.
401 pub fn run(&self, log_scanner: &mut dyn LogScanner) -> Result<()> {
402 let mut next_vlsn = self.vlsn_start;
403 let poll_interval = Duration::from_millis(5);
404 let ack_timeout = Duration::from_millis(1);
405
406 loop {
407 // ----------------------------------------------------------------
408 // 1. Send all available log entries.
409 // ----------------------------------------------------------------
410 while let Some((vlsn, entry_type, payload)) =
411 log_scanner.next_entry(next_vlsn)
412 {
413 self.send_entry(vlsn, entry_type, &payload)?;
414 next_vlsn = vlsn + 1;
415 }
416
417 // ----------------------------------------------------------------
418 // 2. Poll for acks from the replica (non-blocking style).
419 // ----------------------------------------------------------------
420 match self.channel.receive(ack_timeout) {
421 Ok(Some(ack_bytes)) => {
422 if ack_bytes.len() >= 8 {
423 let vlsn = u64::from_le_bytes(
424 ack_bytes[..8].try_into().unwrap(),
425 );
426 {
427 let mut guard = self.known_replica_vlsn.lock();
428 if vlsn > *guard {
429 *guard = vlsn;
430 }
431 }
432 // REP-9 Part 1: forward the ack to the owning env so
433 // it reaches the AckTracker (commit-blocking quorum)
434 // AND Feeder::acked_vlsn (DTVLSN ranking). JE drives
435 // FeederTxns.noteReplicaAck from this same loop.
436 if let Some(sink) = &self.ack_sink {
437 sink(&self.replica_name, vlsn);
438 }
439 }
440 // Continue without sleeping — more acks may be waiting.
441 continue;
442 }
443 Ok(None) => {
444 // Timeout: no ack yet, check for more log entries.
445 std::thread::sleep(poll_interval);
446 continue;
447 }
448 Err(RepError::ChannelClosed(_)) => {
449 // Replica disconnected; clean shutdown.
450 return Ok(());
451 }
452 Err(e) => return Err(e),
453 }
454 }
455 }
456
457 /// Frame and send a single log entry.
458 ///
459 /// Frame format (all LE):
460 /// `[vlsn:8][type:1][payload_len:4][crc32:4][payload]`
461 fn send_entry(
462 &self,
463 vlsn: u64,
464 entry_type: u8,
465 payload: &[u8],
466 ) -> Result<()> {
467 let crc = crc32fast::hash(payload);
468 let mut frame = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
469 frame.extend_from_slice(&vlsn.to_le_bytes());
470 frame.push(entry_type);
471 frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
472 frame.extend_from_slice(&crc.to_le_bytes());
473 frame.extend_from_slice(payload);
474 self.channel.send(&frame)
475 }
476}
477
/// Lifecycle state of a feeder's connection to its replica.
///
/// Held by [`Feeder`] and read/written through `Feeder::get_state` and
/// `Feeder::set_state`. The type itself does not restrict which transitions
/// are allowed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeederState {
    /// Not connected to any replica. This is the state of a new `Feeder`.
    Idle,
    /// Performing the initial handshake (protocol negotiation).
    Handshaking,
    /// Actively streaming replication data.
    Streaming,
    /// Shutting down.
    Shutdown,
}
492
493/// Tracks the state of feeding replication data to a single replica.
494///
495/// Each `Feeder` instance corresponds to one replica connection. The feeder
496/// maintains a queue of outbound messages and tracks which VLSNs have been
497/// sent vs. acknowledged.
498///
499///
500pub struct Feeder {
501 /// Name of the replica this feeder is serving.
502 replica_name: String,
503 /// Current connection state.
504 state: Mutex<FeederState>,
505 /// Next VLSN to send to the replica.
506 current_vlsn: Mutex<u64>,
507 /// Last VLSN acknowledged by the replica.
508 acked_vlsn: Mutex<u64>,
509 /// Timestamp of last activity (send or receive).
510 last_activity: Mutex<Instant>,
511 /// Pending messages queued for sending to the replica.
512 /// Each message is a serialized byte vector.
513 output_queue: Mutex<Vec<Vec<u8>>>,
514}
515
516impl Feeder {
517 /// Create a new feeder for the named replica.
518 pub fn new(replica_name: String) -> Self {
519 Feeder {
520 replica_name,
521 state: Mutex::new(FeederState::Idle),
522 current_vlsn: Mutex::new(0),
523 acked_vlsn: Mutex::new(0),
524 last_activity: Mutex::new(Instant::now()),
525 output_queue: Mutex::new(Vec::new()),
526 }
527 }
528
529 /// Return the name of the replica this feeder is serving.
530 pub fn get_replica_name(&self) -> String {
531 self.replica_name.clone()
532 }
533
534 /// Return the current feeder state.
535 pub fn get_state(&self) -> FeederState {
536 *self.state.lock()
537 }
538
539 /// Set the feeder state.
540 pub fn set_state(&self, state: FeederState) {
541 *self.state.lock() = state;
542 }
543
544 /// Return the next VLSN that will be sent.
545 pub fn get_current_vlsn(&self) -> u64 {
546 *self.current_vlsn.lock()
547 }
548
549 /// Return the last VLSN acknowledged by the replica.
550 pub fn get_acked_vlsn(&self) -> u64 {
551 *self.acked_vlsn.lock()
552 }
553
554 /// Queue a log entry for sending to the replica.
555 ///
556 /// The entry is serialized into a byte vector containing the VLSN,
557 /// entry type, and raw data. The current VLSN is advanced to one past
558 /// the queued VLSN **only when `vlsn >= current_vlsn`**; if `vlsn`
559 /// is older than the current position the entry is still queued but
560 /// `current_vlsn` is left unchanged.
561 pub fn queue_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
562 let mut msg = Vec::with_capacity(9 + data.len());
563 msg.extend_from_slice(&vlsn.to_le_bytes());
564 msg.push(entry_type);
565 msg.extend_from_slice(&data);
566
567 self.output_queue.lock().push(msg);
568
569 let mut current = self.current_vlsn.lock();
570 if vlsn >= *current {
571 *current = vlsn + 1;
572 }
573
574 *self.last_activity.lock() = Instant::now();
575 }
576
577 /// Record an acknowledgement from the replica.
578 ///
579 /// The acked VLSN is updated if the new value is greater than the
580 /// current acked VLSN (acks should arrive in order, but we are
581 /// defensive).
582 pub fn record_ack(&self, vlsn: u64) {
583 let mut acked = self.acked_vlsn.lock();
584 if vlsn > *acked {
585 *acked = vlsn;
586 }
587 *self.last_activity.lock() = Instant::now();
588 }
589
590 /// Return the replication lag: the difference between the current
591 /// VLSN (next to send) and the last acknowledged VLSN.
592 ///
593 /// A lag of 0 means the replica is fully caught up.
594 pub fn get_lag(&self) -> u64 {
595 let current = *self.current_vlsn.lock();
596 let acked = *self.acked_vlsn.lock();
597 current.saturating_sub(acked)
598 }
599
600 /// Take all queued messages (drain the output queue).
601 ///
602 /// Returns the messages in the order they were queued and leaves
603 /// the queue empty.
604 pub fn drain_queue(&self) -> Vec<Vec<u8>> {
605 let mut queue = self.output_queue.lock();
606 std::mem::take(&mut *queue)
607 }
608
609 /// Check if the feeder has timed out (no activity within the given
610 /// duration).
611 pub fn is_timed_out(&self, timeout: Duration) -> bool {
612 self.last_activity.lock().elapsed() > timeout
613 }
614
615 /// Update the activity timestamp to the current time.
616 pub fn touch(&self) {
617 *self.last_activity.lock() = Instant::now();
618 }
619}
620
621impl std::fmt::Debug for Feeder {
622 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
623 f.debug_struct("Feeder")
624 .field("replica_name", &self.replica_name)
625 .field("state", &self.get_state())
626 .field("current_vlsn", &self.get_current_vlsn())
627 .field("acked_vlsn", &self.get_acked_vlsn())
628 .field("lag", &self.get_lag())
629 .finish()
630 }
631}
632
633#[cfg(test)]
634mod tests {
635 use super::*;
636 use crate::net::channel::LocalChannelPair;
637 use std::collections::VecDeque;
638
639 // -----------------------------------------------------------------------
640 // Helpers
641 // -----------------------------------------------------------------------
642
643 /// A simple in-memory log scanner backed by a queue.
644 struct VecLogScanner {
645 entries: VecDeque<(u64, u8, Vec<u8>)>,
646 }
647
648 impl VecLogScanner {
649 fn new(entries: Vec<(u64, u8, Vec<u8>)>) -> Self {
650 Self { entries: entries.into_iter().collect() }
651 }
652 }
653
654 impl LogScanner for VecLogScanner {
655 fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
656 if let Some(&(vlsn, _, _)) = self.entries.front()
657 && vlsn >= from_vlsn
658 {
659 return self.entries.pop_front();
660 }
661 None
662 }
663 }
664
665 // -----------------------------------------------------------------------
666 // FeederRunner tests
667 // -----------------------------------------------------------------------
668
    /// End-to-end check over a `LocalChannelPair`: the runner must deliver
    /// three entries as correctly framed, CRC-valid frames, and must record
    /// the acks the peer sends back.
    #[test]
    fn test_feeder_runner_sends_entries_via_local_channel() {
        // Build a 3-entry log.
        let entries = vec![
            (1u64, 10u8, vec![0xAA]),
            (2u64, 20u8, vec![0xBB, 0xCC]),
            (3u64, 30u8, vec![]),
        ];
        let pair = LocalChannelPair::new();
        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // Receiver side: collect frames and send back acks.
        let recv_handle = {
            let receiver = Arc::clone(&receiver);
            std::thread::spawn(move || {
                let mut received: Vec<(u64, u8, Vec<u8>)> = Vec::new();
                let timeout = Duration::from_secs(5);

                for _ in 0..3 {
                    let frame = receiver.receive(timeout).unwrap().unwrap();
                    // Parse frame: [vlsn:8][type:1][len:4][crc32:4][payload]
                    let vlsn =
                        u64::from_le_bytes(frame[0..8].try_into().unwrap());
                    let entry_type = frame[8];
                    let payload_len =
                        u32::from_le_bytes(frame[9..13].try_into().unwrap())
                            as usize;
                    let expected_crc =
                        u32::from_le_bytes(frame[13..17].try_into().unwrap());
                    let payload = frame[17..17 + payload_len].to_vec();
                    let actual_crc = crc32fast::hash(&payload);
                    assert_eq!(
                        actual_crc, expected_crc,
                        "CRC mismatch for vlsn={vlsn}"
                    );
                    received.push((vlsn, entry_type, payload));

                    // Send ack: the 8-byte little-endian VLSN just received.
                    let mut ack = Vec::with_capacity(8);
                    ack.extend_from_slice(&vlsn.to_le_bytes());
                    receiver.send(&ack).unwrap();
                }

                received
            })
        };

        let mut scanner = VecLogScanner::new(entries);
        // FeederRunner polls for acks until the channel is closed.
        // Close the sender side after the scanner drains so run() returns.
        let runner = FeederRunner::new(Arc::clone(&sender), 1);

        // Run in a separate thread so we can close the channel.
        let runner_arc = Arc::new(runner);
        let runner_ref = Arc::clone(&runner_arc);
        let sender_ref = Arc::clone(&sender);
        let run_handle =
            std::thread::spawn(move || runner_ref.run(&mut scanner));

        // Wait for receiver to collect all 3 entries.
        let received = recv_handle.join().unwrap();
        assert_eq!(received.len(), 3);
        assert_eq!(received[0], (1, 10, vec![0xAA]));
        assert_eq!(received[1], (2, 20, vec![0xBB, 0xCC]));
        assert_eq!(received[2], (3, 30, vec![]));

        // Verify ack was tracked.
        //
        // The receiver thread sends one ack per frame and exits as soon as
        // the third ack is queued — but `FeederRunner::run()` is on a
        // separate thread that polls the channel with a 1 ms timeout and
        // then sleeps 5 ms between polls. By the time `recv_handle.join()`
        // returns, the runner may not yet have drained all three acks
        // from the queue. Poll briefly (≤ 100 ms) for the runner to catch
        // up. 100 ms is ~6× the worst-case three-ack drain time on a
        // healthy machine, so a real perf regression on the runner's
        // ack-handling path will still surface as a test failure.
        let deadline = Instant::now() + Duration::from_millis(100);
        while runner_arc.known_replica_vlsn() < 3 && Instant::now() < deadline {
            std::thread::sleep(Duration::from_millis(2));
        }
        assert!(
            runner_arc.known_replica_vlsn() == 3,
            "FeederRunner did not drain all 3 acks within 100 ms; \
             known_replica_vlsn() == {}, expected 3 (the receiver thread \
             sent 3 acks before exiting; the runner reads them one at a \
             time with a 1 ms timeout + 5 ms sleep cycle)",
            runner_arc.known_replica_vlsn()
        );

        // Close the channel to terminate the run loop.
        sender_ref.close().unwrap();
        run_handle.join().unwrap().unwrap();
    }
764
    /// With nothing to send, `run()` must keep polling until the channel is
    /// closed from another thread and then return `Ok(())`.
    #[test]
    fn test_feeder_runner_empty_scanner_returns_on_close() {
        let pair = LocalChannelPair::new();
        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);

        let runner = FeederRunner::new(Arc::clone(&sender), 1);
        let sender_clone = Arc::clone(&sender);

        // Close both ends of the channel after a short delay (50 ms), while
        // `run()` is polling on this thread.
        let close_handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(50));
            receiver.close().unwrap();
            sender_clone.close().unwrap();
        });

        let mut scanner = VecLogScanner::new(vec![]);
        let result = runner.run(&mut scanner);
        assert!(
            result.is_ok(),
            "expected Ok on channel close, got {:?}",
            result
        );
        close_handle.join().unwrap();
    }
790
791 // -----------------------------------------------------------------------
792 // Original Feeder state struct tests
793 // -----------------------------------------------------------------------
794
795 #[test]
796 fn test_new_feeder() {
797 let feeder = Feeder::new("replica1".to_string());
798 assert_eq!(feeder.get_replica_name(), "replica1");
799 assert_eq!(feeder.get_state(), FeederState::Idle);
800 assert_eq!(feeder.get_current_vlsn(), 0);
801 assert_eq!(feeder.get_acked_vlsn(), 0);
802 assert_eq!(feeder.get_lag(), 0);
803 }
804
805 #[test]
806 fn test_state_transitions() {
807 let feeder = Feeder::new("r1".to_string());
808 assert_eq!(feeder.get_state(), FeederState::Idle);
809
810 feeder.set_state(FeederState::Handshaking);
811 assert_eq!(feeder.get_state(), FeederState::Handshaking);
812
813 feeder.set_state(FeederState::Streaming);
814 assert_eq!(feeder.get_state(), FeederState::Streaming);
815
816 feeder.set_state(FeederState::Shutdown);
817 assert_eq!(feeder.get_state(), FeederState::Shutdown);
818 }
819
820 #[test]
821 fn test_queue_and_drain() {
822 let feeder = Feeder::new("r1".to_string());
823 feeder.queue_entry(1, 10, vec![0xAA, 0xBB]);
824 feeder.queue_entry(2, 20, vec![0xCC]);
825 feeder.queue_entry(3, 30, vec![]);
826
827 let messages = feeder.drain_queue();
828 assert_eq!(messages.len(), 3);
829
830 // Verify message format: 8 bytes VLSN + 1 byte type + data.
831 assert_eq!(messages[0].len(), 8 + 1 + 2);
832 assert_eq!(messages[1].len(), 8 + 1 + 1);
833 assert_eq!(messages[2].len(), (8 + 1));
834
835 // Verify VLSN encoding.
836 let vlsn_bytes: [u8; 8] = messages[0][0..8].try_into().unwrap();
837 assert_eq!(u64::from_le_bytes(vlsn_bytes), 1);
838 assert_eq!(messages[0][8], 10); // entry_type
839
840 // Queue should be empty now.
841 let messages2 = feeder.drain_queue();
842 assert!(messages2.is_empty());
843 }
844
845 #[test]
846 fn test_current_vlsn_advances() {
847 let feeder = Feeder::new("r1".to_string());
848 feeder.queue_entry(5, 1, vec![]);
849 assert_eq!(feeder.get_current_vlsn(), 6);
850
851 feeder.queue_entry(10, 1, vec![]);
852 assert_eq!(feeder.get_current_vlsn(), 11);
853
854 // Queueing a lower VLSN should not decrease current_vlsn.
855 feeder.queue_entry(3, 1, vec![]);
856 assert_eq!(feeder.get_current_vlsn(), 11);
857 }
858
859 #[test]
860 fn test_ack_recording() {
861 let feeder = Feeder::new("r1".to_string());
862 feeder.queue_entry(1, 1, vec![]);
863 feeder.queue_entry(2, 1, vec![]);
864 feeder.queue_entry(3, 1, vec![]);
865
866 feeder.record_ack(1);
867 assert_eq!(feeder.get_acked_vlsn(), 1);
868
869 feeder.record_ack(3);
870 assert_eq!(feeder.get_acked_vlsn(), 3);
871
872 // Ack should not go backwards.
873 feeder.record_ack(2);
874 assert_eq!(feeder.get_acked_vlsn(), 3);
875 }
876
877 #[test]
878 fn test_lag_calculation() {
879 let feeder = Feeder::new("r1".to_string());
880 assert_eq!(feeder.get_lag(), 0);
881
882 feeder.queue_entry(1, 1, vec![]);
883 feeder.queue_entry(2, 1, vec![]);
884 feeder.queue_entry(3, 1, vec![]);
885 // current_vlsn = 4, acked_vlsn = 0 -> lag = 4.
886 assert_eq!(feeder.get_lag(), 4);
887
888 feeder.record_ack(2);
889 // current_vlsn = 4, acked_vlsn = 2 -> lag = 2.
890 assert_eq!(feeder.get_lag(), 2);
891
892 feeder.record_ack(4);
893 // Acked caught up to current.
894 assert_eq!(feeder.get_lag(), 0);
895 }
896
897 #[test]
898 fn test_timeout() {
899 let feeder = Feeder::new("r1".to_string());
900 // Just created, should not be timed out with a reasonable timeout.
901 assert!(!feeder.is_timed_out(Duration::from_secs(60)));
902
903 // With a zero timeout, should be timed out immediately (or nearly).
904 // We can't guarantee sub-nanosecond timing, but a zero-duration
905 // timeout is a reasonable edge case.
906 assert!(feeder.is_timed_out(Duration::from_nanos(0)));
907 }
908
909 #[test]
910 fn test_touch_resets_activity() {
911 let feeder = Feeder::new("r1".to_string());
912 // Wait a tiny bit then touch.
913 std::thread::sleep(Duration::from_millis(5));
914 feeder.touch();
915 // After touch, the feeder should not be timed out for a reasonable
916 // duration.
917 assert!(!feeder.is_timed_out(Duration::from_secs(1)));
918 }
919
920 #[test]
921 fn test_debug_format() {
922 let feeder = Feeder::new("replica_debug".to_string());
923 feeder.set_state(FeederState::Streaming);
924 let debug = format!("{:?}", feeder);
925 assert!(debug.contains("replica_debug"));
926 assert!(debug.contains("Streaming"));
927 }
928
929 // -----------------------------------------------------------------------
930 // FeederRunner edge cases (acks, runner restart)
931 // -----------------------------------------------------------------------
932
933 /// Build a sender + receiver channel pair and a runner that sends
934 /// no entries (used by ack-handling edge cases that don't care about
935 /// the entry stream).
936 fn make_runner_with_pair(
937 vlsn_start: u64,
938 ) -> (Arc<dyn Channel>, Arc<dyn Channel>, FeederRunner) {
939 let pair = LocalChannelPair::new();
940 let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
941 let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
942 let runner = FeederRunner::new(Arc::clone(&sender), vlsn_start);
943 (sender, receiver, runner)
944 }
945
#[test]
fn test_feeder_runner_short_ack_is_ignored_then_close() {
    // The replica side sends a 3-byte message, which is too short to be
    // an 8-byte ack. The runner must survive it, must leave
    // known_replica_vlsn untouched, and must keep running until the
    // channel is closed.
    let (feeder_side, replica_side, runner) = make_runner_with_pair(1);

    let closer = {
        let replica = Arc::clone(&replica_side);
        std::thread::spawn(move || {
            // First deliver the malformed (truncated) ack ...
            std::thread::sleep(Duration::from_millis(20));
            let truncated_ack: [u8; 3] = [0xAA, 0xBB, 0xCC];
            replica.send(&truncated_ack).unwrap();
            // ... then shut both endpoints down.
            std::thread::sleep(Duration::from_millis(40));
            feeder_side.close().unwrap();
            replica.close().unwrap();
        })
    };

    let mut scanner = VecLogScanner::new(vec![]);
    let outcome = runner.run(&mut scanner);
    assert!(
        outcome.is_ok(),
        "short-ack path should not error: {:?}",
        outcome
    );

    let watermark = runner.known_replica_vlsn();
    assert_eq!(watermark, 0, "short ack must NOT advance known_replica_vlsn");

    closer.join().unwrap();
}
974
#[test]
fn test_feeder_runner_ack_advances_known_replica_vlsn() {
    // Two acks arrive out of order (42, then 10). The runner's watermark
    // must end at the highest value seen and never move backwards.
    let (feeder_side, replica_side, runner) = make_runner_with_pair(1);

    let closer = {
        let replica = Arc::clone(&replica_side);
        std::thread::spawn(move || {
            // Ack for vlsn=42 first.
            std::thread::sleep(Duration::from_millis(20));
            let high_ack = 42u64.to_le_bytes();
            replica.send(&high_ack).unwrap();
            // A stale ack for vlsn=10 follows; it must be a no-op for
            // the watermark.
            std::thread::sleep(Duration::from_millis(20));
            let stale_ack = 10u64.to_le_bytes();
            replica.send(&stale_ack).unwrap();
            // Finally close both endpoints so the runner returns.
            std::thread::sleep(Duration::from_millis(40));
            feeder_side.close().unwrap();
            replica.close().unwrap();
        })
    };

    let mut scanner = VecLogScanner::new(vec![]);
    let outcome = runner.run(&mut scanner);
    assert!(outcome.is_ok(), "ack path should not error: {:?}", outcome);

    let watermark = runner.known_replica_vlsn();
    assert_eq!(watermark, 42, "ack must advance to highest, never regress");

    closer.join().unwrap();
}
1002
#[test]
fn test_feeder_runner_restart_resumes_from_provided_vlsn() {
    /// Drive one fresh runner (on its own channel pair) starting at
    /// `vlsn_start` over `batch`, and return how many frames the
    /// opposite endpoint received before the channel went quiet or was
    /// closed.
    ///
    /// A background thread closes both endpoints after 60 ms so that
    /// `run` returns; a second thread drains the receiving endpoint
    /// with a 100 ms per-receive timeout.
    fn run_and_count_frames(
        vlsn_start: u64,
        batch: Vec<(u64, u8, Vec<u8>)>,
    ) -> usize {
        let (sender, receiver, runner) = make_runner_with_pair(vlsn_start);

        let closer = {
            let s = Arc::clone(&sender);
            let r = Arc::clone(&receiver);
            std::thread::spawn(move || {
                std::thread::sleep(Duration::from_millis(60));
                s.close().unwrap();
                r.close().unwrap();
            })
        };
        let collector = {
            let r = Arc::clone(&receiver);
            std::thread::spawn(move || {
                // Only the number of frames matters to the caller, so
                // the frames themselves are not retained.
                let mut seen = 0usize;
                while let Ok(Some(_frame)) =
                    r.receive(Duration::from_millis(100))
                {
                    seen += 1;
                }
                seen
            })
        };

        let mut scanner = VecLogScanner::new(batch);
        runner.run(&mut scanner).unwrap();
        closer.join().unwrap();
        collector.join().unwrap()
    }

    // First run: send entries 1..=3. Stop. A new runner then starts at
    // vlsn=4 and must send 4..=5 and stop cleanly.
    let entries: [(u64, u8, Vec<u8>); 5] = [
        (1u64, 0u8, b"e1".to_vec()),
        (2, 0, b"e2".to_vec()),
        (3, 0, b"e3".to_vec()),
        (4, 0, b"e4".to_vec()),
        (5, 0, b"e5".to_vec()),
    ];

    // First runner: vlsn_start = 1.
    let sent_by_first = run_and_count_frames(1, entries[..3].to_vec());
    assert_eq!(
        sent_by_first, 3,
        "first runner must send 3 entries, got {}",
        sent_by_first
    );

    // Second runner: vlsn_start = 4 — resumes where the first runner
    // left off (with a fresh channel).
    let sent_by_second = run_and_count_frames(4, entries[3..].to_vec());
    assert_eq!(
        sent_by_second, 2,
        "second runner must send 2 entries (4 and 5), got {}",
        sent_by_second
    );
}
1084
#[test]
fn test_feeder_runner_known_replica_vlsn_initial_zero() {
    // Before any ack has been received, the watermark reads zero. The
    // channel endpoints are bound (not discarded) so they stay alive for
    // the duration of the check.
    let (_feeder_side, _replica_side, runner) = make_runner_with_pair(1);
    let watermark = runner.known_replica_vlsn();
    assert_eq!(watermark, 0);
}
1090
1091 // -----------------------------------------------------------------------
1092 // EnvironmentLogScanner — exercised against a real EnvironmentImpl
1093 // -----------------------------------------------------------------------
1094
#[test]
fn test_environment_log_scanner_new_with_empty_env() {
    // Build a scanner over a brand-new environment in a temp directory.
    // Nothing has been written through this test, so the very first
    // next_entry call is expected to yield None.
    let tmp = tempfile::tempdir().expect("tempdir");
    let env = EnvironmentImpl::new(tmp.path(), false, true)
        .expect("EnvironmentImpl::new");

    let mut scanner = EnvironmentLogScanner::new(&env, None)
        .expect("scanner construction should succeed");

    // Empty log → no entries.
    let first = scanner.next_entry(0);
    assert!(
        first.is_none(),
        "next_entry on empty log must return None, got {:?}",
        first
    );
}
1116
#[test]
fn test_environment_log_scanner_with_explicit_null_lsn() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let env = EnvironmentImpl::new(tmp.path(), false, true)
        .expect("EnvironmentImpl::new");

    // Passing Some(NULL_LSN) is intended to behave like passing None
    // (i.e. start from the beginning of the log); this test only checks
    // that construction succeeds for that input.
    let start = Some(NULL_LSN);
    let created = EnvironmentLogScanner::new(&env, start);
    assert!(created.is_some());
}
1128
#[test]
fn test_environment_log_scanner_with_explicit_start_lsn() {
    let tmp = tempfile::tempdir().expect("tempdir");
    let env = EnvironmentImpl::new(tmp.path(), false, true)
        .expect("EnvironmentImpl::new");

    // Exercise the initialisation path with a non-null start position
    // (file 5, offset 128). The fresh environment is not expected to
    // have a log file there, so the first read should find nothing.
    let start = Lsn::new(5, 128);
    let created = EnvironmentLogScanner::new(&env, Some(start));
    assert!(created.is_some());

    let mut scanner = created.unwrap();
    let first = scanner.next_entry(0);
    assert!(first.is_none());
}
1146}