1use crate::config::ReplicationConfig;
26use crate::error::{ClusterError, Result};
27use crate::node::NodeId;
28use crate::partition::PartitionId;
29use crate::protocol::Acks;
30use dashmap::DashMap;
31use std::collections::{HashMap, HashSet};
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use tokio::sync::{oneshot, Mutex, RwLock};
36use tracing::{debug, error, info, warn};
37
/// Per-partition replication state machine, usable in either the leader or
/// follower role (the role flips at runtime via `become_leader` /
/// `become_follower`).
///
/// On the leader this tracks follower progress, maintains the in-sync
/// replica (ISR) set, advances the high watermark, and resolves
/// `Acks::All` producer waits.
#[derive(Debug)]
pub struct PartitionReplication {
    /// Identity of the partition this state belongs to.
    pub partition_id: PartitionId,

    /// This node's id; used to exclude ourselves from follower bookkeeping.
    local_node: NodeId,

    /// Whether this node currently acts as the partition leader.
    is_leader: AtomicBool,

    /// Leadership epoch recorded at the most recent role change.
    leader_epoch: AtomicU64,

    /// Log end offset (LEO): one past the last locally appended record
    /// (see `record_appended`, which stores `offset + 1`).
    log_end_offset: AtomicU64,

    /// High watermark: minimum LEO across the ISR (see `maybe_advance_hwm`);
    /// records below it are considered committed.
    high_watermark: AtomicU64,

    /// Leader-side progress tracking for each remote replica.
    replicas: RwLock<HashMap<NodeId, ReplicaProgress>>,

    /// Current in-sync replica set (includes the local node on the leader).
    isr: RwLock<HashSet<NodeId>>,

    /// Producers blocked in `wait_for_acks(Acks::All)`, keyed by record offset.
    pending_acks: DashMap<u64, PendingAck>,

    /// Replication tuning knobs (lag thresholds, min ISR, fetch sizing).
    config: ReplicationConfig,
}
71
/// Leader-side view of one follower's replication progress.
#[derive(Debug, Clone)]
pub struct ReplicaProgress {
    /// The follower this progress belongs to.
    pub node_id: NodeId,

    /// The follower's log end offset, as implied by its last fetch offset.
    pub log_end_offset: u64,

    /// When the follower last fetched; drives time-based ISR eviction in
    /// `check_replica_health`.
    pub last_fetch: Instant,

    /// Whether the follower is currently considered a member of the ISR.
    pub in_sync: bool,

