// rivven_cluster/replication.rs
1//! ISR (In-Sync Replica) replication implementation
2//!
3//! This module implements Kafka-style ISR replication:
4//! - Leaders handle all reads and writes
5//! - Followers fetch from leaders and replicate
6//! - High watermark tracks committed offsets
7//! - ISR tracks which replicas are caught up
8//!
9//! # Ack Modes
10//!
11//! - `acks=0`: Fire and forget (no durability guarantee)
12//! - `acks=1`: Leader acknowledgment (data written to leader)
13//! - `acks=all`: All ISR acknowledgment (full durability)
14//!
15//! # Lock Ordering
16//!
17//! When acquiring multiple locks in this module, always follow this order
18//! to prevent deadlocks:
19//!
20//! 1. `replicas` lock (RwLock)
21//! 2. `isr` lock (RwLock)
22//!
23//! Never acquire `isr` before `replicas` in any code path.
24
25use crate::config::ReplicationConfig;
26use crate::error::{ClusterError, Result};
27use crate::node::NodeId;
28use crate::partition::PartitionId;
29use crate::protocol::Acks;
30use dashmap::DashMap;
31use std::collections::{HashMap, HashSet};
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use tokio::sync::{oneshot, Mutex, RwLock};
36use tracing::{debug, error, info, warn};
37
/// Replication state for a single partition
///
/// Owned by both leaders and followers; the leader-only fields (`replicas`,
/// `isr`, `pending_acks`) are only maintained while this node is leading.
#[derive(Debug)]
pub struct PartitionReplication {
    /// Partition identifier
    pub partition_id: PartitionId,

    /// Our node ID
    local_node: NodeId,

    /// Whether we are the leader (AtomicBool for safe concurrent access through Arc)
    is_leader: AtomicBool,

    /// Current leader epoch
    leader_epoch: AtomicU64,

    /// Log end offset (latest message written)
    // Maintained as "one past the last appended offset":
    // `record_appended(offset)` stores `offset + 1`.
    log_end_offset: AtomicU64,

    /// High watermark (committed offset)
    // Advanced by the leader to the minimum log-end offset across the ISR;
    // never regresses (updated via fetch_max).
    high_watermark: AtomicU64,

    /// Replica states (only tracked by leader)
    // Lock ordering: always acquire BEFORE `isr` (see module docs).
    replicas: RwLock<HashMap<NodeId, ReplicaProgress>>,

    /// In-sync replica set
    // Lock ordering: always acquire AFTER `replicas` (see module docs).
    isr: RwLock<HashSet<NodeId>>,

    /// Pending acks waiting for replication
    // Keyed by the record offset an `acks=all` producer is waiting on.
    pending_acks: DashMap<u64, PendingAck>,

    /// Configuration
    config: ReplicationConfig,
}
71
/// Progress tracking for a replica
///
/// Maintained by the leader for every follower replica and refreshed on
/// each follower fetch (`handle_replica_fetch`).
#[derive(Debug, Clone)]
pub struct ReplicaProgress {
    /// Node ID
    pub node_id: NodeId,

    /// Log end offset reported by replica
    pub log_end_offset: u64,

    /// Last fetch time
    // Used by `check_replica_health` to evict replicas that stop fetching.
    pub last_fetch: Instant,

    /// Whether replica is in sync
    pub in_sync: bool,

