1use noxu_dbi::EnvironmentImpl;
16use noxu_log::MAX_ITEM_SIZE;
17use noxu_log::entry_header::{MAX_HEADER_SIZE, MIN_HEADER_SIZE};
18use noxu_log::file_header::FILE_HEADER_SIZE;
19use noxu_log::file_manager::FileManager;
20use noxu_sync::Mutex;
21use noxu_util::lsn::{Lsn, NULL_LSN};
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use crate::error::{RepError, Result};
26use crate::net::channel::Channel;
27
28use crc32fast;
31
/// A source of log entries that a feeder can stream to a replica.
///
/// Implementors must be `Send`.
pub trait LogScanner: Send {
    /// Returns the next available entry as `(vlsn, entry_type, payload)`,
    /// or `None` when the scanner has nothing to hand out right now.
    ///
    /// `from_vlsn` is the smallest VLSN the caller wants; the scanners in
    /// this file never return an entry whose VLSN is below it.
    fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)>;
}
47
/// [`LogScanner`] that reads entries from an environment's log files
/// through a [`FileManager`].
pub struct EnvironmentLogScanner {
    /// Handle used for every file listing and file read.
    file_manager: Arc<FileManager>,
    /// Number of the log file the cursor is currently positioned in.
    cursor_file: u32,
    /// Byte offset within `cursor_file` of the next entry to read.
    cursor_offset: u64,
    /// Highest VLSN handed out so far; `0` until an entry has been returned.
    last_returned_vlsn: u64,
}
70
71impl EnvironmentLogScanner {
72 pub fn new(env: &EnvironmentImpl, start_lsn: Option<Lsn>) -> Option<Self> {
83 let env_home = env.get_env_home().to_path_buf();
91 let fm = Arc::new(
92 FileManager::new(&env_home, true, 256 * 1024 * 1024, 32).ok()?,
93 );
94
95 let (cursor_file, cursor_offset) = match start_lsn {
96 Some(lsn) if lsn != NULL_LSN => {
97 (lsn.file_number(), lsn.file_offset() as u64)
98 }
99 _ => {
100 (0, FILE_HEADER_SIZE as u64)
102 }
103 };
104
105 Some(Self {
106 file_manager: fm,
107 cursor_file,
108 cursor_offset,
109 last_returned_vlsn: 0,
110 })
111 }
112
113 fn read_raw_entry(
118 &self,
119 file_num: u32,
120 offset: u64,
121 ) -> Option<(usize, Option<u64>, u8, Vec<u8>)> {
122 let mut hdr = [0u8; MIN_HEADER_SIZE];
123 let n = self
124 .file_manager
125 .read_from_file(file_num, offset, &mut hdr)
126 .ok()?;
127 if n < MIN_HEADER_SIZE {
128 return None;
129 }
130 if hdr[4] == 0 {
132 return None;
133 }
134
135 let entry_type_byte = hdr[4];
136 let flags = hdr[5];
137 let item_size =
138 u32::from_le_bytes([hdr[10], hdr[11], hdr[12], hdr[13]]) as usize;
139
140 let vlsn_present = (flags & 0x08) != 0 || (flags & 0x20) != 0;
141 let header_size =
142 if vlsn_present { MAX_HEADER_SIZE } else { MIN_HEADER_SIZE };
143
144 if item_size > MAX_ITEM_SIZE {
148 return None;
149 }
150
151 let entry_size = header_size + item_size;
152 let mut full = vec![0u8; entry_size];
153 let n = self
154 .file_manager
155 .read_from_file(file_num, offset, &mut full)
156 .ok()?;
157 if n < entry_size {
158 return None;
159 }
160
161 let vlsn_opt = if vlsn_present && full.len() >= MAX_HEADER_SIZE {
163 let raw = i64::from_le_bytes(
164 full[MIN_HEADER_SIZE..MAX_HEADER_SIZE].try_into().ok()?,
165 );
166 if raw > 0 {
167 Some(raw as u64)
168 } else {
169 log::warn!(
175 "EnvironmentLogScanner: implausible VLSN value {} at \
176 file {:08x} offset {:#x}; treating as no-VLSN",
177 raw,
178 file_num,
179 offset,
180 );
181 None
182 }
183 } else {
184 None
185 };
186
187 let payload = full[header_size..].to_vec();
188 Some((entry_size, vlsn_opt, entry_type_byte, payload))
189 }
190}
191
192impl LogScanner for EnvironmentLogScanner {
193 fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
199 let file_nums = self.file_manager.list_file_numbers().ok()?;
201 if file_nums.is_empty() {
202 return None;
203 }
204
205 loop {
206 if !file_nums.contains(&self.cursor_file) {
208 let next =
210 file_nums.iter().find(|&&n| n > self.cursor_file).copied();
211 match next {
212 Some(n) => {
213 self.cursor_file = n;
214 self.cursor_offset = FILE_HEADER_SIZE as u64;
215 }
216 None => return None, }
218 }
219
220 let file_len =
221 self.file_manager.get_file_length(self.cursor_file).ok()?;
222
223 if self.cursor_offset >= file_len {
224 let next =
226 file_nums.iter().find(|&&n| n > self.cursor_file).copied();
227 match next {
228 Some(n) => {
229 self.cursor_file = n;
230 self.cursor_offset = FILE_HEADER_SIZE as u64;
231 continue;
232 }
233 None => return None, }
235 }
236
237 match self.read_raw_entry(self.cursor_file, self.cursor_offset) {
238 None => {
239 let next = file_nums
241 .iter()
242 .find(|&&n| n > self.cursor_file)
243 .copied();
244 match next {
245 Some(n) => {
246 self.cursor_file = n;
247 self.cursor_offset = FILE_HEADER_SIZE as u64;
248 continue;
249 }
250 None => return None,
251 }
252 }
253 Some((entry_size, vlsn_opt, entry_type_byte, payload)) => {
254 self.cursor_offset += entry_size as u64;
255
256 if let Some(vlsn) = vlsn_opt
257 && vlsn >= from_vlsn
258 && vlsn > self.last_returned_vlsn
259 {
260 self.last_returned_vlsn = vlsn;
261 return Some((vlsn, entry_type_byte, payload));
262 }
263 }
266 }
267 }
268 }
269}
270
/// Size in bytes of the fixed frame prefix written before each payload:
/// VLSN (8) + entry type (1) + payload length (4) + payload CRC32 (4).
const FRAME_HEADER_LEN: usize = 8 + 1 + 4 + 4;