    /// How many messages the follower trails the leader's LEO by.
    pub lag: u64,
}
90
/// A producer waiting for an `Acks::All` acknowledgement of one record.
///
/// Completed with `Ok` when the high watermark passes the record's offset,
/// or failed with `Err` on timeout / cleanup. `acked_nodes` and
/// `required_acks` are bookkeeping only — completion is currently driven
/// purely by HWM advancement (hence the `dead_code` allowance).
#[derive(Debug)]
#[allow(dead_code)]
struct PendingAck {
    /// Offset of the record awaiting full-ISR replication.
    offset: u64,
    /// Nodes known to have acknowledged (seeded with the local node).
    acked_nodes: HashSet<NodeId>,
    /// ISR size at registration time.
    required_acks: usize,
    /// One-shot channel used to wake the waiting producer.
    completion: oneshot::Sender<Result<()>>,
    /// Registration time, used by stale-ack cleanup.
    created: Instant,
}
109
110impl PartitionReplication {
111 pub fn new(
113 partition_id: PartitionId,
114 local_node: NodeId,
115 is_leader: bool,
116 config: ReplicationConfig,
117 ) -> Self {
118 Self {
119 partition_id,
120 local_node,
121 is_leader: AtomicBool::new(is_leader),
122 leader_epoch: AtomicU64::new(0),
123 log_end_offset: AtomicU64::new(0),
124 high_watermark: AtomicU64::new(0),
125 replicas: RwLock::new(HashMap::new()),
126 isr: RwLock::new(HashSet::new()),
127 pending_acks: DashMap::new(),
128 config,
129 }
130 }
131
132 pub async fn become_leader(&self, epoch: u64, replicas: Vec<NodeId>) {
134 self.is_leader.store(true, Ordering::SeqCst);
135 self.leader_epoch.store(epoch, Ordering::SeqCst);
136
137 let mut replica_map = self.replicas.write().await;
139 replica_map.clear();
140
141 for node_id in &replicas {
142 if node_id != &self.local_node {
143 replica_map.insert(
144 node_id.clone(),
145 ReplicaProgress {
146 node_id: node_id.clone(),
147 log_end_offset: 0,
148 last_fetch: Instant::now(),
149 in_sync: true,
150 lag: 0,
151 },
152 );
153 }
154 }
155
156 let mut isr = self.isr.write().await;
161 isr.clear();
162 for node_id in &replicas {
163 isr.insert(node_id.clone());
164 }
165 isr.insert(self.local_node.clone());
166
167 info!(
168 partition = %self.partition_id,
169 epoch = epoch,
170 replicas = replicas.len(),
171 "Became partition leader"
172 );
173 }
174
175 pub fn become_follower(&self, epoch: u64) {
177 self.is_leader.store(false, Ordering::SeqCst);
178 self.leader_epoch.store(epoch, Ordering::SeqCst);
179
180 info!(
181 partition = %self.partition_id,
182 epoch = epoch,
183 "Became partition follower"
184 );
185 }
186
187 pub async fn record_appended(&self, offset: u64) -> Result<()> {
189 let is_leader = self.is_leader.load(Ordering::SeqCst);
193 if is_leader && !self.has_min_isr().await {
194 return Err(ClusterError::NotEnoughIsr {
195 required: self.config.min_isr,
196 current: self.isr.read().await.len() as u16,
197 });
198 }
199
200 self.log_end_offset.fetch_max(offset + 1, Ordering::SeqCst);
202
203 if is_leader {
205 self.maybe_advance_hwm().await;
206 }
207
208 Ok(())
209 }
210
    /// Leader-side handling of a follower fetch at `fetch_offset`.
    ///
    /// `fetch_offset` is the next offset the follower wants, i.e. its log end
    /// offset: everything below it is already replicated there. Updates the
    /// follower's progress, moves it into or out of the ISR based on message
    /// lag, and finally tries to advance the high watermark.
    ///
    /// Returns `Ok(true)` when the ISR changed (so the caller can propagate
    /// the change), `Err(NotLeader)` when called on a non-leader.
    pub async fn handle_replica_fetch(
        &self,
        replica_id: &NodeId,
        fetch_offset: u64,
    ) -> Result<bool> {
        if !self.is_leader.load(Ordering::SeqCst) {
            return Err(ClusterError::NotLeader { leader: None });
        }

        let mut isr_changed = false;
        let mut replicas = self.replicas.write().await;

        // Unknown replicas are silently ignored; only nodes registered via
        // become_leader are tracked.
        if let Some(progress) = replicas.get_mut(replica_id) {
            progress.last_fetch = Instant::now();
            progress.log_end_offset = fetch_offset;

            let leader_leo = self.log_end_offset.load(Ordering::SeqCst);
            progress.lag = leader_leo.saturating_sub(fetch_offset);

            let should_be_in_sync = progress.lag <= self.config.replica_lag_max_messages;

            if should_be_in_sync != progress.in_sync {
                progress.in_sync = should_be_in_sync;
                isr_changed = true;

                // Lock order: replicas (already held) before isr — matches
                // check_replica_health, so no deadlock.
                let mut isr = self.isr.write().await;
                if should_be_in_sync {
                    isr.insert(replica_id.clone());
                    info!(
                        partition = %self.partition_id,
                        replica = %replica_id,
                        "Replica joined ISR"
                    );
                } else {
                    isr.remove(replica_id);
                    warn!(
                        partition = %self.partition_id,
                        replica = %replica_id,
                        lag = progress.lag,
                        "Replica removed from ISR due to lag"
                    );
                }
            }
        }

        // Release the write lock before maybe_advance_hwm re-acquires it as
        // a read lock.
        drop(replicas);

        self.maybe_advance_hwm().await;

        Ok(isr_changed)
    }
267
268 pub async fn check_replica_health(&self) -> Vec<NodeId> {
270 if !self.is_leader.load(Ordering::SeqCst) {
271 return vec![];
272 }
273
274 let now = Instant::now();
275 let mut removed = vec![];
276
277 let mut replicas = self.replicas.write().await;
278 let mut isr = self.isr.write().await;
279
280 for (node_id, progress) in replicas.iter_mut() {
281 if progress.in_sync {
282 let since_fetch = now.duration_since(progress.last_fetch);
283
284 if since_fetch > self.config.replica_lag_max_time {
285 progress.in_sync = false;
286 isr.remove(node_id);
287 removed.push(node_id.clone());
288
289 warn!(
290 partition = %self.partition_id,
291 replica = %node_id,
292 lag_time = ?since_fetch,
293 "Replica removed from ISR due to time lag"
294 );
295 }
296 }
297 }
298
299 removed
300 }
301
    /// Recomputes the high watermark as the minimum log end offset across the
    /// ISR and, if it advanced, completes the pending acks it now covers.
    ///
    /// The local node contributes its own LEO as the starting value. ISR
    /// members missing from the replica map are skipped — presumably they
    /// were just added and have not fetched yet; TODO(review) confirm this
    /// cannot overstate the HWM for a freshly added ISR member.
    async fn maybe_advance_hwm(&self) {
        let replicas = self.replicas.read().await;
        let isr = self.isr.read().await;

        let mut min_leo = self.log_end_offset.load(Ordering::SeqCst);

        for node_id in isr.iter() {
            if node_id == &self.local_node {
                continue;
            }
            if let Some(progress) = replicas.get(node_id) {
                min_leo = min_leo.min(progress.log_end_offset);
            }
        }

        // Release both read locks before awaiting on ack completion.
        drop(isr);
        drop(replicas);

        // fetch_max keeps the HWM monotonic under races; prev_hwm tells us
        // whether this call was the one that actually advanced it.
        let prev_hwm = self.high_watermark.fetch_max(min_leo, Ordering::SeqCst);
        if min_leo > prev_hwm {
            self.complete_pending_acks(min_leo).await;
        }
    }
334
335 pub async fn wait_for_acks(&self, offset: u64, acks: Acks) -> Result<()> {
337 match acks {
338 Acks::None => Ok(()),
339 Acks::Leader => {
340 Ok(())
342 }
343 Acks::All => {
344 let isr = self.isr.read().await;
348 let required = isr.len();
349
350 if required <= 1 {
351 return Ok(());
353 }
354
355 let (tx, rx) = oneshot::channel();
357 let mut acked = HashSet::new();
358 acked.insert(self.local_node.clone());
359
360 self.pending_acks.insert(
361 offset,
362 PendingAck {
363 offset,
364 acked_nodes: acked,
365 required_acks: required,
366 completion: tx,
367 created: Instant::now(),
368 },
369 );
370
371 drop(isr);
374
375 match tokio::time::timeout(Duration::from_secs(30), rx).await {
377 Ok(Ok(result)) => result,
378 Ok(Err(_)) => Err(ClusterError::ChannelClosed),
379 Err(_) => {
380 self.pending_acks.remove(&offset);
381 Err(ClusterError::Timeout)
382 }
383 }
384 }
385 }
386 }
387
388 async fn complete_pending_acks(&self, up_to_offset: u64) {
394 let to_complete: Vec<u64> = self
396 .pending_acks
397 .iter()
398 .filter_map(|e| {
399 if *e.key() <= up_to_offset {
400 Some(*e.key())
401 } else {
402 None
403 }
404 })
405 .collect();
406
407 for offset in to_complete {
409 if let Some((_, pending)) = self.pending_acks.remove(&offset) {
410 let _ = pending.completion.send(Ok(()));
411 }
412 }
413 }
414
    /// Current high watermark: offsets below this are committed across the ISR.
    pub fn high_watermark(&self) -> u64 {
        self.high_watermark.load(Ordering::SeqCst)
    }
419
    /// Current log end offset: one past the last locally appended record.
    pub fn log_end_offset(&self) -> u64 {
        self.log_end_offset.load(Ordering::SeqCst)
    }
424
    /// Epoch recorded at the most recent leadership transition.
    pub fn leader_epoch(&self) -> u64 {
        self.leader_epoch.load(Ordering::SeqCst)
    }
429
    /// Snapshot of the current in-sync replica set.
    pub async fn get_isr(&self) -> HashSet<NodeId> {
        self.isr.read().await.clone()
    }
434
435 pub async fn has_min_isr(&self) -> bool {
437 let isr = self.isr.read().await;
438 isr.len() >= self.config.min_isr as usize
439 }
440
441 pub fn cleanup_stale_pending_acks(&self, timeout: Duration) -> usize {
449 let now = Instant::now();
450 let mut cleaned = 0;
451
452 let stale_keys: Vec<u64> = self
454 .pending_acks
455 .iter()
456 .filter_map(|e| {
457 if now.duration_since(e.value().created) >= timeout {
458 Some(*e.key())
459 } else {
460 None
461 }
462 })
463 .collect();
464
465 for key in stale_keys {
467 if let Some((_, pending)) = self.pending_acks.remove(&key) {
468 let _ = pending.completion.send(Err(ClusterError::Timeout));
469 cleaned += 1;
470 }
471 }
472
473 if cleaned > 0 {
474 debug!(
475 partition = %self.partition_id,
476 cleaned = cleaned,
477 "Cleaned up stale pending acks"
478 );
479 }
480
481 cleaned
482 }
483}
484
/// Registry of per-partition replication state for this node, plus the glue
/// for propagating ISR changes through Raft-backed cluster metadata.
pub struct ReplicationManager {
    /// This node's id, handed to every partition state it creates.
    local_node: NodeId,