    /// Lag in messages
    // Leader LEO minus the replica's last reported fetch offset (saturating).
    pub lag: u64,
}
90
/// Pending acknowledgment for acks=all writes
///
/// This struct tracks in-flight writes awaiting acknowledgment from replicas.
/// Fields are used by the replication protocol during write completion.
/// Completion is signalled over `completion`; `cleanup_stale_pending_acks`
/// uses `created` to expire entries that never complete.
#[derive(Debug)]
#[allow(dead_code)] // acked_nodes/required_acks are not read in this file today
struct PendingAck {
    /// Offset being acknowledged
    offset: u64,
    /// Nodes that have acknowledged
    acked_nodes: HashSet<NodeId>,
    /// Required ack count
    required_acks: usize,
    /// Completion sender
    // Receives Ok(()) when the HWM passes `offset`, or Err(Timeout) from the
    // stale-ack sweeper.
    completion: oneshot::Sender<Result<()>>,
    /// Created time for timeout
    created: Instant,
}
109
110impl PartitionReplication {
111    /// Create new partition replication state
112    pub fn new(
113        partition_id: PartitionId,
114        local_node: NodeId,
115        is_leader: bool,
116        config: ReplicationConfig,
117    ) -> Self {
118        Self {
119            partition_id,
120            local_node,
121            is_leader: AtomicBool::new(is_leader),
122            leader_epoch: AtomicU64::new(0),
123            log_end_offset: AtomicU64::new(0),
124            high_watermark: AtomicU64::new(0),
125            replicas: RwLock::new(HashMap::new()),
126            isr: RwLock::new(HashSet::new()),
127            pending_acks: DashMap::new(),
128            config,
129        }
130    }
131
132    /// Become leader for this partition
133    pub async fn become_leader(&self, epoch: u64, replicas: Vec<NodeId>) {
134        self.is_leader.store(true, Ordering::SeqCst);
135        self.leader_epoch.store(epoch, Ordering::SeqCst);
136
137        // Initialize replica tracking
138        let mut replica_map = self.replicas.write().await;
139        replica_map.clear();
140
141        for node_id in &replicas {
142            if node_id != &self.local_node {
143                replica_map.insert(
144                    node_id.clone(),
145                    ReplicaProgress {
146                        node_id: node_id.clone(),
147                        log_end_offset: 0,
148                        last_fetch: Instant::now(),
149                        in_sync: true,
150                        lag: 0,
151                    },
152                );
153            }
154        }
155
156        // Initialize ISR with all replicas (including ourselves).
157        // Replicas that participated in the leader election are assumed
158        // healthy; the normal ISR-shrink mechanism will remove any that
159        // subsequently fall behind.
160        let mut isr = self.isr.write().await;
161        isr.clear();
162        for node_id in &replicas {
163            isr.insert(node_id.clone());
164        }
165        isr.insert(self.local_node.clone());
166
167        info!(
168            partition = %self.partition_id,
169            epoch = epoch,
170            replicas = replicas.len(),
171            "Became partition leader"
172        );
173    }
174
175    /// Become follower for this partition
176    pub fn become_follower(&self, epoch: u64) {
177        self.is_leader.store(false, Ordering::SeqCst);
178        self.leader_epoch.store(epoch, Ordering::SeqCst);
179
180        info!(
181            partition = %self.partition_id,
182            epoch = epoch,
183            "Became partition follower"
184        );
185    }
186
187    /// Record appended locally (called by storage layer)
188    pub async fn record_appended(&self, offset: u64) -> Result<()> {
189        // Enforce min-ISR before accepting writes.
190        // If the ISR has fallen below the configured minimum, reject the write
191        // to prevent data loss from under-replicated partitions.
192        let is_leader = self.is_leader.load(Ordering::SeqCst);
193        if is_leader && !self.has_min_isr().await {
194            return Err(ClusterError::NotEnoughIsr {
195                required: self.config.min_isr,
196                current: self.isr.read().await.len() as u16,
197            });
198        }
199
200        // Update our log end offset (fetch_max prevents regression under concurrent appends)
201        self.log_end_offset.fetch_max(offset + 1, Ordering::SeqCst);
202
203        // If we're the leader, check if we can advance HWM
204        if is_leader {
205            self.maybe_advance_hwm().await;
206        }
207
208        Ok(())
209    }
210
    /// Handle replica fetch and update progress
    /// Returns true if ISR changed
    ///
    /// Leader-only: followers answer with `NotLeader`. Updates the replica's
    /// progress (LEO, lag, last-fetch time) and moves it into or out of the
    /// ISR when its lag crosses `replica_lag_max_messages`. Fetches from
    /// nodes we do not track are silently ignored.
    pub async fn handle_replica_fetch(
        &self,
        replica_id: &NodeId,
        fetch_offset: u64,
    ) -> Result<bool> {
        if !self.is_leader.load(Ordering::SeqCst) {
            return Err(ClusterError::NotLeader { leader: None });
        }

        let mut isr_changed = false;
        // Lock ordering: replicas before isr (module invariant).
        let mut replicas = self.replicas.write().await;

        if let Some(progress) = replicas.get_mut(replica_id) {
            progress.last_fetch = Instant::now();
            progress.log_end_offset = fetch_offset;

            let leader_leo = self.log_end_offset.load(Ordering::SeqCst);
            // saturating_sub: the replica may momentarily report at or ahead
            // of the LEO we loaded; clamp lag to zero instead of underflowing.
            progress.lag = leader_leo.saturating_sub(fetch_offset);

            // Check if replica should be in ISR
            let should_be_in_sync = progress.lag <= self.config.replica_lag_max_messages;

            if should_be_in_sync != progress.in_sync {
                progress.in_sync = should_be_in_sync;
                isr_changed = true;

                // Update ISR (acquired while holding `replicas` — allowed order)
                let mut isr = self.isr.write().await;
                if should_be_in_sync {
                    isr.insert(replica_id.clone());
                    info!(
                        partition = %self.partition_id,
                        replica = %replica_id,
                        "Replica joined ISR"
                    );
                } else {
                    isr.remove(replica_id);
                    warn!(
                        partition = %self.partition_id,
                        replica = %replica_id,
                        lag = progress.lag,
                        "Replica removed from ISR due to lag"
                    );
                }
            }
        }

        // Release the write lock before recomputing the HWM, which takes
        // its own read locks.
        drop(replicas);

        // Try to advance HWM
        self.maybe_advance_hwm().await;

        Ok(isr_changed)
    }
267
268    /// Check for lagging replicas and remove from ISR
269    pub async fn check_replica_health(&self) -> Vec<NodeId> {
270        if !self.is_leader.load(Ordering::SeqCst) {
271            return vec![];
272        }
273
274        let now = Instant::now();
275        let mut removed = vec![];
276
277        let mut replicas = self.replicas.write().await;
278        let mut isr = self.isr.write().await;
279
280        for (node_id, progress) in replicas.iter_mut() {
281            if progress.in_sync {
282                let since_fetch = now.duration_since(progress.last_fetch);
283
284                if since_fetch > self.config.replica_lag_max_time {
285                    progress.in_sync = false;
286                    isr.remove(node_id);
287                    removed.push(node_id.clone());
288
289                    warn!(
290                        partition = %self.partition_id,
291                        replica = %node_id,
292                        lag_time = ?since_fetch,
293                        "Replica removed from ISR due to time lag"
294                    );
295                }
296            }
297        }
298
299        removed
300    }
301
302    /// Maybe advance the high watermark
303    ///
304    /// Note: Acquires locks in order: replicas -> isr (following module lock ordering)
305    async fn maybe_advance_hwm(&self) {
306        // Acquire replicas first to follow lock ordering (replicas -> isr)
307        let replicas = self.replicas.read().await;
308        let isr = self.isr.read().await;
309
310        // HWM is the minimum LEO across all ISR members
311        let mut min_leo = self.log_end_offset.load(Ordering::SeqCst);
312
313        for node_id in isr.iter() {
314            if node_id == &self.local_node {
315                continue;
316            }
317            if let Some(progress) = replicas.get(node_id) {
318                min_leo = min_leo.min(progress.log_end_offset);
319            }
320        }
321
322        // Drop locks before potentially acquiring more locks in complete_pending_acks
323        drop(isr);
324        drop(replicas);
325
326        // Use fetch_max to atomically advance HWM without regression.
327        // load+store was racy — two concurrent callers could regress HWM.
328        let prev_hwm = self.high_watermark.fetch_max(min_leo, Ordering::SeqCst);
329        if min_leo > prev_hwm {
330            // Complete any pending acks
331            self.complete_pending_acks(min_leo).await;
332        }
333    }
334
335    /// Wait for replication based on ack mode
336    pub async fn wait_for_acks(&self, offset: u64, acks: Acks) -> Result<()> {
337        match acks {
338            Acks::None => Ok(()),
339            Acks::Leader => {
340                // Just need local write, which is done
341                Ok(())
342            }
343            Acks::All => {
344                // Read ISR size and insert pending ack atomically
345                // under the same lock to prevent TOCTOU race where ISR changes
346                // between reading the count and inserting the ack.
347                let isr = self.isr.read().await;
348                let required = isr.len();
349
350                if required <= 1 {
351                    // Only leader in ISR, done
352                    return Ok(());
353                }
354
355                // Create pending ack while still holding ISR lock
356                let (tx, rx) = oneshot::channel();
357                let mut acked = HashSet::new();
358                acked.insert(self.local_node.clone());
359
360                self.pending_acks.insert(
361                    offset,
362                    PendingAck {
363                        offset,
364                        acked_nodes: acked,
365                        required_acks: required,
366                        completion: tx,
367                        created: Instant::now(),
368                    },
369                );
370
371                // Now safe to drop ISR lock — the pending ack's required count
372                // is consistent with the ISR snapshot taken above.
373                drop(isr);
374
375                // Wait for completion or timeout
376                match tokio::time::timeout(Duration::from_secs(30), rx).await {
377                    Ok(Ok(result)) => result,
378                    Ok(Err(_)) => Err(ClusterError::ChannelClosed),
379                    Err(_) => {
380                        self.pending_acks.remove(&offset);
381                        Err(ClusterError::Timeout)
382                    }
383                }
384            }
385        }
386    }
387
388    /// Complete pending acks up to the given offset.
389    ///
390    /// Collects completeable offsets in a single scan, then batch-removes
391    /// and sends completion signals. This avoids holding DashMap shard locks
392    /// while executing `oneshot::send()`.
393    async fn complete_pending_acks(&self, up_to_offset: u64) {
394        // Phase 1: Collect keys to complete (single scan, shard locks held briefly)
395        let to_complete: Vec<u64> = self
396            .pending_acks
397            .iter()
398            .filter_map(|e| {
399                if *e.key() <= up_to_offset {
400                    Some(*e.key())
401                } else {
402                    None
403                }
404            })
405            .collect();
406
407        // Phase 2: Remove and signal (no shard locks held during send)
408        for offset in to_complete {
409            if let Some((_, pending)) = self.pending_acks.remove(&offset) {
410                let _ = pending.completion.send(Ok(()));
411            }
412        }
413    }
414
    /// Get current high watermark
    ///
    /// The HWM is the minimum log-end offset across the ISR (see
    /// `maybe_advance_hwm`); records below it are fully replicated.
    pub fn high_watermark(&self) -> u64 {
        self.high_watermark.load(Ordering::SeqCst)
    }
419
    /// Get current log end offset
    ///
    /// One past the last locally appended record (`record_appended` stores
    /// `offset + 1`).
    pub fn log_end_offset(&self) -> u64 {
        self.log_end_offset.load(Ordering::SeqCst)
    }
424
    /// Get current leader epoch
    ///
    /// Set by `become_leader` / `become_follower`; used for fencing when
    /// propagating ISR changes.
    pub fn leader_epoch(&self) -> u64 {
        self.leader_epoch.load(Ordering::SeqCst)
    }
429
    /// Get current ISR
    ///
    /// Returns a point-in-time clone; it may be stale by the time the
    /// caller inspects it.
    pub async fn get_isr(&self) -> HashSet<NodeId> {
        self.isr.read().await.clone()
    }
434
435    /// Check if we have enough ISR for writes
436    pub async fn has_min_isr(&self) -> bool {
437        let isr = self.isr.read().await;
438        isr.len() >= self.config.min_isr as usize
439    }
440
441    /// Clean up stale pending acks that have exceeded the timeout
442    ///
443    /// Sends `Err(Timeout)` to waiters before removing stale entries, so callers
444    /// get a clear timeout error instead of a misleading `ChannelClosed`.
445    /// This should be called periodically to prevent memory leaks from
446    /// pending acks that were never completed (e.g., due to network partitions).
447    /// Returns the number of cleaned up entries.
448    pub fn cleanup_stale_pending_acks(&self, timeout: Duration) -> usize {
449        let now = Instant::now();
450        let mut cleaned = 0;
451
452        // Phase 1: Collect stale keys
453        let stale_keys: Vec<u64> = self
454            .pending_acks
455            .iter()
456            .filter_map(|e| {
457                if now.duration_since(e.value().created) >= timeout {
458                    Some(*e.key())
459                } else {
460                    None
461                }
462            })
463            .collect();
464
465        // Phase 2: Remove and signal timeout (no shard locks held during send)
466        for key in stale_keys {
467            if let Some((_, pending)) = self.pending_acks.remove(&key) {
468                let _ = pending.completion.send(Err(ClusterError::Timeout));
469                cleaned += 1;
470            }
471        }
472
473        if cleaned > 0 {
474            debug!(
475                partition = %self.partition_id,
476                cleaned = cleaned,
477                "Cleaned up stale pending acks"
478            );
479        }
480
481        cleaned
482    }
483}
484
/// Manages replication across all partitions
///
/// One instance per node: hands out shared `Arc<PartitionReplication>`
/// handles and drives periodic replica health checks.
pub struct ReplicationManager {
    /// Our node ID
    local_node: NodeId,