/// Streams log entries over a [`Channel`] and tracks the highest VLSN the
/// peer has acknowledged.
pub struct FeederRunner {
    /// Channel used both to send frames and to receive acknowledgements.
    channel: Arc<dyn Channel>,
    /// First VLSN requested from the scanner when `run` starts.
    vlsn_start: u64,
    /// Highest acknowledged VLSN seen so far; starts at `0`.
    known_replica_vlsn: Mutex<u64>,
}
315
316impl FeederRunner {
317 pub fn new(channel: Arc<dyn Channel>, vlsn_start: u64) -> Self {
323 Self { channel, vlsn_start, known_replica_vlsn: Mutex::new(0) }
324 }
325
326 pub fn known_replica_vlsn(&self) -> u64 {
328 *self.known_replica_vlsn.lock()
329 }
330
331 pub fn run(&self, log_scanner: &mut dyn LogScanner) -> Result<()> {
339 let mut next_vlsn = self.vlsn_start;
340 let poll_interval = Duration::from_millis(5);
341 let ack_timeout = Duration::from_millis(1);
342
343 loop {
344 while let Some((vlsn, entry_type, payload)) =
348 log_scanner.next_entry(next_vlsn)
349 {
350 self.send_entry(vlsn, entry_type, &payload)?;
351 next_vlsn = vlsn + 1;
352 }
353
354 match self.channel.receive(ack_timeout) {
358 Ok(Some(ack_bytes)) => {
359 if ack_bytes.len() >= 8 {
360 let vlsn = u64::from_le_bytes(
361 ack_bytes[..8].try_into().unwrap(),
362 );
363 let mut guard = self.known_replica_vlsn.lock();
364 if vlsn > *guard {
365 *guard = vlsn;
366 }
367 }
368 continue;
370 }
371 Ok(None) => {
372 std::thread::sleep(poll_interval);
374 continue;
375 }
376 Err(RepError::ChannelClosed(_)) => {
377 return Ok(());
379 }
380 Err(e) => return Err(e),
381 }
382 }
383 }
384
385 fn send_entry(
390 &self,
391 vlsn: u64,
392 entry_type: u8,
393 payload: &[u8],
394 ) -> Result<()> {
395 let crc = crc32fast::hash(payload);
396 let mut frame = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
397 frame.extend_from_slice(&vlsn.to_le_bytes());
398 frame.push(entry_type);
399 frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
400 frame.extend_from_slice(&crc.to_le_bytes());
401 frame.extend_from_slice(payload);
402 self.channel.send(&frame)
403 }
404}
405
/// State tag stored in a [`Feeder`] and read/written through
/// `Feeder::get_state` / `Feeder::set_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeederState {
    /// State a `Feeder` is in right after `Feeder::new`.
    Idle,
    // NOTE(review): presumably the connection setup phase — confirm with callers.
    Handshaking,
    // NOTE(review): presumably entries are actively being sent — confirm with callers.
    Streaming,
    // NOTE(review): presumably the feeder has been stopped — confirm with callers.
    Shutdown,
}
420
/// Per-replica bookkeeping: state, VLSN progress, last activity time and a
/// queue of encoded outgoing messages.
pub struct Feeder {
    /// Name of the replica, as given to `Feeder::new`.
    replica_name: String,
    /// Current state; starts as `FeederState::Idle`.
    state: Mutex<FeederState>,
    /// One past the highest VLSN queued so far (`0` before anything is queued).
    current_vlsn: Mutex<u64>,
    /// Highest VLSN recorded through `record_ack`.
    acked_vlsn: Mutex<u64>,
    /// Time of the most recent `queue_entry`, `record_ack` or `touch` call.
    last_activity: Mutex<Instant>,
    /// Encoded messages waiting to be taken by `drain_queue`.
    output_queue: Mutex<Vec<Vec<u8>>>,
}
443
444impl Feeder {
445 pub fn new(replica_name: String) -> Self {
447 Feeder {
448 replica_name,
449 state: Mutex::new(FeederState::Idle),
450 current_vlsn: Mutex::new(0),
451 acked_vlsn: Mutex::new(0),
452 last_activity: Mutex::new(Instant::now()),
453 output_queue: Mutex::new(Vec::new()),
454 }
455 }
456
457 pub fn get_replica_name(&self) -> String {
459 self.replica_name.clone()
460 }
461
462 pub fn get_state(&self) -> FeederState {
464 *self.state.lock()
465 }
466
467 pub fn set_state(&self, state: FeederState) {
469 *self.state.lock() = state;
470 }
471
472 pub fn get_current_vlsn(&self) -> u64 {
474 *self.current_vlsn.lock()
475 }
476
477 pub fn get_acked_vlsn(&self) -> u64 {
479 *self.acked_vlsn.lock()
480 }
481
482 pub fn queue_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
490 let mut msg = Vec::with_capacity(9 + data.len());
491 msg.extend_from_slice(&vlsn.to_le_bytes());
492 msg.push(entry_type);
493 msg.extend_from_slice(&data);
494
495 self.output_queue.lock().push(msg);
496
497 let mut current = self.current_vlsn.lock();
498 if vlsn >= *current {
499 *current = vlsn + 1;
500 }
501
502 *self.last_activity.lock() = Instant::now();
503 }
504
505 pub fn record_ack(&self, vlsn: u64) {
511 let mut acked = self.acked_vlsn.lock();
512 if vlsn > *acked {
513 *acked = vlsn;
514 }
515 *self.last_activity.lock() = Instant::now();
516 }
517
518 pub fn get_lag(&self) -> u64 {
523 let current = *self.current_vlsn.lock();
524 let acked = *self.acked_vlsn.lock();
525 current.saturating_sub(acked)
526 }
527
528 pub fn drain_queue(&self) -> Vec<Vec<u8>> {
533 let mut queue = self.output_queue.lock();
534 std::mem::take(&mut *queue)
535 }
536
537 pub fn is_timed_out(&self, timeout: Duration) -> bool {
540 self.last_activity.lock().elapsed() > timeout
541 }
542
543 pub fn touch(&self) {
545 *self.last_activity.lock() = Instant::now();
546 }
547}
548
549impl std::fmt::Debug for Feeder {
550 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551 f.debug_struct("Feeder")
552 .field("replica_name", &self.replica_name)
553 .field("state", &self.get_state())
554 .field("current_vlsn", &self.get_current_vlsn())
555 .field("acked_vlsn", &self.get_acked_vlsn())
556 .field("lag", &self.get_lag())
557 .finish()
558 }
559}
560
561#[cfg(test)]
562mod tests {
563 use super::*;
564 use crate::net::channel::LocalChannelPair;
565 use std::collections::VecDeque;
566
567 struct VecLogScanner {
573 entries: VecDeque<(u64, u8, Vec<u8>)>,
574 }
575
576 impl VecLogScanner {
577 fn new(entries: Vec<(u64, u8, Vec<u8>)>) -> Self {
578 Self { entries: entries.into_iter().collect() }
579 }
580 }
581
582 impl LogScanner for VecLogScanner {
583 fn next_entry(&mut self, from_vlsn: u64) -> Option<(u64, u8, Vec<u8>)> {
584 if let Some(&(vlsn, _, _)) = self.entries.front()
585 && vlsn >= from_vlsn
586 {
587 return self.entries.pop_front();
588 }
589 None
590 }
591 }
592
    /// End-to-end check over a local channel pair: the runner sends three
    /// entries, a receiver thread decodes each frame, verifies its CRC and
    /// acks it, and the runner ends up with `known_replica_vlsn() == 3`.
    #[test]
    fn test_feeder_runner_sends_entries_via_local_channel() {
        let entries = vec![
            (1u64, 10u8, vec![0xAA]),
            (2u64, 20u8, vec![0xBB, 0xCC]),
            (3u64, 30u8, vec![]),
        ];
        let pair = LocalChannelPair::new();
        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // Receiver thread: decode exactly three frames and ack each one.
        let recv_handle = {
            let receiver = Arc::clone(&receiver);
            std::thread::spawn(move || {
                let mut received: Vec<(u64, u8, Vec<u8>)> = Vec::new();
                let timeout = Duration::from_secs(5);

                for _ in 0..3 {
                    let frame = receiver.receive(timeout).unwrap().unwrap();
                    // Frame layout: vlsn[0..8], type[8], len[9..13],
                    // crc[13..17], payload[17..].
                    let vlsn =
                        u64::from_le_bytes(frame[0..8].try_into().unwrap());
                    let entry_type = frame[8];
                    let payload_len =
                        u32::from_le_bytes(frame[9..13].try_into().unwrap())
                            as usize;
                    let expected_crc =
                        u32::from_le_bytes(frame[13..17].try_into().unwrap());
                    let payload = frame[17..17 + payload_len].to_vec();
                    let actual_crc = crc32fast::hash(&payload);
                    assert_eq!(
                        actual_crc, expected_crc,
                        "CRC mismatch for vlsn={vlsn}"
                    );
                    received.push((vlsn, entry_type, payload));

                    // Ack is the VLSN as 8 little-endian bytes.
                    let mut ack = Vec::with_capacity(8);
                    ack.extend_from_slice(&vlsn.to_le_bytes());
                    receiver.send(&ack).unwrap();
                }

                received
            })
        };

        let mut scanner = VecLogScanner::new(entries);
        let runner = FeederRunner::new(Arc::clone(&sender), 1);

        // The runner is shared so the main thread can poll its ack mark
        // while `run` executes on another thread.
        let runner_arc = Arc::new(runner);
        let runner_ref = Arc::clone(&runner_arc);
        let sender_ref = Arc::clone(&sender);
        let run_handle =
            std::thread::spawn(move || runner_ref.run(&mut scanner));

        let received = recv_handle.join().unwrap();
        assert_eq!(received.len(), 3);
        assert_eq!(received[0], (1, 10, vec![0xAA]));
        assert_eq!(received[1], (2, 20, vec![0xBB, 0xCC]));
        assert_eq!(received[2], (3, 30, vec![]));

        // Give the runner up to 100 ms to consume all three acks.
        let deadline = Instant::now() + Duration::from_millis(100);
        while runner_arc.known_replica_vlsn() < 3 && Instant::now() < deadline {
            std::thread::sleep(Duration::from_millis(2));
        }
        assert!(
            runner_arc.known_replica_vlsn() == 3,
            "FeederRunner did not drain all 3 acks within 100 ms; \
             known_replica_vlsn() == {}, expected 3 (the receiver thread \
             sent 3 acks before exiting; the runner reads them one at a \
             time with a 1 ms timeout + 5 ms sleep cycle)",
            runner_arc.known_replica_vlsn()
        );

        // Closing the channel makes `run` return.
        sender_ref.close().unwrap();
        run_handle.join().unwrap().unwrap();
    }
692
    /// With nothing to send, `run` must return `Ok` once both channel ends
    /// are closed by another thread.
    #[test]
    fn test_feeder_runner_empty_scanner_returns_on_close() {
        let pair = LocalChannelPair::new();
        let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);

        let runner = FeederRunner::new(Arc::clone(&sender), 1);
        let sender_clone = Arc::clone(&sender);

        // Close both ends after 50 ms, while `run` is polling.
        let close_handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(50));
            receiver.close().unwrap();
            sender_clone.close().unwrap();
        });

        let mut scanner = VecLogScanner::new(vec![]);
        let result = runner.run(&mut scanner);
        assert!(
            result.is_ok(),
            "expected Ok on channel close, got {:?}",
            result
        );
        close_handle.join().unwrap();
    }
