1use noxu_log::{LogEntryType, Provisional};
16use noxu_sync::Mutex;
17use std::sync::Arc;
18use std::time::Duration;
19
20use crate::error::{RepError, Result};
21use crate::net::channel::Channel;
22
23use crc32fast::hash as crc32_hash;
26
/// Destination for log entries received by a replica.
///
/// `ReplicaReceiver::run` calls [`LogWriter::write_entry`] once for every
/// frame that passes its validation checks. Implementors must be `Send`.
pub trait LogWriter: Send {
    /// Persists one replicated entry.
    ///
    /// * `vlsn` - VLSN taken from the frame header.
    /// * `entry_type` - raw one-byte entry type taken from the frame header.
    /// * `payload` - payload bytes of the entry.
    ///
    /// # Errors
    ///
    /// Returns an error when the entry cannot be written.
    /// `ReplicaReceiver::run` propagates such an error to its caller.
    fn write_entry(
        &mut self,
        vlsn: u64,
        entry_type: u8,
        payload: &[u8],
    ) -> Result<()>;
}
50
/// [`LogWriter`] that appends entries through a `noxu_log::LogManager` and
/// records where each one landed in a `VlsnIndex`.
pub struct EnvironmentLogWriter {
    /// Log manager that every entry is appended to.
    log_manager: Arc<noxu_log::LogManager>,
    /// Index that receives the LSN of each entry whose VLSN is non-zero.
    vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
}
70
71impl EnvironmentLogWriter {
72 pub fn new(
78 log_manager: Arc<noxu_log::LogManager>,
79 vlsn_index: Arc<crate::vlsn::vlsn_index::VlsnIndex>,
80 ) -> Self {
81 Self { log_manager, vlsn_index }
82 }
83}
84
85impl LogWriter for EnvironmentLogWriter {
86 fn write_entry(
92 &mut self,
93 vlsn: u64,
94 entry_type: u8,
95 payload: &[u8],
96 ) -> crate::error::Result<()> {
97 let log_entry_type = LogEntryType::from_type_num(entry_type)
99 .ok_or_else(|| {
100 crate::error::RepError::ProtocolError(format!(
101 "replica: unknown entry_type byte {}",
102 entry_type
103 ))
104 })?;
105
106 let lsn = self
110 .log_manager
111 .log(log_entry_type, payload, Provisional::No, false, false)
112 .map_err(|e| {
113 crate::error::RepError::DatabaseError(format!(
114 "replica log write failed: {}",
115 e
116 ))
117 })?;
118
119 if vlsn > 0 {
123 self.vlsn_index.put(vlsn, lsn.file_number(), lsn.file_offset());
124 }
125
126 log::trace!(
127 "replica: wrote entry vlsn={} type={} lsn=({},{})",
128 vlsn,
129 log_entry_type,
130 lsn.file_number(),
131 lsn.file_offset(),
132 );
133
134 Ok(())
135 }
136}
137
/// Size in bytes of the fixed frame header: an 8-byte VLSN, a 1-byte entry
/// type, a 4-byte payload length and a 4-byte CRC32 of the payload.
const FRAME_HEADER_LEN: usize = 8 + 1 + 4 + 4;