    /// Per-partition replication state
    partitions: DashMap<PartitionId, Arc<PartitionReplication>>,

    /// Configuration
    // Cloned into each newly created PartitionReplication.
    config: ReplicationConfig,

    /// Raft node for ISR propagation (optional, set when integrated with coordinator)
    // While None, ISR changes stay local — see `propagate_isr_change`.
    raft_node: Option<Arc<RwLock<crate::raft::RaftNode>>>,
}
499
500impl ReplicationManager {
501    /// Create new replication manager
502    pub fn new(local_node: NodeId, config: ReplicationConfig) -> Self {
503        Self {
504            local_node,
505            partitions: DashMap::new(),
506            config,
507            raft_node: None,
508        }
509    }
510
    /// Set Raft node for ISR propagation
    ///
    /// Until this is called, ISR changes remain local-only (see
    /// `propagate_isr_change`).
    pub fn set_raft_node(&mut self, raft_node: Arc<RwLock<crate::raft::RaftNode>>) {
        self.raft_node = Some(raft_node);
    }
515
516    /// Get or create partition replication state
517    pub fn get_or_create(
518        &self,
519        partition_id: PartitionId,
520        is_leader: bool,
521    ) -> Arc<PartitionReplication> {
522        self.partitions
523            .entry(partition_id.clone())
524            .or_insert_with(|| {
525                Arc::new(PartitionReplication::new(
526                    partition_id,
527                    self.local_node.clone(),
528                    is_leader,
529                    self.config.clone(),
530                ))
531            })
532            .clone()
533    }
534
    /// Get partition replication state
    ///
    /// Returns `None` when the partition is not tracked by this manager.
    pub fn get(&self, partition_id: &PartitionId) -> Option<Arc<PartitionReplication>> {
        self.partitions.get(partition_id).map(|e| e.value().clone())
    }
539
    /// Remove partition replication state
    ///
    /// Returns the removed state (if any); existing `Arc` holders can keep
    /// using it until they drop their handles.
    pub fn remove(&self, partition_id: &PartitionId) -> Option<Arc<PartitionReplication>> {
        self.partitions.remove(partition_id).map(|(_, v)| v)
    }
544
545    /// Get all partitions we're leading
546    pub fn leading_partitions(&self) -> Vec<PartitionId> {
547        self.partitions
548            .iter()
549            .filter(|e| e.value().is_leader.load(Ordering::Relaxed))
550            .map(|e| e.key().clone())
551            .collect()
552    }
553
554    /// Handle replica fetch and propagate ISR changes if needed
555    pub async fn handle_replica_fetch(
556        &self,
557        partition_id: &PartitionId,
558        replica_id: &NodeId,
559        fetch_offset: u64,
560    ) -> Result<()> {
561        let partition = self
562            .get(partition_id)
563            .ok_or_else(|| ClusterError::PartitionNotFound {
564                topic: partition_id.topic.clone(),
565                partition: partition_id.partition,
566            })?;
567
568        let isr_changed = partition
569            .handle_replica_fetch(replica_id, fetch_offset)
570            .await?;
571
572        // Propagate ISR change to cluster via Raft
573        if isr_changed {
574            if let Err(e) = self.propagate_isr_change(partition_id).await {
575                warn!(
576                    partition = %partition_id,
577                    error = %e,
578                    "Failed to propagate ISR change (will retry on next health check)"
579                );
580            }
581        }
582
583        Ok(())
584    }
585
586    /// Run periodic health checks
587    pub async fn run_health_checks(&self) {
588        for entry in self.partitions.iter() {
589            let partition = entry.value();
590            if partition.is_leader.load(Ordering::Relaxed) {
591                let removed = partition.check_replica_health().await;
592                if !removed.is_empty() {
593                    warn!(
594                        partition = %partition.partition_id,
595                        removed = ?removed,
596                        "Removed replicas from ISR - propagating via Raft"
597                    );
598
599                    // Propagate ISR change to cluster via Raft
600                    if let Err(e) = self.propagate_isr_change(&partition.partition_id).await {
601                        error!(
602                            partition = %partition.partition_id,
603                            error = %e,
604                            "Failed to propagate ISR change via Raft"
605                        );
606                    }
607                }
608            }
609        }
610    }
611
612    /// Propagate ISR change to cluster via Raft
613    async fn propagate_isr_change(&self, partition_id: &PartitionId) -> Result<()> {
614        // Get current ISR
615        let partition = match self.get(partition_id) {
616            Some(p) => p,
617            None => return Ok(()), // Partition removed
618        };
619
620        let isr = partition.isr.read().await;
621        let isr_vec: Vec<NodeId> = isr.iter().cloned().collect();
622        drop(isr);
623
624        // Send via Raft if available
625        if let Some(raft_node) = &self.raft_node {
626            let node = raft_node.read().await;
627
628            let cmd = crate::metadata::MetadataCommand::UpdatePartitionIsr {
629                partition: partition_id.clone(),
630                isr: isr_vec.clone(),
631                // attach current leader epoch for fencing
632                leader_epoch: partition.leader_epoch(),
633            };
634
635            match node.propose(cmd).await {
636                Ok(_response) => {
637                    info!(
638                        partition = %partition_id,
639                        isr = ?isr_vec,
640                        "ISR change propagated via Raft"
641                    );
642                    Ok(())
643                }
644                Err(e) => {
645                    error!(
646                        partition = %partition_id,
647                        error = %e,
648                        "Failed to propose ISR change to Raft"
649                    );
650                    Err(e)
651                }
652            }
653        } else {
654            debug!(
655                partition = %partition_id,
656                "No Raft node configured - ISR change local only"
657            );
658            Ok(())
659        }
660    }
661}
662
/// Follower fetcher that pulls data from leader
///
/// Implements ISR (In-Sync Replica) follower logic:
/// 1. Periodically fetches records from partition leader
/// 2. Applies records to local storage via the local `Partition`
/// 3. Reports replica state back to leader
/// 4. Leader tracks follower progress and updates ISR
pub struct FollowerFetcher {
    /// Our node ID (used for replica identification in fetch requests)
    local_node: NodeId,