    /// Replication state per partition, created lazily on first use.
    partitions: DashMap<PartitionId, Arc<PartitionReplication>>,

    /// Template config cloned into each new partition state.
    config: ReplicationConfig,

    /// Optional handle to the metadata Raft node; without it, ISR changes
    /// stay local-only.
    raft_node: Option<Arc<RwLock<crate::raft::RaftNode>>>,
}
499
500impl ReplicationManager {
501 pub fn new(local_node: NodeId, config: ReplicationConfig) -> Self {
503 Self {
504 local_node,
505 partitions: DashMap::new(),
506 config,
507 raft_node: None,
508 }
509 }
510
    /// Attaches the metadata Raft node used to propagate ISR changes.
    pub fn set_raft_node(&mut self, raft_node: Arc<RwLock<crate::raft::RaftNode>>) {
        self.raft_node = Some(raft_node);
    }
515
516 pub fn get_or_create(
518 &self,
519 partition_id: PartitionId,
520 is_leader: bool,
521 ) -> Arc<PartitionReplication> {
522 self.partitions
523 .entry(partition_id.clone())
524 .or_insert_with(|| {
525 Arc::new(PartitionReplication::new(
526 partition_id,
527 self.local_node.clone(),
528 is_leader,
529 self.config.clone(),
530 ))
531 })
532 .clone()
533 }
534
535 pub fn get(&self, partition_id: &PartitionId) -> Option<Arc<PartitionReplication>> {
537 self.partitions.get(partition_id).map(|e| e.value().clone())
538 }
539
540 pub fn remove(&self, partition_id: &PartitionId) -> Option<Arc<PartitionReplication>> {
542 self.partitions.remove(partition_id).map(|(_, v)| v)
543 }
544
545 pub fn leading_partitions(&self) -> Vec<PartitionId> {
547 self.partitions
548 .iter()
549 .filter(|e| e.value().is_leader.load(Ordering::Relaxed))
550 .map(|e| e.key().clone())
551 .collect()
552 }
553
554 pub async fn handle_replica_fetch(
556 &self,
557 partition_id: &PartitionId,
558 replica_id: &NodeId,
559 fetch_offset: u64,
560 ) -> Result<()> {
561 let partition = self
562 .get(partition_id)
563 .ok_or_else(|| ClusterError::PartitionNotFound {
564 topic: partition_id.topic.clone(),
565 partition: partition_id.partition,
566 })?;
567
568 let isr_changed = partition
569 .handle_replica_fetch(replica_id, fetch_offset)
570 .await?;
571
572 if isr_changed {
574 if let Err(e) = self.propagate_isr_change(partition_id).await {
575 warn!(
576 partition = %partition_id,
577 error = %e,
578 "Failed to propagate ISR change (will retry on next health check)"
579 );
580 }
581 }
582
583 Ok(())
584 }
585
586 pub async fn run_health_checks(&self) {
588 for entry in self.partitions.iter() {
589 let partition = entry.value();
590 if partition.is_leader.load(Ordering::Relaxed) {
591 let removed = partition.check_replica_health().await;
592 if !removed.is_empty() {
593 warn!(
594 partition = %partition.partition_id,
595 removed = ?removed,
596 "Removed replicas from ISR - propagating via Raft"
597 );
598
599 if let Err(e) = self.propagate_isr_change(&partition.partition_id).await {
601 error!(
602 partition = %partition.partition_id,
603 error = %e,
604 "Failed to propagate ISR change via Raft"
605 );
606 }
607 }
608 }
609 }
610 }
611
    /// Publishes the partition's current ISR to cluster metadata via Raft.
    ///
    /// Returns `Ok` without doing anything when the partition has been
    /// removed or when no Raft node is configured; otherwise proposes an
    /// `UpdatePartitionIsr` command carrying the current ISR and leader
    /// epoch, propagating any proposal error to the caller.
    async fn propagate_isr_change(&self, partition_id: &PartitionId) -> Result<()> {
        let partition = match self.get(partition_id) {
            Some(p) => p,
            // Partition removed concurrently; nothing to propagate.
            None => return Ok(()),
        };

        // Snapshot the ISR and release the lock before any Raft I/O.
        let isr = partition.isr.read().await;
        let isr_vec: Vec<NodeId> = isr.iter().cloned().collect();
        drop(isr);

        if let Some(raft_node) = &self.raft_node {
            let node = raft_node.read().await;

            let cmd = crate::metadata::MetadataCommand::UpdatePartitionIsr {
                partition: partition_id.clone(),
                isr: isr_vec.clone(),
                leader_epoch: partition.leader_epoch(),
            };

            match node.propose(cmd).await {
                Ok(_response) => {
                    info!(
                        partition = %partition_id,
                        isr = ?isr_vec,
                        "ISR change propagated via Raft"
                    );
                    Ok(())
                }
                Err(e) => {
                    error!(
                        partition = %partition_id,
                        error = %e,
                        "Failed to propose ISR change to Raft"
                    );
                    Err(e)
                }
            }
        } else {
            debug!(
                partition = %partition_id,
                "No Raft node configured - ISR change local only"
            );
            Ok(())
        }
    }
661}
662
/// Pull-based replication loop run by a follower for a single partition.
///
/// Periodically fetches records from the leader, appends them to the local
/// partition, and reports progress back so the leader can maintain the ISR.
pub struct FollowerFetcher {
    /// This follower's node id (sent in request headers).
    local_node: NodeId,