718
719 #[test]
724 fn test_new_feeder() {
725 let feeder = Feeder::new("replica1".to_string());
726 assert_eq!(feeder.get_replica_name(), "replica1");
727 assert_eq!(feeder.get_state(), FeederState::Idle);
728 assert_eq!(feeder.get_current_vlsn(), 0);
729 assert_eq!(feeder.get_acked_vlsn(), 0);
730 assert_eq!(feeder.get_lag(), 0);
731 }
732
733 #[test]
734 fn test_state_transitions() {
735 let feeder = Feeder::new("r1".to_string());
736 assert_eq!(feeder.get_state(), FeederState::Idle);
737
738 feeder.set_state(FeederState::Handshaking);
739 assert_eq!(feeder.get_state(), FeederState::Handshaking);
740
741 feeder.set_state(FeederState::Streaming);
742 assert_eq!(feeder.get_state(), FeederState::Streaming);
743
744 feeder.set_state(FeederState::Shutdown);
745 assert_eq!(feeder.get_state(), FeederState::Shutdown);
746 }
747
748 #[test]
749 fn test_queue_and_drain() {
750 let feeder = Feeder::new("r1".to_string());
751 feeder.queue_entry(1, 10, vec![0xAA, 0xBB]);
752 feeder.queue_entry(2, 20, vec![0xCC]);
753 feeder.queue_entry(3, 30, vec![]);
754
755 let messages = feeder.drain_queue();
756 assert_eq!(messages.len(), 3);
757
758 assert_eq!(messages[0].len(), 8 + 1 + 2);
760 assert_eq!(messages[1].len(), 8 + 1 + 1);
761 assert_eq!(messages[2].len(), (8 + 1));
762
763 let vlsn_bytes: [u8; 8] = messages[0][0..8].try_into().unwrap();
765 assert_eq!(u64::from_le_bytes(vlsn_bytes), 1);
766 assert_eq!(messages[0][8], 10); let messages2 = feeder.drain_queue();
770 assert!(messages2.is_empty());
771 }
772
773 #[test]
774 fn test_current_vlsn_advances() {
775 let feeder = Feeder::new("r1".to_string());
776 feeder.queue_entry(5, 1, vec![]);
777 assert_eq!(feeder.get_current_vlsn(), 6);
778
779 feeder.queue_entry(10, 1, vec![]);
780 assert_eq!(feeder.get_current_vlsn(), 11);
781
782 feeder.queue_entry(3, 1, vec![]);
784 assert_eq!(feeder.get_current_vlsn(), 11);
785 }
786
787 #[test]
788 fn test_ack_recording() {
789 let feeder = Feeder::new("r1".to_string());
790 feeder.queue_entry(1, 1, vec![]);
791 feeder.queue_entry(2, 1, vec![]);
792 feeder.queue_entry(3, 1, vec![]);
793
794 feeder.record_ack(1);
795 assert_eq!(feeder.get_acked_vlsn(), 1);
796
797 feeder.record_ack(3);
798 assert_eq!(feeder.get_acked_vlsn(), 3);
799
800 feeder.record_ack(2);
802 assert_eq!(feeder.get_acked_vlsn(), 3);
803 }
804
805 #[test]
806 fn test_lag_calculation() {
807 let feeder = Feeder::new("r1".to_string());
808 assert_eq!(feeder.get_lag(), 0);
809
810 feeder.queue_entry(1, 1, vec![]);
811 feeder.queue_entry(2, 1, vec![]);
812 feeder.queue_entry(3, 1, vec![]);
813 assert_eq!(feeder.get_lag(), 4);
815
816 feeder.record_ack(2);
817 assert_eq!(feeder.get_lag(), 2);
819
820 feeder.record_ack(4);
821 assert_eq!(feeder.get_lag(), 0);
823 }
824
825 #[test]
826 fn test_timeout() {
827 let feeder = Feeder::new("r1".to_string());
828 assert!(!feeder.is_timed_out(Duration::from_secs(60)));
830
831 assert!(feeder.is_timed_out(Duration::from_nanos(0)));
835 }
836
837 #[test]
838 fn test_touch_resets_activity() {
839 let feeder = Feeder::new("r1".to_string());
840 std::thread::sleep(Duration::from_millis(5));
842 feeder.touch();
843 assert!(!feeder.is_timed_out(Duration::from_secs(1)));
846 }
847
848 #[test]
849 fn test_debug_format() {
850 let feeder = Feeder::new("replica_debug".to_string());
851 feeder.set_state(FeederState::Streaming);
852 let debug = format!("{:?}", feeder);
853 assert!(debug.contains("replica_debug"));
854 assert!(debug.contains("Streaming"));
855 }
856
857 fn make_runner_with_pair(
865 vlsn_start: u64,
866 ) -> (Arc<dyn Channel>, Arc<dyn Channel>, FeederRunner) {
867 let pair = LocalChannelPair::new();
868 let sender: Arc<dyn Channel> = Arc::new(pair.channel_a);
869 let receiver: Arc<dyn Channel> = Arc::new(pair.channel_b);
870 let runner = FeederRunner::new(Arc::clone(&sender), vlsn_start);
871 (sender, receiver, runner)
872 }
873
    /// An ack shorter than 8 bytes must be ignored: `run` still returns
    /// `Ok` on close and `known_replica_vlsn` stays at 0.
    #[test]
    fn test_feeder_runner_short_ack_is_ignored_then_close() {
        let (sender, receiver, runner) = make_runner_with_pair(1);
        let receiver_clone = Arc::clone(&receiver);

        let close_handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(20));
            // Only 3 bytes: too short to hold a u64 VLSN.
            receiver_clone.send(&[0xAA, 0xBB, 0xCC]).unwrap();
            std::thread::sleep(Duration::from_millis(40));
            sender.close().unwrap();
            receiver_clone.close().unwrap();
        });

        let mut scanner = VecLogScanner::new(vec![]);
        let r = runner.run(&mut scanner);
        assert!(r.is_ok(), "short-ack path should not error: {:?}", r);
        assert_eq!(
            runner.known_replica_vlsn(),
            0,
            "short ack must NOT advance known_replica_vlsn"
        );
        close_handle.join().unwrap();
    }