    /// Partition we're fetching for
    partition_id: PartitionId,

    /// Leader node ID
    leader_id: NodeId,

    /// Current fetch offset
    // Next offset to request from the leader; advanced by `apply_records`
    // only after a successful persist.
    fetch_offset: u64,

    /// High watermark (committed offset)
    // Refreshed from every fetch response and echoed back to the leader in
    // `ReplicaState` reports.
    high_watermark: u64,

    /// Transport for network communication
    transport: Arc<Mutex<crate::Transport>>,

    /// Local partition storage for persisting replicated data.
    /// Without this, followers would track offsets without actually writing
    /// data — causing complete data loss on leader failover.
    local_partition: Arc<rivven_core::Partition>,

    /// Configuration
    config: ReplicationConfig,

    /// Shutdown signal
    shutdown: tokio::sync::broadcast::Receiver<()>,

    /// Last successful fetch timestamp
    // Updated on every successful fetch; not read elsewhere in this file.
    last_fetch: std::time::Instant,

    /// Consecutive report failures (retry with backoff)
    report_failures: u32,
}
706
707impl FollowerFetcher {
708    #[allow(clippy::too_many_arguments)]
709    pub fn new(
710        local_node: NodeId,
711        partition_id: PartitionId,
712        leader_id: NodeId,
713        start_offset: u64,
714        transport: Arc<Mutex<crate::Transport>>,
715        local_partition: Arc<rivven_core::Partition>,
716        config: ReplicationConfig,
717        shutdown: tokio::sync::broadcast::Receiver<()>,
718    ) -> Self {
719        Self {
720            local_node,
721            partition_id,
722            leader_id,
723            fetch_offset: start_offset,
724            high_watermark: 0,
725            transport,
726            local_partition,
727            config,
728            shutdown,
729            last_fetch: std::time::Instant::now(),
730            report_failures: 0,
731        }
732    }
733
    /// Run the fetcher loop
    ///
    /// Consumes `self` and loops until the shutdown channel fires. Each
    /// tick (every `config.fetch_interval`) performs one fetch round-trip
    /// to the leader; fetch errors are logged and the loop keeps going.
    pub async fn run(mut self) -> Result<()> {
        let mut interval = tokio::time::interval(self.config.fetch_interval);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // A single failed fetch is not fatal — the next tick retries.
                    if let Err(e) = self.fetch_from_leader().await {
                        error!(
                            partition = %self.partition_id,
                            error = %e,
                            "Fetch from leader failed"
                        );
                    }
                }
                _ = self.shutdown.recv() => {
                    info!(partition = %self.partition_id, "Follower fetcher shutting down");
                    break;
                }
            }
        }

        Ok(())
    }