    /// Partition being replicated.
    partition_id: PartitionId,

    /// Leader node to fetch from.
    leader_id: NodeId,

    /// Next offset to request from the leader (this follower's log end offset).
    fetch_offset: u64,

    /// Last high watermark reported by the leader.
    high_watermark: u64,

    /// Cluster transport used for fetch and state-report RPCs.
    transport: Arc<Mutex<crate::Transport>>,

    /// Local storage partition the fetched records are appended to.
    local_partition: Arc<rivven_core::Partition>,

    /// Replication tuning (fetch interval, max fetch bytes, ...).
    config: ReplicationConfig,

    /// Broadcast receiver that stops the fetch loop.
    shutdown: tokio::sync::broadcast::Receiver<()>,

    /// When the last successful fetch round-trip completed.
    last_fetch: std::time::Instant,

    /// Consecutive failures reporting replica state to the leader.
    report_failures: u32,
}
706
707impl FollowerFetcher {
708 #[allow(clippy::too_many_arguments)]
709 pub fn new(
710 local_node: NodeId,
711 partition_id: PartitionId,
712 leader_id: NodeId,
713 start_offset: u64,
714 transport: Arc<Mutex<crate::Transport>>,
715 local_partition: Arc<rivven_core::Partition>,
716 config: ReplicationConfig,
717 shutdown: tokio::sync::broadcast::Receiver<()>,
718 ) -> Self {
719 Self {
720 local_node,
721 partition_id,
722 leader_id,
723 fetch_offset: start_offset,
724 high_watermark: 0,
725 transport,
726 local_partition,
727 config,
728 shutdown,
729 last_fetch: std::time::Instant::now(),
730 report_failures: 0,
731 }
732 }
733
734 pub async fn run(mut self) -> Result<()> {
736 let mut interval = tokio::time::interval(self.config.fetch_interval);
737
738 loop {
739 tokio::select! {
740 _ = interval.tick() => {
741 if let Err(e) = self.fetch_from_leader().await {
742 error!(
743 partition = %self.partition_id,
744 error = %e,
745 "Fetch from leader failed"
746 );
747 }
748 }
749 _ = self.shutdown.recv() => {
750 info!(partition = %self.partition_id, "Follower fetcher shutting down");
751 break;
752 }
753 }
754 }
755
756 Ok(())
757 }
758
    /// Performs one fetch round-trip: request records at `fetch_offset`,
    /// apply whatever comes back, then report progress to the leader.
    async fn fetch_from_leader(&mut self) -> Result<()> {
        use crate::protocol::{ClusterRequest, ClusterResponse, RequestHeader};

        // Random correlation id for this request/response pair.
        let header = RequestHeader::new(
            rand::random(), self.local_node.clone(),
        );

        let request = ClusterRequest::Fetch {
            header,
            partition: self.partition_id.clone(),
            offset: self.fetch_offset,
            max_bytes: self.config.fetch_max_bytes,
        };

        // Scope the transport lock to just the RPC so other users of the
        // shared transport are not blocked while we process the response.
        let response = {
            let transport = self.transport.lock().await;
            transport.send(&self.leader_id, request).await?
        };

        match response {
            ClusterResponse::Fetch {
                header,
                partition,
                high_watermark,
                log_start_offset: _,
                records,
            } => {
                if !header.is_success() {
                    return Err(ClusterError::Network(format!(
                        "Fetch failed: {}",
                        header.error_message.unwrap_or_default()
                    )));
                }

                // Guard against a mis-routed or stale response.
                if partition != self.partition_id {
                    return Err(ClusterError::Network(format!(
                        "Partition mismatch: expected {}, got {}",
                        self.partition_id, partition
                    )));
                }

                // apply_records advances self.fetch_offset as a side effect.
                if !records.is_empty() {
                    self.apply_records(&records).await?;
                    debug!(
                        partition = %self.partition_id,
                        records_bytes = records.len(),
                        new_offset = self.fetch_offset,
                        hwm = high_watermark,
                        "Applied records from leader"
                    );
                }

                self.high_watermark = high_watermark;
                self.last_fetch = std::time::Instant::now();

                // Best-effort progress report (errors swallowed inside).
                self.report_replica_state().await?;

                Ok(())
            }
            _ => Err(ClusterError::Network(format!(
                "Unexpected response type: {:?}",
                response
            ))),
        }
    }
832
    /// Decodes and persists a record batch fetched from the leader.
    ///
    /// Wire format (as parsed here): repeated `[u32 big-endian length]`
    /// followed by that many bytes of a serialized `rivven_core::Message`.
    /// Parsing stops at the first truncated or undecodable record (logging a
    /// warning); everything parsed up to that point is still appended, and
    /// `fetch_offset` advances to one past the last applied message's offset.
    async fn apply_records(&mut self, records: &[u8]) -> Result<()> {
        let mut cursor = 0;
        let mut messages: Vec<rivven_core::Message> = Vec::new();
        let mut last_offset: Option<u64> = None;

        while cursor + 4 <= records.len() {
            // Length prefix of the next frame.
            let len = u32::from_be_bytes([
                records[cursor],
                records[cursor + 1],
                records[cursor + 2],
                records[cursor + 3],
            ]) as usize;
            cursor += 4;

            // Frame claims more bytes than remain: stop at the truncation.
            if cursor + len > records.len() {
                tracing::warn!(
                    "Truncated record at byte {} in apply_records (expected {} bytes, have {})",
                    cursor,
                    len,
                    records.len() - cursor
                );
                break;
            }

            match rivven_core::Message::from_bytes(&records[cursor..cursor + len]) {
                Ok(msg) => {
                    last_offset = Some(msg.offset);
                    messages.push(msg);
                }
                Err(e) => {
                    tracing::warn!("Failed to deserialize record at byte {}: {}", cursor, e);
                    break;
                }
            }
            cursor += len;
        }

        // Persist the whole batch through the replicated-append path.
        if !messages.is_empty() {
            self.local_partition
                .append_replicated_batch(messages)
                .await
                .map_err(|e| {
                    ClusterError::Internal(format!(
                        "Failed to persist replicated records to partition {}: {}",
                        self.partition_id, e
                    ))
                })?;
        }

        // The next fetch resumes after the last successfully applied message.
        if let Some(offset) = last_offset {
            self.fetch_offset = offset + 1;
        }

        Ok(())
    }
902
903 async fn report_replica_state(&mut self) -> Result<()> {
905 use crate::protocol::{ClusterRequest, RequestHeader};
906
907 let header = RequestHeader::new(rand::random(), self.local_node.clone());
908
909 let request = ClusterRequest::ReplicaState {
910 header,
911 partition: self.partition_id.clone(),
912 log_end_offset: self.fetch_offset,
913 high_watermark: self.high_watermark,
914 };
915
916 let transport = self.transport.lock().await;
917 match transport.send(&self.leader_id, request).await {
918 Ok(_) => {
919 if self.report_failures > 0 {
920 info!(
921 partition = %self.partition_id,
922 previous_failures = self.report_failures,
923 "Replica state report succeeded after previous failures"
924 );
925 }
926 self.report_failures = 0;
927 Ok(())
928 }
929 Err(e) => {
930 self.report_failures += 1;
931 if self.report_failures >= 5 {
932 warn!(
933 partition = %self.partition_id,
934 consecutive_failures = self.report_failures,
935 error = %e,
936 "Replica state report to leader consistently failing — \
937 leader may not update ISR for this replica"
938 );
939 }
940 Ok(())
944 }
945 }
946 }
947
    /// Messages between this follower's fetch position and the leader's last
    /// reported high watermark (0 when caught up).
    pub fn lag(&self) -> u64 {
        self.high_watermark.saturating_sub(self.fetch_offset)
    }
952
    /// Time elapsed since the last successful fetch from the leader.
    pub fn fetch_age(&self) -> std::time::Duration {
        self.last_fetch.elapsed()
    }
957}
958
#[cfg(test)]
mod tests {
    use super::*;