902
    /// Acks of 42 and then 10 must leave `known_replica_vlsn` at 42: the
    /// mark advances to the highest value and never regresses.
    #[test]
    fn test_feeder_runner_ack_advances_known_replica_vlsn() {
        let (sender, receiver, runner) = make_runner_with_pair(1);
        let receiver_clone = Arc::clone(&receiver);

        let close_handle = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(20));
            receiver_clone.send(&42u64.to_le_bytes()).unwrap();
            std::thread::sleep(Duration::from_millis(20));
            // A lower ack arriving later must not move the mark back.
            receiver_clone.send(&10u64.to_le_bytes()).unwrap();
            std::thread::sleep(Duration::from_millis(40));
            sender.close().unwrap();
            receiver_clone.close().unwrap();
        });

        let mut scanner = VecLogScanner::new(vec![]);
        let r = runner.run(&mut scanner);
        assert!(r.is_ok(), "ack path should not error: {:?}", r);
        assert_eq!(
            runner.known_replica_vlsn(),
            42,
            "ack must advance to highest, never regress"
        );
        close_handle.join().unwrap();
    }
930
    /// Two runners in sequence: the first starts at VLSN 1 and is given
    /// entries 1-3, the second starts at VLSN 4 and is given entries 4-5.
    /// Each must deliver exactly the frames for its own entries.
    #[test]
    fn test_feeder_runner_restart_resumes_from_provided_vlsn() {
        let entries: [(u64, u8, Vec<u8>); 5] = [
            (1u64, 0u8, b"e1".to_vec()),
            (2, 0, b"e2".to_vec()),
            (3, 0, b"e3".to_vec()),
            (4, 0, b"e4".to_vec()),
            (5, 0, b"e5".to_vec()),
        ];

        // First run: starts at VLSN 1.
        let (sender_a, receiver_a, runner_a) = make_runner_with_pair(1);
        // Closer thread: shuts the channel after 60 ms so `run` returns.
        let close_a = {
            let s = Arc::clone(&sender_a);
            let r = Arc::clone(&receiver_a);
            std::thread::spawn(move || {
                std::thread::sleep(Duration::from_millis(60));
                s.close().unwrap();
                r.close().unwrap();
            })
        };
        // Collector thread: gathers frames until receive stops yielding one.
        let received_a = {
            let r = Arc::clone(&receiver_a);
            std::thread::spawn(move || {
                let mut got = Vec::new();
                while let Ok(Some(frame)) =
                    r.receive(Duration::from_millis(100))
                {
                    got.push(frame);
                }
                got
            })
        };
        let mut scanner_a = VecLogScanner::new(entries[0..3].to_vec());
        runner_a.run(&mut scanner_a).unwrap();
        close_a.join().unwrap();
        let frames_a = received_a.join().unwrap();
        assert_eq!(
            frames_a.len(),
            3,
            "first runner must send 3 entries, got {}",
            frames_a.len()
        );

        // Second run: a fresh runner and channel, starting at VLSN 4.
        let (sender_b, receiver_b, runner_b) = make_runner_with_pair(4);
        let close_b = {
            let s = Arc::clone(&sender_b);
            let r = Arc::clone(&receiver_b);
            std::thread::spawn(move || {
                std::thread::sleep(Duration::from_millis(60));
                s.close().unwrap();
                r.close().unwrap();
            })
        };
        let received_b = {
            let r = Arc::clone(&receiver_b);
            std::thread::spawn(move || {
                let mut got = Vec::new();
                while let Ok(Some(frame)) =
                    r.receive(Duration::from_millis(100))
                {
                    got.push(frame);
                }
                got
            })
        };
        let mut scanner_b = VecLogScanner::new(entries[3..].to_vec());
        runner_b.run(&mut scanner_b).unwrap();
        close_b.join().unwrap();
        let frames_b = received_b.join().unwrap();
        assert_eq!(
            frames_b.len(),
            2,
            "second runner must send 2 entries (4 and 5), got {}",
            frames_b.len()
        );
    }