758
    /// Fetch records from leader and apply them locally
    ///
    /// One round-trip: send a `Fetch` for `self.fetch_offset`, validate the
    /// response (success header, matching partition), persist any returned
    /// records, adopt the leader's high watermark, then report our state
    /// back so the leader can track ISR membership.
    async fn fetch_from_leader(&mut self) -> Result<()> {
        use crate::protocol::{ClusterRequest, ClusterResponse, RequestHeader};

        // Build fetch request
        let header = RequestHeader::new(
            rand::random(), // correlation_id
            self.local_node.clone(),
        );

        let request = ClusterRequest::Fetch {
            header,
            partition: self.partition_id.clone(),
            offset: self.fetch_offset,
            max_bytes: self.config.fetch_max_bytes,
        };

        // Send to leader
        // (the transport lock is held only for the duration of the send)
        let response = {
            let transport = self.transport.lock().await;
            transport.send(&self.leader_id, request).await?
        };

        // Handle response
        match response {
            ClusterResponse::Fetch {
                header,
                partition,
                high_watermark,
                log_start_offset: _,
                records,
            } => {
                if !header.is_success() {
                    return Err(ClusterError::Network(format!(
                        "Fetch failed: {}",
                        header.error_message.unwrap_or_default()
                    )));
                }

                // Defensive check: a response for a different partition means
                // a routing bug somewhere — refuse to apply it.
                if partition != self.partition_id {
                    return Err(ClusterError::Network(format!(
                        "Partition mismatch: expected {}, got {}",
                        self.partition_id, partition
                    )));
                }

                // Apply fetched records (advances self.fetch_offset on success)
                if !records.is_empty() {
                    self.apply_records(&records).await?;
                    debug!(
                        partition = %self.partition_id,
                        records_bytes = records.len(),
                        new_offset = self.fetch_offset,
                        hwm = high_watermark,
                        "Applied records from leader"
                    );
                }

                // Update high watermark
                self.high_watermark = high_watermark;
                self.last_fetch = std::time::Instant::now();

                // Report our state back to leader
                self.report_replica_state().await?;

                Ok(())
            }
            _ => Err(ClusterError::Network(format!(
                "Unexpected response type: {:?}",
                response
            ))),
        }
    }