    /// become_leader should flip the role flag and record the epoch.
    #[tokio::test]
    async fn test_partition_replication_leader() {
        let config = ReplicationConfig::default();
        let partition_id = PartitionId::new("test", 0);
        let replication =
            PartitionReplication::new(partition_id.clone(), "node-1".to_string(), false, config);

        replication
            .become_leader(1, vec!["node-1".to_string(), "node-2".to_string()])
            .await;

        assert!(replication.is_leader.load(Ordering::Relaxed));
        assert_eq!(replication.leader_epoch(), 1);
    }

    /// The HWM should track the minimum ISR log end offset: it stays at 0
    /// until the follower fetches, then follows the follower's fetch offset.
    #[tokio::test]
    async fn test_hwm_advancement() {
        let config = ReplicationConfig {
            // A single-node ISR is enough to accept writes in this test.
            min_isr: 1,
            ..ReplicationConfig::default()
        };
        let partition_id = PartitionId::new("test", 0);
        let replication = PartitionReplication::new(
            partition_id.clone(),
            "node-1".to_string(),
            false,
            config.clone(),
        );

        replication
            .become_leader(1, vec!["node-1".to_string(), "node-2".to_string()])
            .await;

        // Leader appends offsets 0..=2, so its LEO becomes 3.
        replication.record_appended(0).await.unwrap();
        replication.record_appended(1).await.unwrap();
        replication.record_appended(2).await.unwrap();

        // node-2 has not fetched anything yet, so the HWM cannot move.
        assert_eq!(replication.high_watermark(), 0);

        // A fetch at offset 2 means node-2 holds offsets 0..=1.
        replication
            .handle_replica_fetch(&"node-2".to_string(), 2)
            .await
            .unwrap();

        let isr = replication.get_isr().await;
        assert!(isr.contains("node-2"));
        assert_eq!(replication.high_watermark(), 2);

        // Catching up fully (fetching at the LEO) advances the HWM to 3.
        replication
            .handle_replica_fetch(&"node-2".to_string(), 3)
            .await
            .unwrap();

        assert_eq!(replication.high_watermark(), 3);
    }