1012
1013 #[test]
1014 fn test_feeder_runner_known_replica_vlsn_initial_zero() {
1015 let (_sender, _receiver, runner) = make_runner_with_pair(1);
1016 assert_eq!(runner.known_replica_vlsn(), 0);
1017 }
1018
1019 #[test]
1024 fn test_environment_log_scanner_new_with_empty_env() {
1025 let dir = tempfile::tempdir().expect("tempdir");
1029 let env = EnvironmentImpl::new(dir.path(), false, true)
1030 .expect("EnvironmentImpl::new");
1031
1032 let scanner = EnvironmentLogScanner::new(&env, None);
1033 assert!(scanner.is_some(), "scanner construction should succeed");
1034 let mut scanner = scanner.unwrap();
1035
1036 let r = scanner.next_entry(0);
1038 assert!(
1039 r.is_none(),
1040 "next_entry on empty log must return None, got {:?}",
1041 r
1042 );
1043 }
1044
1045 #[test]
1046 fn test_environment_log_scanner_with_explicit_null_lsn() {
1047 let dir = tempfile::tempdir().expect("tempdir");
1048 let env = EnvironmentImpl::new(dir.path(), false, true)
1049 .expect("EnvironmentImpl::new");
1050
1051 let scanner = EnvironmentLogScanner::new(&env, Some(NULL_LSN));
1054 assert!(scanner.is_some());
1055 }
1056
1057 #[test]
1058 fn test_environment_log_scanner_with_explicit_start_lsn() {
1059 let dir = tempfile::tempdir().expect("tempdir");
1060 let env = EnvironmentImpl::new(dir.path(), false, true)
1061 .expect("EnvironmentImpl::new");
1062
1063 let lsn = Lsn::new(5, 128);
1068 let scanner = EnvironmentLogScanner::new(&env, Some(lsn));
1069 assert!(scanner.is_some());
1070 let mut scanner = scanner.unwrap();
1071 assert!(scanner.next_entry(0).is_none());
1073 }
1074}