832
833    /// Apply fetched records to local storage.
834    ///
835    /// Deserializes each record from the leader's response and persists it
836    /// to the local `Partition` via `append_replicated_batch`, which writes
837    /// to the log segment preserving the leader-assigned offset.
838    ///
839    /// The fetch_offset is advanced only AFTER successful persistence,
840    /// guaranteeing at-least-once delivery to local storage.
841    async fn apply_records(&mut self, records: &[u8]) -> Result<()> {
842        // Properly deserialize the record batch.
843        // Each record is length-prefixed (4-byte BE u32).
844        let mut cursor = 0;
845        let mut messages: Vec<rivven_core::Message> = Vec::new();
846        let mut last_offset: Option<u64> = None;
847
848        while cursor + 4 <= records.len() {
849            let len = u32::from_be_bytes([
850                records[cursor],
851                records[cursor + 1],
852                records[cursor + 2],
853                records[cursor + 3],
854            ]) as usize;
855            cursor += 4;
856
857            if cursor + len > records.len() {
858                tracing::warn!(
859                    "Truncated record at byte {} in apply_records (expected {} bytes, have {})",
860                    cursor,
861                    len,
862                    records.len() - cursor
863                );
864                break;
865            }
866
867            match rivven_core::Message::from_bytes(&records[cursor..cursor + len]) {
868                Ok(msg) => {
869                    last_offset = Some(msg.offset);
870                    messages.push(msg);
871                }
872                Err(e) => {
873                    tracing::warn!("Failed to deserialize record at byte {}: {}", cursor, e);
874                    break;
875                }
876            }
877            cursor += len;
878        }
879
880        // Persist to local storage BEFORE advancing fetch_offset.
881        // This ensures that if we crash after persisting but before reporting,
882        // we'll re-fetch (idempotent) rather than lose data.
883        if !messages.is_empty() {
884            self.local_partition
885                .append_replicated_batch(messages)
886                .await
887                .map_err(|e| {
888                    ClusterError::Internal(format!(
889                        "Failed to persist replicated records to partition {}: {}",
890                        self.partition_id, e
891                    ))
892                })?;
893        }
894
895        // Advance fetch_offset to one past the last successfully persisted record
896        if let Some(offset) = last_offset {
897            self.fetch_offset = offset + 1;
898        }
899
900        Ok(())
901    }
902
903    /// Report replica state to leader (retry with backoff + logging)
904    async fn report_replica_state(&mut self) -> Result<()> {
905        use crate::protocol::{ClusterRequest, RequestHeader};
906
907        let header = RequestHeader::new(rand::random(), self.local_node.clone());
908
909        let request = ClusterRequest::ReplicaState {
910            header,
911            partition: self.partition_id.clone(),
912            log_end_offset: self.fetch_offset,
913            high_watermark: self.high_watermark,
914        };
915
916        let transport = self.transport.lock().await;
917        match transport.send(&self.leader_id, request).await {
918            Ok(_) => {
919                if self.report_failures > 0 {
920                    info!(
921                        partition = %self.partition_id,
922                        previous_failures = self.report_failures,
923                        "Replica state report succeeded after previous failures"
924                    );
925                }
926                self.report_failures = 0;
927                Ok(())
928            }
929            Err(e) => {
930                self.report_failures += 1;
931                if self.report_failures >= 5 {
932                    warn!(
933                        partition = %self.partition_id,
934                        consecutive_failures = self.report_failures,
935                        error = %e,
936                        "Replica state report to leader consistently failing — \
937                         leader may not update ISR for this replica"
938                    );
939                }
940                // Still return Ok — report failure should not stop the fetch loop.
941                // The leader will eventually notice the stale last_fetch and may
942                // shrink the ISR, which is the correct self-healing behavior.
943                Ok(())
944            }
945        }
946    }
947
    /// Get current replication lag: how many message offsets this follower's
    /// fetch position trails the leader's last-known high watermark.
    ///
    /// This counts offsets/messages, not bytes — `fetch_offset` advances by
    /// record offset (see `apply_records`), and `high_watermark` is an offset
    /// too. Saturating subtraction clamps to 0 when `fetch_offset` is at or
    /// past the last high watermark we learned from the leader.
    pub fn lag(&self) -> u64 {
        self.high_watermark.saturating_sub(self.fetch_offset)
    }