/// Replica-side receiver: reads frames from a channel, validates them, hands
/// them to a [`LogWriter`] and acknowledges them on the same channel.
pub struct ReplicaReceiver {
    /// Channel used both to receive frames and to send acknowledgements.
    channel: Arc<dyn Channel>,
}
164
165impl ReplicaReceiver {
166 pub fn new(channel: Arc<dyn Channel>) -> Self {
168 Self { channel }
169 }
170
171 pub fn run(&self, log_writer: &mut dyn LogWriter) -> Result<()> {
199 let recv_timeout = Duration::from_secs(30);
200 let mut received_vlsn_high_water: u64 = 0;
204
205 loop {
206 let frame = match self.channel.receive(recv_timeout) {
210 Ok(Some(f)) => f,
211 Ok(None) => {
212 continue;
214 }
215 Err(RepError::ChannelClosed(_)) => {
216 return Ok(());
218 }
219 Err(e) => return Err(e),
220 };
221
222 if frame.len() < FRAME_HEADER_LEN {
227 return Err(RepError::ProtocolError(format!(
228 "replica: short frame: {} bytes",
229 frame.len()
230 )));
231 }
232
233 let vlsn = u64::from_le_bytes(frame[0..8].try_into().unwrap());
234 let entry_type = frame[8];
235 let payload_len =
236 u32::from_le_bytes(frame[9..13].try_into().unwrap()) as usize;
237 let expected_crc =
238 u32::from_le_bytes(frame[13..17].try_into().unwrap());
239
240 if frame.len() < FRAME_HEADER_LEN + payload_len {
241 return Err(RepError::ProtocolError(format!(
242 "replica: frame payload truncated: expected {} bytes, got {}",
243 payload_len,
244 frame.len() - FRAME_HEADER_LEN,
245 )));
246 }
247
248 let payload =
249 &frame[FRAME_HEADER_LEN..FRAME_HEADER_LEN + payload_len];
250
251 let actual_crc = crc32_hash(payload);
255 if actual_crc != expected_crc {
256 return Err(RepError::FrameCorrupted {
257 vlsn,
258 expected: expected_crc,
259 actual: actual_crc,
260 });
261 }
262
263 if vlsn != 0 && vlsn <= received_vlsn_high_water {
274 return Err(RepError::ProtocolError(format!(
275 "replica: VLSN ordering violation: incoming vlsn={vlsn} \
276 <= received high-water {received_vlsn_high_water}; \
277 possible replay attack or master clock-skew"
278 )));
279 }
280
281 if LogEntryType::from_type_num(entry_type).is_none() {
288 log::error!(
289 "replica: unknown entry_type byte {entry_type} on frame \
290 vlsn={vlsn}; skipping (LOG-10)"
291 );
292 continue;
296 }
297
298 log_writer.write_entry(vlsn, entry_type, payload)?;
302
303 if vlsn != 0 {
305 received_vlsn_high_water = vlsn;
306 }
307
308 let ack = vlsn.to_le_bytes();
312 match self.channel.send(&ack) {
313 Ok(()) => {}
314 Err(RepError::ChannelClosed(_)) => return Ok(()),
315 Err(e) => return Err(e),
316 }
317 }
318 }
319}
320
/// State label stored in a [`ReplicaStream`].
///
/// Nothing in this file restricts which transitions are legal:
/// `ReplicaStream::set_state` stores whatever value it is given.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicaStreamState {
    /// State of a freshly constructed `ReplicaStream`.
    Idle,
    /// NOTE(review): presumably a connection to the master is being set up — confirm.
    Connecting,
    /// NOTE(review): presumably entries are flowing from the master — confirm.
    Streaming,
    /// NOTE(review): presumably the replica is behind and replaying backlog — confirm.
    CatchingUp,
    /// NOTE(review): presumably the stream has been stopped — confirm.
    Shutdown,
}
337
/// Bookkeeping for a replica's view of the replication stream: the master's
/// name, a state label, three VLSN counters and a queue of received but not
/// yet drained entries. Every field sits behind its own mutex.
pub struct ReplicaStream {
    /// Name last passed to `set_master`, or `None` if never set.
    master_name: Mutex<Option<String>>,
    /// Current state label.
    state: Mutex<ReplicaStreamState>,
    /// Highest VLSN ever passed to `mark_applied`.
    applied_vlsn: Mutex<u64>,
    /// Highest VLSN ever passed to `receive_entry`.
    received_vlsn: Mutex<u64>,
    /// Highest VLSN ever passed to `update_master_vlsn`.
    master_vlsn: Mutex<u64>,
    /// `(vlsn, entry_type, data)` tuples queued by `receive_entry` until
    /// `drain_pending` removes them.
    pending_entries: Mutex<Vec<(u64, u8, Vec<u8>)>>,
}
359
360impl Default for ReplicaStream {
361 fn default() -> Self {
362 Self::new()
363 }
364}
365
366impl ReplicaStream {
367 pub fn new() -> Self {
369 ReplicaStream {
370 master_name: Mutex::new(None),
371 state: Mutex::new(ReplicaStreamState::Idle),
372 applied_vlsn: Mutex::new(0),
373 received_vlsn: Mutex::new(0),
374 master_vlsn: Mutex::new(0),
375 pending_entries: Mutex::new(Vec::new()),
376 }
377 }
378
379 pub fn get_state(&self) -> ReplicaStreamState {
381 *self.state.lock()
382 }
383
384 pub fn set_state(&self, state: ReplicaStreamState) {
386 *self.state.lock() = state;
387 }
388
389 pub fn get_applied_vlsn(&self) -> u64 {
391 *self.applied_vlsn.lock()
392 }
393
394 pub fn get_received_vlsn(&self) -> u64 {
397 *self.received_vlsn.lock()
398 }
399
400 pub fn get_master_vlsn(&self) -> u64 {
402 *self.master_vlsn.lock()
403 }
404
405 pub fn set_master(&self, name: &str) {
407 *self.master_name.lock() = Some(name.to_string());
408 }
409
410 pub fn get_master(&self) -> Option<String> {
412 self.master_name.lock().clone()
413 }
414
415 pub fn receive_entry(&self, vlsn: u64, entry_type: u8, data: Vec<u8>) {
420 self.pending_entries.lock().push((vlsn, entry_type, data));
421 let mut received = self.received_vlsn.lock();
422 if vlsn > *received {
423 *received = vlsn;
424 }
425 }
426
427 pub fn mark_applied(&self, vlsn: u64) {
432 let mut applied = self.applied_vlsn.lock();
433 if vlsn > *applied {
434 *applied = vlsn;
435 }
436 }
437
438 pub fn update_master_vlsn(&self, vlsn: u64) {
441 let mut master = self.master_vlsn.lock();
442 if vlsn > *master {
443 *master = vlsn;
444 }
445 }
446
447 pub fn get_lag(&self) -> u64 {
452 let master = *self.master_vlsn.lock();
453 let applied = *self.applied_vlsn.lock();
454 master.saturating_sub(applied)
455 }
456
457 pub fn drain_pending(&self) -> Vec<(u64, u8, Vec<u8>)> {
462 let mut pending = self.pending_entries.lock();
463 std::mem::take(&mut *pending)
464 }
465
466 pub fn is_caught_up(&self) -> bool {
476 let applied = *self.applied_vlsn.lock();
477 let master = *self.master_vlsn.lock();
478 let pending_empty = self.pending_entries.lock().is_empty();
479 applied >= master && master > 0 && pending_empty
480 }
481}
482
483impl std::fmt::Debug for ReplicaStream {
484 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
485 f.debug_struct("ReplicaStream")
486 .field("master", &self.get_master())
487 .field("state", &self.get_state())
488 .field("applied_vlsn", &self.get_applied_vlsn())
489 .field("received_vlsn", &self.get_received_vlsn())
490 .field("master_vlsn", &self.get_master_vlsn())
491 .field("lag", &self.get_lag())
492 .finish()
493 }
494}
495
// Unit tests for `ReplicaReceiver` (driven over an in-process channel pair)
// and for the `ReplicaStream` bookkeeping type.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::net::channel::LocalChannelPair;

    /// `LogWriter` test double that stores every entry it is given.
    struct RecordingWriter {
        /// `(vlsn, entry_type, payload)` in the order `write_entry` was called.
        entries: Vec<(u64, u8, Vec<u8>)>,
    }

    impl RecordingWriter {
        /// Creates a writer with no recorded entries.
        fn new() -> Self {
            Self { entries: Vec::new() }
        }
    }

    impl LogWriter for RecordingWriter {
        /// Copies the entry into `entries`; never fails.
        fn write_entry(
            &mut self,
            vlsn: u64,
            entry_type: u8,
            payload: &[u8],
        ) -> Result<()> {
            self.entries.push((vlsn, entry_type, payload.to_vec()));
            Ok(())
        }
    }

    /// Encodes a frame in the layout `ReplicaReceiver::run` parses:
    /// little-endian VLSN, type byte, little-endian `u32` payload length,
    /// little-endian CRC32 of the payload, then the payload itself.
    fn make_frame(vlsn: u64, entry_type: u8, payload: &[u8]) -> Vec<u8> {
        let crc = crc32_hash(payload);
        let mut f = Vec::with_capacity(FRAME_HEADER_LEN + payload.len());
        f.extend_from_slice(&vlsn.to_le_bytes());
        f.push(entry_type);
        f.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        f.extend_from_slice(&crc.to_le_bytes());
        f.extend_from_slice(payload);
        f
    }

    // Three well-formed frames are written in order and each is acknowledged
    // with its VLSN.
    #[test]
    fn test_replica_receiver_receives_and_acks() {
        let pair = LocalChannelPair::new();
        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // NOTE(review): `run` skips frames whose type byte is rejected by
        // `LogEntryType::from_type_num`, so this test assumes 10, 20 and 30
        // are valid type numbers — confirm.
        let frames = vec![
            make_frame(1, 10, &[0xAA]),
            make_frame(2, 20, &[0xBB, 0xCC]),
            make_frame(3, 30, &[]),
        ];

        let master_clone = Arc::clone(&master_side);
        let send_handle = std::thread::spawn(move || {
            for f in &frames {
                master_clone.send(f).unwrap();
            }
            // Collect one acknowledgement per frame sent.
            let mut acked = Vec::new();
            let timeout = Duration::from_secs(5);
            for _ in 0..3 {
                let ack = master_clone.receive(timeout).unwrap().unwrap();
                let vlsn = u64::from_le_bytes(ack[..8].try_into().unwrap());
                acked.push(vlsn);
            }
            // Closing the master side is what lets `run` return.
            master_clone.close().unwrap();
            acked
        });

        let receiver = ReplicaReceiver::new(Arc::clone(&replica_side));
        let mut writer = RecordingWriter::new();
        receiver.run(&mut writer).unwrap();

        let acked = send_handle.join().unwrap();
        assert_eq!(acked, vec![1, 2, 3]);

        assert_eq!(writer.entries.len(), 3);
        assert_eq!(writer.entries[0], (1, 10, vec![0xAA]));
        assert_eq!(writer.entries[1], (2, 20, vec![0xBB, 0xCC]));
        assert_eq!(writer.entries[2], (3, 30, vec![]));
    }

    // `run` must terminate when the peer has already closed the channel.
    #[test]
    fn test_replica_receiver_stops_on_channel_close() {
        let pair = LocalChannelPair::new();
        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // Close before the receiver starts.
        master_side.close().unwrap();

        let receiver = ReplicaReceiver::new(replica_side);
        let mut writer = RecordingWriter::new();
        let res = receiver.run(&mut writer);
        // Either a clean return or a surfaced `ChannelClosed` is accepted.
        assert!(res.is_ok() || matches!(res, Err(RepError::ChannelClosed(_))));
    }

    // End-to-end: a `FeederRunner` on one end of the channel pair feeds a
    // `ReplicaReceiver` on the other end.
    #[test]
    fn test_feeder_to_replica_round_trip() {
        use crate::stream::feeder::{FeederRunner, LogScanner};
        use std::collections::VecDeque;

        /// `LogScanner` backed by an in-memory queue of entries.
        struct SimpleScanner {
            items: VecDeque<(u64, u8, Vec<u8>)>,
        }
        impl LogScanner for SimpleScanner {
            /// Pops the front entry when its VLSN is at least `from_vlsn`;
            /// otherwise (or when the queue is empty) returns `None`.
            fn next_entry(
                &mut self,
                from_vlsn: u64,
            ) -> Option<(u64, u8, Vec<u8>)> {
                if let Some(&(v, _, _)) = self.items.front()
                    && v >= from_vlsn
                {
                    return self.items.pop_front();
                }
                None
            }
        }

        let pair = LocalChannelPair::new();
        let master_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let replica_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // NOTE(review): presumably chosen because the receiver drops frames
        // with unrecognised type bytes — confirm these are valid type numbers.
        let valid_types: [u8; 5] = [1, 2, 3, 4, 10];
        // Entry i has VLSN i, type valid_types[i - 1] and a payload of i
        // bytes, each equal to i.
        let entries: Vec<(u64, u8, Vec<u8>)> = (1..=5)
            .map(|i| {
                let etype = valid_types[(i - 1) as usize];
                (i, etype, vec![i as u8; i as usize])
            })
            .collect();

        let replica_ch_clone = Arc::clone(&replica_ch);
        // Replica thread: runs the receiver and returns what was written.
        let replica_handle = std::thread::spawn(move || {
            let receiver = ReplicaReceiver::new(replica_ch_clone);
            let mut writer = RecordingWriter::new();
            receiver.run(&mut writer).unwrap();
            writer.entries
        });

        let master_ch_clone = Arc::clone(&master_ch);
        // Feeder thread: runs the feeder and returns its
        // `known_replica_vlsn()` afterwards.
        let feeder_handle = std::thread::spawn(move || {
            let runner = FeederRunner::new(Arc::clone(&master_ch_clone), 1);
            let mut scanner =
                SimpleScanner { items: entries.into_iter().collect() };
            runner.run(&mut scanner).unwrap();
            runner.known_replica_vlsn()
        });

        // Give both threads time to exchange the five entries, then close
        // both ends so the threads can return.
        // NOTE(review): fixed sleep — timing-dependent on slow machines.
        std::thread::sleep(Duration::from_millis(200));
        master_ch.close().unwrap();
        replica_ch.close().unwrap();

        let last_acked = feeder_handle.join().unwrap();
        let written = replica_handle.join().unwrap();

        assert_eq!(written.len(), 5);
        for (i, (vlsn, etype, payload)) in written.iter().enumerate() {
            let expected_vlsn = (i + 1) as u64;
            assert_eq!(*vlsn, expected_vlsn);
            assert_eq!(*etype, valid_types[i]);
            assert_eq!(payload.len(), expected_vlsn as usize);
        }
        assert_eq!(last_acked, 5);
    }

    // A frame whose VLSN is lower than one already written aborts the stream
    // with a VLSN-ordering protocol error.
    #[test]
    fn test_replica_rejects_replayed_vlsn() {
        let pair = LocalChannelPair::new();
        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // VLSN 5 followed by the lower VLSN 3.
        let frames =
            vec![make_frame(5, 10, b"first"), make_frame(3, 10, b"replay")];

        let master_clone = Arc::clone(&master_side);
        let _send_handle = std::thread::spawn(move || {
            for f in &frames {
                let _ = master_clone.send(f);
            }
            // Wait for the single acknowledgement (for VLSN 5); errors are
            // ignored because the receiver stops on the second frame.
            let _ = master_clone.receive(Duration::from_secs(2));
        });

        let receiver = ReplicaReceiver::new(replica_side);
        let mut writer = RecordingWriter::new();
        let res = receiver.run(&mut writer);

        match res {
            Err(RepError::ProtocolError(msg)) => {
                assert!(
                    msg.contains("VLSN ordering violation"),
                    "expected VLSN-ordering protocol error, got: {msg}"
                );
            }
            other => {
                panic!("expected ProtocolError on replay, got {other:?}")
            }
        }

        // Only the first frame reached the writer.
        assert_eq!(writer.entries.len(), 1);
        assert_eq!(writer.entries[0].0, 5);
    }

    // Repeating the same VLSN is rejected just like a lower one.
    #[test]
    fn test_replica_rejects_duplicate_vlsn() {
        let pair = LocalChannelPair::new();
        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

        let frames = vec![make_frame(7, 10, b"a"), make_frame(7, 10, b"b")];

        let master_clone = Arc::clone(&master_side);
        let _send_handle = std::thread::spawn(move || {
            for f in &frames {
                let _ = master_clone.send(f);
            }
            let _ = master_clone.receive(Duration::from_secs(2));
        });

        let receiver = ReplicaReceiver::new(replica_side);
        let mut writer = RecordingWriter::new();
        let res = receiver.run(&mut writer);
        assert!(
            matches!(res, Err(RepError::ProtocolError(_))),
            "expected ProtocolError on duplicate VLSN, got {res:?}"
        );
        assert_eq!(writer.entries.len(), 1);
    }

    // VLSNs only need to increase; gaps between them are accepted.
    #[test]
    fn test_replica_allows_vlsn_gap() {
        let pair = LocalChannelPair::new();
        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

        let frames = vec![
            make_frame(1, 10, b"a"),
            make_frame(5, 10, b"b"),
            make_frame(100, 10, b"c"),
        ];

        let master_clone = Arc::clone(&master_side);
        let send_handle = std::thread::spawn(move || {
            for f in &frames {
                master_clone.send(f).unwrap();
            }
            // Consume the three acknowledgements before closing.
            for _ in 0..3 {
                let _ = master_clone.receive(Duration::from_secs(2));
            }
            master_clone.close().unwrap();
        });

        let receiver = ReplicaReceiver::new(replica_side);
        let mut writer = RecordingWriter::new();
        receiver.run(&mut writer).unwrap();
        send_handle.join().unwrap();

        assert_eq!(writer.entries.len(), 3);
        assert_eq!(writer.entries[0].0, 1);
        assert_eq!(writer.entries[1].0, 5);
        assert_eq!(writer.entries[2].0, 100);
    }

    // A frame with an unrecognised type byte is skipped without an
    // acknowledgement; the following valid frame is processed normally.
    #[test]
    fn test_replica_skips_unknown_entry_type() {
        let pair = LocalChannelPair::new();
        let master_side: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let replica_side: Arc<dyn Channel> = Arc::new(pair.channel_b);

        // NOTE(review): assumes 200 is not a valid `LogEntryType` number and
        // 10 is — confirm.
        let frames =
            vec![make_frame(1, 200, b"bogus"), make_frame(2, 10, b"good")];

        let master_clone = Arc::clone(&master_side);
        let send_handle = std::thread::spawn(move || {
            for f in &frames {
                master_clone.send(f).unwrap();
            }
            // Only one acknowledgement is expected (for VLSN 2).
            let ack = master_clone.receive(Duration::from_secs(2)).unwrap();
            master_clone.close().unwrap();
            ack
        });

        let receiver = ReplicaReceiver::new(replica_side);
        let mut writer = RecordingWriter::new();
        receiver.run(&mut writer).unwrap();

        let ack = send_handle.join().unwrap();
        let acked_vlsn =
            u64::from_le_bytes(ack.unwrap()[..8].try_into().unwrap());

        assert_eq!(writer.entries.len(), 1, "bogus frame must be skipped");
        assert_eq!(writer.entries[0].0, 2);
        assert_eq!(writer.entries[0].1, 10);
        assert_eq!(acked_vlsn, 2);
    }

    // A new stream is idle, has no master and all counters are zero.
    #[test]
    fn test_new_replica_stream() {
        let stream = ReplicaStream::new();
        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
        assert_eq!(stream.get_applied_vlsn(), 0);
        assert_eq!(stream.get_received_vlsn(), 0);
        assert_eq!(stream.get_master_vlsn(), 0);
        assert!(stream.get_master().is_none());
        assert_eq!(stream.get_lag(), 0);
    }

    // `Default` yields the same initial state as `new`.
    #[test]
    fn test_default() {
        let stream = ReplicaStream::default();
        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);
    }

    // `set_state` stores each state and `get_state` reads it back.
    #[test]
    fn test_state_transitions() {
        let stream = ReplicaStream::new();
        assert_eq!(stream.get_state(), ReplicaStreamState::Idle);

        stream.set_state(ReplicaStreamState::Connecting);
        assert_eq!(stream.get_state(), ReplicaStreamState::Connecting);

        stream.set_state(ReplicaStreamState::Streaming);
        assert_eq!(stream.get_state(), ReplicaStreamState::Streaming);

        stream.set_state(ReplicaStreamState::CatchingUp);
        assert_eq!(stream.get_state(), ReplicaStreamState::CatchingUp);

        stream.set_state(ReplicaStreamState::Shutdown);
        assert_eq!(stream.get_state(), ReplicaStreamState::Shutdown);
    }

    // The master name starts unset and each `set_master` replaces it.
    #[test]
    fn test_master_name() {
        let stream = ReplicaStream::new();
        assert!(stream.get_master().is_none());

        stream.set_master("master-node-1");
        assert_eq!(stream.get_master(), Some("master-node-1".to_string()));

        stream.set_master("master-node-2");
        assert_eq!(stream.get_master(), Some("master-node-2".to_string()));
    }

    // Received entries come back from `drain_pending` in arrival order, and
    // a second drain returns nothing.
    #[test]
    fn test_receive_and_drain() {
        let stream = ReplicaStream::new();
        stream.receive_entry(1, 10, vec![0xAA]);
        stream.receive_entry(2, 20, vec![0xBB, 0xCC]);
        stream.receive_entry(3, 30, vec![]);

        assert_eq!(stream.get_received_vlsn(), 3);

        let entries = stream.drain_pending();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0], (1, 10, vec![0xAA]));
        assert_eq!(entries[1], (2, 20, vec![0xBB, 0xCC]));
        assert_eq!(entries[2], (3, 30, vec![]));

        // The queue was emptied by the first drain.
        let entries2 = stream.drain_pending();
        assert!(entries2.is_empty());
    }

    // The received VLSN never moves backwards.
    #[test]
    fn test_received_vlsn_monotonic() {
        let stream = ReplicaStream::new();
        stream.receive_entry(5, 1, vec![]);
        assert_eq!(stream.get_received_vlsn(), 5);

        // A lower VLSN leaves the counter unchanged.
        stream.receive_entry(3, 1, vec![]);
        assert_eq!(stream.get_received_vlsn(), 5);

        stream.receive_entry(7, 1, vec![]);
        assert_eq!(stream.get_received_vlsn(), 7);
    }

    // The applied VLSN never moves backwards.
    #[test]
    fn test_mark_applied() {
        let stream = ReplicaStream::new();
        stream.mark_applied(5);
        assert_eq!(stream.get_applied_vlsn(), 5);

        stream.mark_applied(10);
        assert_eq!(stream.get_applied_vlsn(), 10);

        // A lower VLSN leaves the counter unchanged.
        stream.mark_applied(7);
        assert_eq!(stream.get_applied_vlsn(), 10);
    }

    // The master VLSN never moves backwards.
    #[test]
    fn test_update_master_vlsn() {
        let stream = ReplicaStream::new();
        stream.update_master_vlsn(100);
        assert_eq!(stream.get_master_vlsn(), 100);

        stream.update_master_vlsn(150);
        assert_eq!(stream.get_master_vlsn(), 150);

        // A lower VLSN leaves the counter unchanged.
        stream.update_master_vlsn(120);
        assert_eq!(stream.get_master_vlsn(), 150);
    }

    // Lag is master minus applied, clamped at zero.
    #[test]
    fn test_lag_calculation() {
        let stream = ReplicaStream::new();
        stream.update_master_vlsn(100);
        assert_eq!(stream.get_lag(), 100);

        stream.mark_applied(50);
        assert_eq!(stream.get_lag(), 50);

        stream.mark_applied(100);
        assert_eq!(stream.get_lag(), 0);

        // Applied ahead of master saturates to zero rather than underflowing.
        stream.mark_applied(110);
        assert_eq!(stream.get_lag(), 0);
    }

    // `is_caught_up` needs a non-zero master VLSN, applied >= master and an
    // empty pending queue.
    #[test]
    fn test_is_caught_up() {
        let stream = ReplicaStream::new();
        // Master VLSN is still zero.
        assert!(!stream.is_caught_up());

        stream.update_master_vlsn(10);
        // Applied (0) is behind master (10).
        assert!(!stream.is_caught_up());

        stream.mark_applied(10);
        assert!(stream.is_caught_up());

        // A queued entry and a newer master VLSN both break the condition.
        stream.receive_entry(11, 1, vec![]);
        stream.update_master_vlsn(11);
        assert!(!stream.is_caught_up());

        // Draining the queue and applying VLSN 11 restores it.
        stream.drain_pending();
        stream.mark_applied(11);
        assert!(stream.is_caught_up());
    }

    // Applied ahead of the known master VLSN still counts as caught up.
    #[test]
    fn test_caught_up_with_excess_applied() {
        let stream = ReplicaStream::new();
        stream.update_master_vlsn(5);
        stream.mark_applied(10);
        assert!(stream.is_caught_up());
    }

    // Full cycle: receive five entries, drain them, mark each applied.
    #[test]
    fn test_receive_apply_cycle() {
        let stream = ReplicaStream::new();
        stream.set_master("master1");
        stream.set_state(ReplicaStreamState::Streaming);
        stream.update_master_vlsn(5);

        for i in 1..=5 {
            stream.receive_entry(i, 1, vec![i as u8]);
        }
        assert_eq!(stream.get_received_vlsn(), 5);
        // Nothing has been applied yet, so lag equals the master VLSN.
        assert_eq!(stream.get_lag(), 5);

        let entries = stream.drain_pending();
        assert_eq!(entries.len(), 5);
        for (vlsn, _, _) in &entries {
            stream.mark_applied(*vlsn);
        }

        assert_eq!(stream.get_applied_vlsn(), 5);
        assert_eq!(stream.get_lag(), 0);
        assert!(stream.is_caught_up());
    }

    // The `Debug` output includes the master name and the state.
    #[test]
    fn test_debug_format() {
        let stream = ReplicaStream::new();
        stream.set_master("test-master");
        stream.set_state(ReplicaStreamState::Streaming);
        let debug = format!("{:?}", stream);
        assert!(debug.contains("test-master"));
        assert!(debug.contains("Streaming"));
    }
}