    /// Acks::None must resolve immediately, regardless of replication state.
    #[tokio::test]
    async fn test_acks_none() {
        let config = ReplicationConfig::default();
        let partition_id = PartitionId::new("test", 0);
        let replication =
            PartitionReplication::new(partition_id, "node-1".to_string(), true, config);

        let result = replication.wait_for_acks(100, Acks::None).await;
        assert!(result.is_ok());
    }

    /// A freshly constructed fetcher starts at the given offset with zero
    /// lag; this only exercises construction, not the network loop.
    #[tokio::test]
    async fn test_follower_fetcher_lag_tracking() {
        use crate::Transport;

        let config = ReplicationConfig::default();
        let partition_id = PartitionId::new("test", 0);
        let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

        let transport_config = crate::TransportConfig::default();
        let transport = Transport::new(
            "follower-1".into(),
            "127.0.0.1:9093".parse().unwrap(),
            transport_config,
        );

        // Unique temp dir so parallel test runs do not collide.
        let core_config = rivven_core::Config {
            data_dir: format!("/tmp/rivven-test-follower-{}", uuid::Uuid::new_v4()),
            ..Default::default()
        };
        let local_partition = Arc::new(
            rivven_core::Partition::new(&core_config, "test", 0)
                .await
                .unwrap(),
        );

        let fetcher = FollowerFetcher::new(
            "follower-1".to_string(),
            partition_id,
            "leader-1".to_string(),
            0,
            Arc::new(Mutex::new(transport)),
            local_partition,
            config,
            shutdown_rx,
        );

        assert_eq!(fetcher.fetch_offset, 0);
        assert_eq!(fetcher.high_watermark, 0);
        assert_eq!(fetcher.lag(), 0);

        drop(shutdown_tx);
        // Best-effort cleanup of the temp data dir.
        let _ = std::fs::remove_dir_all(&core_config.data_dir);
    }

    /// get_or_create / get / remove / leading_partitions bookkeeping.
    #[tokio::test]
    async fn test_replication_manager_partition_tracking() {
        let config = ReplicationConfig::default();
        let manager = ReplicationManager::new("node-1".to_string(), config);

        let partition_id1 = PartitionId::new("topic-1", 0);
        let partition_id2 = PartitionId::new("topic-1", 1);

        // One led partition, one followed partition.
        manager.get_or_create(partition_id1.clone(), true);
        manager.get_or_create(partition_id2.clone(), false);

        assert_eq!(manager.leading_partitions().len(), 1);
        assert!(manager.get(&partition_id1).is_some());
        assert!(manager.get(&partition_id2).is_some());

        // Removing the led partition empties the leading set.
        manager.remove(&partition_id1);
        assert!(manager.get(&partition_id1).is_none());
        assert_eq!(manager.leading_partitions().len(), 0);
    }
}