952
    /// Get time since last successful fetch.
    ///
    /// Returns the wall-clock time elapsed since `last_fetch` was last reset.
    /// NOTE(review): assumes `last_fetch` is refreshed on each successful
    /// fetch in the fetch loop (not visible here) — confirm at the call site.
    pub fn fetch_age(&self) -> std::time::Duration {
        self.last_fetch.elapsed()
    }
957}
958
#[cfg(test)]
mod tests {
    use super::*;

    /// Becoming leader must flip the leader flag and record the epoch.
    #[tokio::test]
    async fn test_partition_replication_leader() {
        let partition_id = PartitionId::new("test", 0);
        let config = ReplicationConfig::default();
        let repl =
            PartitionReplication::new(partition_id.clone(), "node-1".to_string(), false, config);

        // Promote to leader for epoch 1 with a two-node replica set.
        repl.become_leader(1, vec!["node-1".to_string(), "node-2".to_string()])
            .await;

        assert!(repl.is_leader.load(Ordering::Relaxed));
        assert_eq!(repl.leader_epoch(), 1);
    }

    /// HWM must track min(LEO) across the ISR as the replica catches up.
    #[tokio::test]
    async fn test_hwm_advancement() {
        let config = ReplicationConfig {
            // min_isr=1 so writes succeed with only the leader in ISR: this
            // test verifies HWM advancement after replica catch-up, while
            // min-ISR enforcement is covered by test_min_isr_enforcement.
            min_isr: 1,
            ..ReplicationConfig::default()
        };
        let partition_id = PartitionId::new("test", 0);
        let repl = PartitionReplication::new(
            partition_id.clone(),
            "node-1".to_string(),
            false,
            config.clone(),
        );

        repl.become_leader(1, vec!["node-1".to_string(), "node-2".to_string()])
            .await;

        // Leader appends three records (offsets 0, 1, 2).
        for offset in 0..3 {
            repl.record_appended(offset).await.unwrap();
        }

        // All replicas start in the ISR and node-2 has not fetched yet
        // (LEO = 0), so min(LEO) across the ISR pins the HWM at 0.
        assert_eq!(repl.high_watermark(), 0);

        // node-2 fetches up to offset 2 (LEO = 2).
        repl.handle_replica_fetch(&"node-2".to_string(), 2)
            .await
            .unwrap();

        // Lag of 1 keeps node-2 within replica_lag_max_messages, so it stays
        // in the ISR and the HWM advances to min(3, 2) = 2.
        assert!(repl.get_isr().await.contains("node-2"));
        assert_eq!(repl.high_watermark(), 2);

        // node-2 fully catches up (LEO = 3) => HWM = min(3, 3) = 3.
        repl.handle_replica_fetch(&"node-2".to_string(), 3)
            .await
            .unwrap();

        assert_eq!(repl.high_watermark(), 3);
    }

    /// acks=none must resolve immediately without waiting on any replica.
    #[tokio::test]
    async fn test_acks_none() {
        let repl = PartitionReplication::new(
            PartitionId::new("test", 0),
            "node-1".to_string(),
            true,
            ReplicationConfig::default(),
        );

        assert!(repl.wait_for_acks(100, Acks::None).await.is_ok());
    }

    /// A freshly constructed follower fetcher starts at offset 0 with no lag.
    #[tokio::test]
    async fn test_follower_fetcher_lag_tracking() {
        use crate::Transport;

        let partition_id = PartitionId::new("test", 0);
        let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);

        // Mock transport for the follower side.
        let transport = Transport::new(
            "follower-1".into(),
            "127.0.0.1:9093".parse().unwrap(),
            crate::TransportConfig::default(),
        );

        // Follower-local partition backed by a throwaway temp directory.
        let core_config = rivven_core::Config {
            data_dir: format!("/tmp/rivven-test-follower-{}", uuid::Uuid::new_v4()),
            ..Default::default()
        };
        let local_partition = Arc::new(
            rivven_core::Partition::new(&core_config, "test", 0)
                .await
                .unwrap(),
        );

        let fetcher = FollowerFetcher::new(
            "follower-1".to_string(),
            partition_id,
            "leader-1".to_string(),
            0,
            Arc::new(Mutex::new(transport)),
            local_partition,
            ReplicationConfig::default(),
            shutdown_rx,
        );

        // Initial state: nothing fetched, nothing committed, zero lag.
        assert_eq!(fetcher.fetch_offset, 0);
        assert_eq!(fetcher.high_watermark, 0);
        assert_eq!(fetcher.lag(), 0);

        // Cleanup: drop the shutdown sender and remove the temp data dir.
        drop(shutdown_tx);
        let _ = std::fs::remove_dir_all(&core_config.data_dir);
    }

    /// The manager tracks created partitions and forgets removed ones.
    #[tokio::test]
    async fn test_replication_manager_partition_tracking() {
        let manager =
            ReplicationManager::new("node-1".to_string(), ReplicationConfig::default());

        let leader_partition = PartitionId::new("topic-1", 0);
        let follower_partition = PartitionId::new("topic-1", 1);

        // One partition as leader, one as follower.
        manager.get_or_create(leader_partition.clone(), true);
        manager.get_or_create(follower_partition.clone(), false);

        assert_eq!(manager.leading_partitions().len(), 1);
        assert!(manager.get(&leader_partition).is_some());
        assert!(manager.get(&follower_partition).is_some());

        // Removal must drop both the entry and its leadership tracking.
        manager.remove(&leader_partition);
        assert!(manager.get(&leader_partition).is_none());
        assert_eq!(manager.leading_partitions().len(), 0);
    }
}