use std::collections::HashMap;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::FlushPolicy;
use crate::HardState;
use crate::LogStore;
use crate::MetaStore;
use crate::NetworkError;
use crate::PersistenceConfig;
use crate::RaftLog;
use crate::Result;
use crate::StorageEngine;
use crate::TypeConfig;
use crate::alias::SOF;
use crate::scoped_timer::ScopedTimer;
use async_trait::async_trait;
use crossbeam_skiplist::SkipMap;
use d_engine_proto::common::Entry;
use d_engine_proto::common::LogId;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::debug;
use tracing::error;
use tracing::warn;
/// Capacity of the closed-segment arrays in [`TermSegments`].
const MAX_TERM_SEGMENTS: usize = 1024;
/// Lock-free run-length map from log index to term.
///
/// The currently open run is `(last_term, last_term_start)`. Earlier runs are
/// closed into `seg_starts[i]` / `seg_terms[i]` for `i < seg_count`, in the
/// order in which they were closed. `last_term == 0` means nothing is recorded.
pub(crate) struct TermSegments {
// Term of the most recent (open) run; 0 = table empty.
pub(crate) last_term: AtomicU64,
// First log index of the most recent run.
pub(crate) last_term_start: AtomicU64,
// Number of closed runs seen. `on_append` increments it unconditionally, so it
// may exceed `MAX_TERM_SEGMENTS` (runs past capacity are not stored).
seg_count: AtomicUsize,
// Start index of each closed run.
seg_starts: [AtomicU64; MAX_TERM_SEGMENTS],
// Term of each closed run (parallel to `seg_starts`).
seg_terms: [AtomicU64; MAX_TERM_SEGMENTS],
}
impl TermSegments {
    /// Creates an empty table: no open run (`last_term == 0`) and no closed runs.
    pub(crate) fn new() -> Self {
        Self {
            last_term: AtomicU64::new(0),
            last_term_start: AtomicU64::new(0),
            seg_count: AtomicUsize::new(0),
            seg_starts: std::array::from_fn(|_| AtomicU64::new(0)),
            seg_terms: std::array::from_fn(|_| AtomicU64::new(0)),
        }
    }

    /// Returns the term recorded for `index`.
    ///
    /// Returns `None` when:
    /// - the table is empty;
    /// - no closed run starts at or before `index`;
    /// - the closed-run array has overflowed.
    ///
    /// In every `None` case the caller must fall back to another source, such
    /// as the entry map in `entry_term`.
    pub(crate) fn get(
        &self,
        index: u64,
    ) -> Option<u64> {
        let last_start = self.last_term_start.load(Ordering::Acquire);
        let last_term = self.last_term.load(Ordering::Acquire);
        if last_term == 0 {
            return None;
        }
        if index >= last_start {
            return Some(last_term);
        }
        let count = self.seg_count.load(Ordering::Acquire);
        // `on_append` keeps counting past capacity, so `count` can exceed the
        // array length. Indexing with it would panic, and the runs that did not
        // fit are missing, so older lookups could return a wrong term. Decline
        // and let the caller consult the authoritative entry map instead.
        if count > MAX_TERM_SEGMENTS {
            return None;
        }
        self.seg_starts[..count]
            .iter()
            .zip(&self.seg_terms[..count])
            .rev()
            .find_map(|(start, term)| {
                (start.load(Ordering::Acquire) <= index).then(|| term.load(Ordering::Acquire))
            })
    }

    /// Records the terms of newly appended `entries`.
    ///
    /// Assumes entries arrive in ascending index order (TODO confirm for all
    /// callers). An entry with the open run's term can only lower
    /// `last_term_start`. An entry with a new term closes the open run into the
    /// next slot and opens a new run at that entry. Slots beyond
    /// `MAX_TERM_SEGMENTS` are dropped but still counted.
    pub(crate) fn on_append(
        &self,
        entries: &[Entry],
    ) {
        for entry in entries {
            let lt = self.last_term.load(Ordering::Acquire);
            if entry.term == lt {
                let ls = self.last_term_start.load(Ordering::Acquire);
                if entry.index < ls {
                    self.last_term_start.store(entry.index, Ordering::Release);
                }
                continue;
            }
            if lt == 0 {
                self.last_term_start.store(entry.index, Ordering::Release);
                self.last_term.store(entry.term, Ordering::Release);
                continue;
            }
            let ls = self.last_term_start.load(Ordering::Acquire);
            let i = self.seg_count.fetch_add(1, Ordering::AcqRel);
            if i < MAX_TERM_SEGMENTS {
                self.seg_starts[i].store(ls, Ordering::Release);
                self.seg_terms[i].store(lt, Ordering::Release);
            }
            self.last_term_start.store(entry.index, Ordering::Release);
            self.last_term.store(entry.term, Ordering::Release);
        }
    }

    /// Forgets every recorded run, returning the table to its empty state.
    pub(crate) fn clear(&self) {
        self.seg_count.store(0, Ordering::Release);
        self.last_term.store(0, Ordering::Release);
        self.last_term_start.store(0, Ordering::Release);
    }
}
/// Commands sent from `BufferedRaftLog` to its IO thread (`batch_processor`).
#[derive(Debug)]
pub enum IOTask {
/// Truncate the store from `truncate_from` and write `new_entries`; the
/// store's result is sent back on `done`.
ReplaceRange {
truncate_from: u64,
new_entries: Vec<Entry>,
done: oneshot::Sender<Result<()>>,
},
/// Purge the store up to `cutoff`. `done` carries no result, so a store
/// failure cannot reach the requester through this channel.
Purge {
cutoff: LogId,
done: oneshot::Sender<()>,
},
/// Reset the store; the store's result is sent back on `done`.
Reset { done: oneshot::Sender<Result<()>> },
/// Persist buffered entries above `durable_index`, fsync, then reply.
Flush(oneshot::Sender<Result<()>>),
/// Persist/fsync outstanding work and stop the IO loop.
Shutdown,
}
/// Raft log that serves reads and appends from an in-memory `SkipMap`. A
/// dedicated IO thread (see `start` / `batch_processor`) persists entries to
/// `log_store` in the background.
pub struct BufferedRaftLog<T>
where
T: TypeConfig,
{
#[allow(dead_code)]
node_id: u32,
// Persistent log backend; only the IO thread writes entries to it.
pub(crate) log_store: Arc<<SOF<T> as StorageEngine>::LogStore>,
// Hard-state backend.
pub(crate) meta_store: Arc<<SOF<T> as StorageEngine>::MetaStore>,
// Period of the IO thread's safety-net flush timer.
idle_flush_interval_ms: u64,
// In-memory log keyed by entry index.
pub(crate) entries: SkipMap<u64, Entry>,
// Highest index known to be durable; only raised via `fetch_max` by the IO
// path, reset to 0 by `reset_internal`.
pub(crate) durable_index: AtomicU64,
// Next index handed out by the `pre_allocate_*` methods.
pub(crate) next_id: AtomicU64,
// Smallest / largest buffered index (0 when the buffer is empty).
min_index: AtomicU64, max_index: AtomicU64,
// Last compacted position, set by `purge_logs_up_to`.
last_purged_index: AtomicU64,
last_purged_term: AtomicU64,
// term -> smallest / largest buffered index carrying that term.
term_first_index: SkipMap<u64, AtomicU64>,
term_last_index: SkipMap<u64, AtomicU64>,
// Fast index -> term lookup used by `entry_term`.
term_segments: TermSegments,
// Wakes the IO thread after an append.
pub(crate) write_notify: Arc<Notify>,
// Command queue into the IO thread.
pub(crate) command_sender: mpsc::UnboundedSender<IOTask>,
// Receives `RoleEvent::LogFlushed` whenever `durable_index` advances.
log_flush_tx: Option<mpsc::UnboundedSender<crate::RoleEvent>>,
// Join handle of the IO thread; taken and joined by `close`.
io_thread_handle: std::sync::Mutex<Option<std::thread::JoinHandle<()>>>,
}
#[async_trait]
impl<T> RaftLog for BufferedRaftLog<T>
where
T: TypeConfig,
{
/// Returns a clone of the buffered entry at `index`, if present.
fn entry(
    &self,
    index: u64,
) -> Result<Option<Entry>> {
    let cached = self.entries.get(&index);
    Ok(cached.map(|slot| slot.value().clone()))
}
/// Smallest buffered index (0 when the buffer is empty).
fn first_entry_id(&self) -> u64 {
    self.min_index.load(Ordering::Acquire)
}
/// Largest buffered index (0 when the buffer is empty).
fn last_entry_id(&self) -> u64 {
    self.max_index.load(Ordering::Acquire)
}
/// Highest index known to have been made durable.
fn durable_index(&self) -> u64 {
    self.durable_index.load(Ordering::Acquire)
}
/// The entry at `last_entry_id()`, or `None` when the buffer is empty.
fn last_entry(&self) -> Option<Entry> {
    match self.last_entry_id() {
        0 => None,
        last_index => self.entry(last_index).ok().flatten(),
    }
}
/// `(term, index)` of the last buffered entry. When the buffer is empty, this
/// falls back to the purge boundary, or `None` if nothing was ever purged.
fn last_log_id(&self) -> Option<LogId> {
    match self.last_entry_id() {
        0 => {
            let purged_index = self.last_purged_index.load(Ordering::Acquire);
            (purged_index > 0).then(|| LogId {
                index: purged_index,
                term: self.last_purged_term.load(Ordering::Acquire),
            })
        }
        last_index => self
            .entry(last_index)
            .ok()
            .flatten()
            .map(|last| LogId {
                term: last.term,
                index: last.index,
            }),
    }
}
/// True when no entries are buffered in memory.
fn is_empty(&self) -> bool {
    self.entries.is_empty()
}
/// Term of the entry at `entry_id`.
///
/// For indexes outside the buffered `[min, max]` window, only the purge
/// boundary itself is answerable. Inside the window the term segment table is
/// consulted first, then the entry map.
fn entry_term(
    &self,
    entry_id: u64,
) -> Option<u64> {
    let max = self.max_index.load(Ordering::Acquire);
    let min = self.min_index.load(Ordering::Acquire);
    let buffered = max != 0 && (min..=max).contains(&entry_id);
    if !buffered {
        let purged_index = self.last_purged_index.load(Ordering::Acquire);
        let at_boundary = purged_index > 0 && entry_id == purged_index;
        return at_boundary.then(|| self.last_purged_term.load(Ordering::Acquire));
    }
    self.term_segments
        .get(entry_id)
        .or_else(|| self.entries.get(&entry_id).map(|slot| slot.value().term))
}
/// Smallest buffered index carrying `term`, if any.
fn first_index_for_term(
    &self,
    term: u64,
) -> Option<u64> {
    let slot = self.term_first_index.get(&term)?;
    Some(slot.value().load(Ordering::Acquire))
}
/// Largest buffered index carrying `term`, if any.
fn last_index_for_term(
    &self,
    term: u64,
) -> Option<u64> {
    let slot = self.term_last_index.get(&term)?;
    Some(slot.value().load(Ordering::Acquire))
}
/// Reserves and returns a single fresh log index.
fn pre_allocate_raft_logs_next_index(&self) -> u64 {
    self.next_id.fetch_add(1, Ordering::SeqCst)
}
/// Reserves `count` consecutive log indexes and returns them as an inclusive
/// range.
///
/// `count == 0` yields the sentinel `u64::MAX..=u64::MAX`. Note that this is
/// a one-element range, not an empty one; it is kept for compatibility with
/// existing callers.
///
/// # Panics
/// Panics with "ID overflow" if the reservation would exceed `u64::MAX`.
fn pre_allocate_id_range(
    &self,
    count: u64,
) -> RangeInclusive<u64> {
    if count == 0 {
        return u64::MAX..=u64::MAX;
    }
    // Validate and reserve in one atomic step. A separate load followed by
    // `fetch_add` would check a value another caller may already have advanced.
    let start = self
        .next_id
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |cur| cur.checked_add(count))
        .unwrap_or_else(|_| panic!("ID overflow"));
    start..=(start + count - 1)
}
/// Returns clones of all buffered entries whose index lies in `range`, in
/// ascending index order. Indexes that are not buffered are skipped.
fn get_entries_range(
    &self,
    range: RangeInclusive<u64>,
) -> Result<Vec<Entry>> {
    // The capacity is only a hint. Bound it by what is actually buffered, so
    // open-ended requests (e.g. `n..=u64::MAX`) neither overflow `+ 1` nor try
    // to allocate a huge vector.
    let span = range
        .end()
        .saturating_sub(*range.start())
        .saturating_add(1);
    let capacity = usize::try_from(span)
        .unwrap_or(usize::MAX)
        .min(self.entries.len());
    let mut result = Vec::with_capacity(capacity);
    result.extend(self.entries.range(range).map(|e| e.value().clone()));
    Ok(result)
}
/// Buffers `entries` in memory and wakes the IO thread to persist them.
/// An empty batch is a no-op.
async fn append_entries(
    &self,
    entries: Vec<Entry>,
) -> Result<()> {
    let _timer = ScopedTimer::new("append_entries");
    if !entries.is_empty() {
        self.insert_to_memory(&entries);
        self.write_notify.notify_one();
    }
    Ok(())
}
/// Alias for `append_entries`.
async fn insert_batch(
    &self,
    logs: Vec<Entry>,
) -> Result<()> {
    self.append_entries(logs).await
}
/// Follower-side AppendEntries handling.
///
/// If the log does not contain `(prev_log_index, prev_log_term)`, nothing is
/// changed and the current `last_log_id()` is returned so the leader can back
/// off. Otherwise:
/// - entries already present with the same term are skipped;
/// - on the first conflicting entry, the log is truncated from there and the
///   rest is written via `IOTask::ReplaceRange`;
/// - entries beyond the current end are appended.
///
/// On success, the last entry of `new_entries` is returned (`None` for an
/// empty batch).
///
/// Existing entries are only removed on a real term conflict; matching
/// entries are never truncated. A stale or reordered RPC (including one with
/// `prev = (0, 0)`) therefore cannot discard entries that agree with the
/// leader.
async fn filter_out_conflicts_and_append(
    &self,
    prev_log_index: u64,
    prev_log_term: u64,
    new_entries: Vec<Entry>,
) -> Result<Option<LogId>> {
    let _timer = ScopedTimer::new("filter_out_conflicts_and_append");

    // Consistency check. Index 0 is the virtual slot before the first entry,
    // so `(0, 0)` always matches and `(0, t != 0)` never does.
    let prev_matches = if prev_log_index == 0 {
        prev_log_term == 0
    } else {
        self.entry_term(prev_log_index) == Some(prev_log_term)
    };
    if !prev_matches {
        return Ok(self.last_log_id());
    }

    // Every accepting path reports the leader's last entry.
    let accepted = new_entries.last().map(|e| LogId {
        term: e.term,
        index: e.index,
    });

    // Entries at or below the purge boundary are already compacted; they can
    // neither conflict nor be re-inserted below `last_purged_index`.
    let purged = self.last_purged_index.load(Ordering::Acquire);
    let live = &new_entries[new_entries.partition_point(|e| e.index <= purged)..];

    let last_current_index = self.last_entry_id();
    let (overlap, tail) = live.split_at(live.partition_point(|e| e.index <= last_current_index));

    // Fast path: the whole overlap lies in the current (open) term run and has
    // that term, so it matches what is stored and can be skipped.
    let overlap_safe = match overlap.first() {
        None => true,
        Some(first) => {
            let ft = self.term_segments.last_term.load(Ordering::Acquire);
            let fs = self.term_segments.last_term_start.load(Ordering::Acquire);
            first.index >= fs
                && first.term == ft
                && overlap.last().is_none_or(|last| last.term == ft)
        }
    };
    if overlap_safe {
        if !tail.is_empty() {
            self.append_entries(tail.to_vec()).await?;
        }
        return Ok(accepted);
    }

    // Slow path: find the first entry that is new or disagrees on term.
    let Some(pos) = live.iter().position(|e| {
        e.index > last_current_index || self.entry_term(e.index) != Some(e.term)
    }) else {
        return Ok(accepted);
    };
    let tail = &live[pos..];
    let diverge_index = tail[0].index;
    if diverge_index > last_current_index {
        self.append_entries(tail.to_vec()).await?;
        return Ok(accepted);
    }

    // Real conflict. Indexes from `diverge_index` on are about to hold new,
    // not-yet-fsynced contents, so they must not remain covered by
    // `durable_index`.
    self.durable_index
        .fetch_min(diverge_index.saturating_sub(1), Ordering::AcqRel);
    self.remove_range(diverge_index..=u64::MAX);
    self.insert_to_memory(tail);
    let (done_tx, done_rx) = oneshot::channel();
    self.command_sender
        .send(IOTask::ReplaceRange {
            truncate_from: diverge_index,
            new_entries: tail.to_vec(),
            done: done_tx,
        })
        .map_err(|e| {
            NetworkError::SingalSendFailed(format!("Failed to send ReplaceRange: {e:?}"))
        })?;
    done_rx
        .await
        .map_err(|_| NetworkError::SingalSendFailed("ReplaceRange done channel closed".into()))??;
    Ok(accepted)
}
/// Returns the index replicated on a majority (peers plus this node's last
/// index), provided it is not below `commit_index` and its entry belongs to
/// `current_term`.
fn calculate_majority_matched_index(
    &self,
    current_term: u64,
    commit_index: u64,
    mut peer_matched_ids: Vec<u64>,
) -> Option<u64> {
    let _timer = ScopedTimer::new("calculate_majority_matched_index");
    peer_matched_ids.push(self.last_entry_id());
    // Descending order: the median slot is the highest index a majority holds.
    peer_matched_ids.sort_unstable_by_key(|&id| std::cmp::Reverse(id));
    let median_slot = peer_matched_ids.len() / 2;
    let majority_index = peer_matched_ids[median_slot];
    debug!(
        "Majority calculation: peers={:?}, majority_index={}",
        peer_matched_ids, majority_index,
    );
    if majority_index < commit_index {
        return None;
    }
    self.entry(majority_index)
        .ok()
        .flatten()
        .filter(|candidate| candidate.term == current_term)
        .map(|_| majority_index)
}
/// Drops every entry up to and including `cutoff_index.index` from memory and
/// records the purge boundary. It then asks the IO thread to purge the store
/// and waits for completion.
///
/// Purged indexes are treated as durable: `durable_index` is raised to the
/// cutoff if it was lower.
async fn purge_logs_up_to(
    &self,
    cutoff_index: LogId,
) -> Result<()> {
    let _timer = ScopedTimer::new("purge_logs_up_to");
    debug!(?cutoff_index, "purge_logs_up_to");
    // `remove_range` also refreshes `min_index` / `max_index`.
    self.remove_range(0..=cutoff_index.index);
    // `fetch_max` stays monotonic even if the IO thread advances
    // `durable_index` concurrently; a load + store could overwrite a newer value.
    self.durable_index.fetch_max(cutoff_index.index, Ordering::AcqRel);
    self.last_purged_term.store(cutoff_index.term, Ordering::Release);
    self.last_purged_index.store(cutoff_index.index, Ordering::Release);
    let (done_tx, done_rx) = oneshot::channel();
    self.command_sender
        .send(IOTask::Purge {
            cutoff: cutoff_index,
            done: done_tx,
        })
        .map_err(|e| NetworkError::SingalSendFailed(format!("Failed to send Purge: {e:?}")))?;
    done_rx
        .await
        .map_err(|_| NetworkError::SingalSendFailed("Purge channel closed".into()))?;
    Ok(())
}
/// Blocks until every buffered entry up to `max_index` is durable. Returns
/// immediately when the buffer is empty or already fully durable.
async fn flush(&self) -> Result<()> {
    let max_index = self.max_index.load(Ordering::Acquire);
    let nothing_pending =
        max_index == 0 || self.durable_index.load(Ordering::Acquire) >= max_index;
    if nothing_pending {
        return Ok(());
    }
    let (reply_tx, reply_rx) = oneshot::channel();
    self.command_sender
        .send(IOTask::Flush(reply_tx))
        .map_err(|e| NetworkError::SingalSendFailed(format!("flush send failed: {e:?}")))?;
    match reply_rx.await {
        Ok(io_result) => io_result,
        Err(_) => Err(NetworkError::SingalSendFailed("flush channel closed".into()).into()),
    }
}
/// Wipes the whole log (memory and store); see `reset_internal`.
async fn reset(&self) -> Result<()> {
    let _timer = ScopedTimer::new("buffered_raft_log::reset");
    self.reset_internal().await
}
/// Delegates to the meta store.
fn load_hard_state(&self) -> Result<Option<HardState>> {
    let store = &self.meta_store;
    store.load_hard_state()
}
/// Delegates to the meta store.
fn save_hard_state(
    &self,
    hard_state: &HardState,
) -> Result<()> {
    let store = &self.meta_store;
    store.save_hard_state(hard_state)
}
/// Asks the IO thread to shut down and joins it without blocking the async
/// runtime. Safe to call more than once: later calls find no handle.
async fn close(&self) {
    let _ = self.command_sender.send(IOTask::Shutdown);
    // Scope the guard so the std mutex is released before any `.await`.
    let maybe_handle = {
        let mut slot = self.io_thread_handle.lock().unwrap();
        slot.take()
    };
    let Some(handle) = maybe_handle else {
        return;
    };
    let _ = tokio::task::spawn_blocking(move || {
        let _ = handle.join();
    })
    .await;
}
}
impl<T> BufferedRaftLog<T>
where
T: TypeConfig,
{
/// Builds the log and rebuilds its in-memory state from `storage`.
///
/// Returns the log together with the receiving end of its IO command channel,
/// which must be passed to `start`.
pub fn new(
    node_id: u32,
    persistence_config: PersistenceConfig,
    storage: Arc<SOF<T>>,
) -> (Self, mpsc::UnboundedReceiver<IOTask>) {
    let log_store = storage.log_store();
    let meta_store = storage.meta_store();
    let disk_len = log_store.last_index();
    let FlushPolicy::Batch {
        idle_flush_interval_ms,
    } = persistence_config.flush_policy;
    debug!(
        "Creating BufferedRaftLog with node_id: {}, strategy: {:?}, idle_flush_interval_ms: {:?}, disk_len: {:?}",
        node_id, persistence_config.strategy, idle_flush_interval_ms, disk_len
    );

    let (command_sender, command_receiver) = mpsc::unbounded_channel();
    let entries = SkipMap::new();
    let term_first_index = SkipMap::new();
    let term_last_index = SkipMap::new();
    let term_segments = TermSegments::new();

    // Replay indexes 1..=disk_len from the store into the buffer and term maps.
    let loaded_count = if disk_len == 0 {
        0
    } else {
        match log_store.get_entries(1..=disk_len) {
            Ok(on_disk) if on_disk.is_empty() => {
                warn!("Disk reported length {} but loaded 0 entries", disk_len);
                0
            }
            Ok(on_disk) => {
                debug!("Successfully loaded {} entries from disk", on_disk.len());
                for entry in &on_disk {
                    entries.insert(entry.index, entry.clone());
                    term_first_index
                        .get_or_insert(entry.term, AtomicU64::new(u64::MAX))
                        .value()
                        .fetch_min(entry.index, Ordering::AcqRel);
                    term_last_index
                        .get_or_insert(entry.term, AtomicU64::new(0))
                        .value()
                        .fetch_max(entry.index, Ordering::AcqRel);
                }
                term_segments.on_append(&on_disk);
                on_disk.len()
            }
            Err(e) => {
                error!("Failed to load entries from storage: {:?}", e);
                0
            }
        }
    };

    let min_index = entries.front().map(|e| *e.key()).unwrap_or(0);
    let max_index = entries.back().map(|e| *e.key()).unwrap_or(0);
    if disk_len > 0 && loaded_count == 0 {
        warn!(
            "Inconsistent state: disk_len={} but loaded_count=0",
            disk_len
        );
    }

    let (last_purged_index_val, last_purged_term_val) = match log_store.load_purge_boundary() {
        Ok(boundary) => boundary.map_or((0, 0), |lid| (lid.index, lid.term)),
        Err(e) => {
            warn!("Failed to load purge boundary: {:?}, defaulting to 0", e);
            (0, 0)
        }
    };

    let log = Self {
        node_id,
        log_store,
        meta_store,
        idle_flush_interval_ms,
        entries,
        durable_index: AtomicU64::new(disk_len),
        next_id: AtomicU64::new(disk_len + 1),
        min_index: AtomicU64::new(min_index),
        max_index: AtomicU64::new(max_index),
        last_purged_index: AtomicU64::new(last_purged_index_val),
        last_purged_term: AtomicU64::new(last_purged_term_val),
        term_first_index,
        term_last_index,
        term_segments,
        write_notify: Arc::new(Notify::new()),
        command_sender,
        log_flush_tx: None,
        io_thread_handle: std::sync::Mutex::new(None),
    };
    (log, command_receiver)
}
/// Installs the optional flush-event sender and moves the log into an `Arc`.
/// It then spawns the `raft-io-{node_id}` thread, which runs `batch_processor`
/// on its own current-thread tokio runtime.
///
/// # Panics
/// Panics if the thread or its runtime cannot be created.
pub fn start(
    mut self,
    receiver: mpsc::UnboundedReceiver<IOTask>,
    log_flush_tx: Option<mpsc::UnboundedSender<crate::RoleEvent>>,
) -> Arc<Self> {
    self.log_flush_tx = log_flush_tx;
    let shared = Arc::new(self);
    let processor_ref = Arc::downgrade(&shared);
    let interval_ms = shared.idle_flush_interval_ms;
    let thread_name = format!("raft-io-{}", shared.node_id);
    let io_handle = std::thread::Builder::new()
        .name(thread_name)
        .spawn(move || {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .expect("failed to build raft-io runtime");
            runtime.block_on(Self::batch_processor(processor_ref, receiver, interval_ms));
        })
        .expect("failed to spawn raft-io thread");
    {
        let mut slot = shared.io_thread_handle.lock().unwrap();
        *slot = Some(io_handle);
    }
    shared
}
/// Main loop of the IO thread, which is the single writer to `log_store`.
///
/// Event sources:
/// - `write_notify`: persist entries above `durable_index` and drain queued
///   commands (coalescing `Flush` requests). Then persist any late arrivals
///   for those flushes, fsync, and reply.
/// - the command channel: `Shutdown`, `Flush`, and the commands handled by
///   `handle_non_write_cmd`.
/// - a safety-net timer every `idle_flush_interval_ms` (minimum 1 ms) that
///   persists and fsyncs anything still pending.
///
/// The loop ends on `Shutdown`, on a closed channel, or when
/// `handle_non_write_cmd` reports a fatal error.
///
/// NOTE(review): the `Weak` is upgraded once and the `Arc` is held for the
/// whole loop, so the log cannot be dropped while the loop runs.
async fn batch_processor(
    this: std::sync::Weak<Self>,
    mut receiver: mpsc::UnboundedReceiver<IOTask>,
    idle_flush_interval_ms: u64,
) {
    let Some(this) = this.upgrade() else { return };
    // `tokio::time::interval_at` panics on a zero period, so clamp to 1 ms.
    let period = Duration::from_millis(idle_flush_interval_ms.max(1));
    let mut safety_timer = tokio::time::interval_at(tokio::time::Instant::now() + period, period);
    safety_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    // Highest index written to the store but not yet fsynced (0 = none).
    let mut pending_max: u64 = 0;
    loop {
        tokio::select! {
            _ = this.write_notify.notified() => {
                // Any failed write in this round must be reported to coalesced
                // Flush callers instead of being masked by a successful fsync.
                let mut persist_failed = false;
                let end = this.max_index.load(Ordering::Acquire);
                let start = this.durable_index.load(Ordering::Acquire) + 1;
                if start <= end {
                    match this.get_entries_range(start..=end) {
                        Ok(entries) if !entries.is_empty() => {
                            if let Err(e) = this.log_store.persist_entries(entries).await {
                                error!("write_notify persist_entries failed: {e:?}");
                                persist_failed = true;
                            } else {
                                pending_max = pending_max.max(end);
                            }
                        }
                        Ok(_) => {}
                        Err(e) => error!("write_notify get_entries_range failed: {e:?}"),
                    }
                }
                let mut seen_shutdown = false;
                let mut fatal_exit = false;
                let mut flush_replies: Option<Vec<oneshot::Sender<Result<()>>>> = None;
                while let Ok(cmd) = receiver.try_recv() {
                    match cmd {
                        IOTask::Shutdown => {
                            seen_shutdown = true;
                            break;
                        }
                        IOTask::Flush(reply) => {
                            flush_replies.get_or_insert_with(Vec::new).push(reply);
                        }
                        cmd => {
                            if Self::handle_non_write_cmd(cmd, &this, &mut pending_max).await {
                                fatal_exit = true;
                                break;
                            }
                        }
                    }
                }
                if fatal_exit {
                    break;
                }
                if flush_replies.is_some() {
                    let current_max = this.max_index.load(Ordering::Acquire);
                    // Resume after the highest index already handed to the store
                    // (pending fsync or durable). `pending_max` alone is 0 after
                    // every successful fsync, which would rewrite the log from 1.
                    let written = pending_max.max(this.durable_index.load(Ordering::Acquire));
                    if current_max > written {
                        match this.get_entries_range(written + 1..=current_max) {
                            Ok(entries) if !entries.is_empty() => {
                                if let Err(e) = this.log_store.persist_entries(entries).await {
                                    error!("write_notify flush catch-up persist failed: {e:?}");
                                    persist_failed = true;
                                } else {
                                    pending_max = current_max;
                                }
                            }
                            Ok(_) => {}
                            Err(e) => error!("write_notify flush catch-up persist failed: {e:?}"),
                        }
                    }
                }
                let fsync_ok = if pending_max > 0 {
                    match this.advance_durable_after_write(pending_max).await {
                        Ok(()) => {
                            pending_max = 0;
                            true
                        }
                        Err(e) => {
                            error!("write_notify fsync failed: {e:?}");
                            false
                        }
                    }
                } else {
                    true
                };
                if let Some(replies) = flush_replies {
                    for reply in replies {
                        let outcome: Result<()> = if persist_failed {
                            Err(NetworkError::SingalSendFailed("persist failed".into()).into())
                        } else if fsync_ok {
                            Ok(())
                        } else {
                            Err(NetworkError::SingalSendFailed("fsync failed".into()).into())
                        };
                        let _ = reply.send(outcome);
                    }
                }
                if seen_shutdown {
                    let _ = this.log_store.flush();
                    let _ = this.meta_store.flush();
                    break;
                }
            }
            cmd = receiver.recv() => {
                let Some(cmd) = cmd else { break };
                match cmd {
                    IOTask::Shutdown => {
                        let end = this.max_index.load(Ordering::Acquire);
                        let start = this.durable_index.load(Ordering::Acquire) + 1;
                        if start <= end
                            && let Ok(entries) = this.get_entries_range(start..=end)
                            && !entries.is_empty()
                        {
                            if let Err(e) = this.log_store.persist_entries(entries).await {
                                error!("Shutdown persist_entries failed: {e:?}");
                            } else {
                                pending_max = pending_max.max(end);
                            }
                        }
                        if pending_max > 0 {
                            let _ = this.advance_durable_after_write(pending_max).await;
                        }
                        let _ = this.log_store.flush();
                        let _ = this.meta_store.flush();
                        break;
                    }
                    IOTask::Flush(reply) => {
                        let end = this.max_index.load(Ordering::Acquire);
                        let start = this.durable_index.load(Ordering::Acquire) + 1;
                        if start <= end
                            && let Ok(entries) = this.get_entries_range(start..=end)
                            && !entries.is_empty()
                        {
                            if let Err(e) = this.log_store.persist_entries(entries).await {
                                error!("Flush persist_entries failed: {e:?}");
                                let _ = reply.send(Err(e));
                                continue;
                            } else {
                                pending_max = pending_max.max(end);
                            }
                        }
                        let mut seen_shutdown = false;
                        let mut fatal_exit = false;
                        let mut extra_replies: Vec<oneshot::Sender<Result<()>>> = Vec::new();
                        while let Ok(cmd) = receiver.try_recv() {
                            match cmd {
                                IOTask::Shutdown => {
                                    seen_shutdown = true;
                                    break;
                                }
                                IOTask::Flush(extra) => {
                                    extra_replies.push(extra);
                                }
                                cmd => {
                                    if Self::handle_non_write_cmd(cmd, &this, &mut pending_max).await {
                                        fatal_exit = true;
                                        break;
                                    }
                                }
                            }
                        }
                        if fatal_exit {
                            break;
                        }
                        let fsync_result = if pending_max > 0 {
                            match this.advance_durable_after_write(pending_max).await {
                                Ok(()) => {
                                    pending_max = 0;
                                    Ok(())
                                }
                                Err(e) => {
                                    error!("Flush fsync failed: {e:?}");
                                    Err(e)
                                }
                            }
                        } else {
                            Ok(())
                        };
                        let send_result = |r: oneshot::Sender<Result<()>>| {
                            let _ = match &fsync_result {
                                Ok(()) => r.send(Ok(())),
                                Err(e) => r.send(Err(NetworkError::SingalSendFailed(
                                    format!("{e:?}"),
                                )
                                .into())),
                            };
                        };
                        send_result(reply);
                        for extra in extra_replies {
                            send_result(extra);
                        }
                        if seen_shutdown {
                            let _ = this.log_store.flush();
                            let _ = this.meta_store.flush();
                            break;
                        }
                    }
                    cmd => {
                        if Self::handle_non_write_cmd(cmd, &this, &mut pending_max).await {
                            break;
                        }
                    }
                }
            }
            _ = safety_timer.tick() => {
                let end = this.max_index.load(Ordering::Acquire);
                let start = this.durable_index.load(Ordering::Acquire) + 1;
                if start <= end
                    && let Ok(entries) = this.get_entries_range(start..=end)
                    && !entries.is_empty()
                {
                    if let Err(e) = this.log_store.persist_entries(entries).await {
                        error!("safety-net persist_entries failed: {e:?}");
                    } else {
                        pending_max = pending_max.max(end);
                    }
                }
                if pending_max > 0 {
                    if let Err(e) = this.advance_durable_after_write(pending_max).await {
                        error!("safety-net timer fsync failed: {e:?}");
                    } else {
                        pending_max = 0;
                    }
                }
            }
        }
    }
}
async fn handle_non_write_cmd(
cmd: IOTask,
this: &Arc<Self>,
pending_max: &mut u64,
) -> bool {
match cmd {
IOTask::Flush(_) => {
unreachable!(
"Flush must be intercepted in the drain loop before handle_non_write_cmd"
)
}
IOTask::Shutdown => {
unreachable!("Shutdown is always filtered out before reaching handle_non_write_cmd")
}
IOTask::ReplaceRange {
truncate_from,
new_entries,
done,
} => {
let max_idx = new_entries.last().map(|e| e.index).unwrap_or(0);
let result = this.log_store.replace_range(truncate_from, new_entries).await;
if let Err(ref e) = result {
error!("IOTask::ReplaceRange failed (fatal): {e:?}");
let _ = done.send(result);
return true; }
if max_idx > 0 {
*pending_max = (*pending_max).max(max_idx);
}
let _ = done.send(result);
false
}
IOTask::Purge { cutoff, done } => {
let _ = this.log_store.purge(cutoff).await;
let _ = done.send(());
false
}
IOTask::Reset { done } => {
let result = this.log_store.reset().await;
*pending_max = 0; let _ = done.send(result);
false
}
}
}
/// Makes writes up to `max_index` durable, then publishes the new
/// `durable_index`. An explicit store flush is issued only when the store does
/// not report its writes as already durable.
async fn advance_durable_after_write(
    &self,
    max_index: u64,
) -> Result<()> {
    let writes_already_durable = self.log_store.is_write_durable();
    if !writes_already_durable {
        self.log_store.flush()?;
    }
    self.advance_durable_and_notify(max_index);
    Ok(())
}
async fn reset_internal(&self) -> Result<()> {
self.entries.clear();
self.durable_index.store(0, Ordering::Release);
self.next_id.store(1, Ordering::Release);
self.min_index.store(0, Ordering::Release);
self.max_index.store(0, Ordering::Release);
self.term_first_index.clear();
self.term_last_index.clear();
self.term_segments.clear();
let (done_tx, done_rx) = oneshot::channel();
self.command_sender
.send(IOTask::Reset { done: done_tx })
.map_err(|e| crate::Error::Fatal(format!("IOTask::Reset send failed: {e:?}")))?;
done_rx
.await
.map_err(|e| crate::Error::Fatal(format!("IOTask::Reset recv failed: {e:?}")))??;
Ok(())
}
/// Inserts `entries` into the in-memory buffer and updates the derived state:
/// - term maps and term segments;
/// - `next_id`, which stays above the highest inserted index;
/// - `min_index` / `max_index`.
///
/// Assumes `entries` is in ascending index order: `first()` / `last()` are
/// used as the batch bounds (TODO confirm for all callers).
fn insert_to_memory(
    &self,
    entries: &[Entry],
) {
    for entry in entries {
        self.entries.insert(entry.index, entry.clone());
    }
    self.update_term_indexes(entries);
    self.term_segments.on_append(entries);

    // A single `fetch_max` cannot lose a concurrent `pre_allocate_*` bump the
    // way a separate load + store could (which would hand out duplicate IDs).
    let max_index = entries.iter().map(|e| e.index).max().unwrap_or(0);
    self.next_id.fetch_max(max_index + 1, Ordering::SeqCst);

    if let Some(first_entry) = entries.first() {
        // 0 means "empty buffer", so it is always replaced.
        let mut current_min = self.min_index.load(Ordering::Relaxed);
        while first_entry.index < current_min || current_min == 0 {
            match self.min_index.compare_exchange_weak(
                current_min,
                first_entry.index,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => break,
                Err(e) => current_min = e,
            }
        }
    }
    if let Some(last_entry) = entries.last() {
        self.max_index.fetch_max(last_entry.index, Ordering::AcqRel);
    }
}
fn advance_durable_and_notify(
&self,
new_durable: u64,
) {
let prev = self.durable_index.fetch_max(new_durable, Ordering::AcqRel);
if new_durable > prev
&& let Some(ref tx) = self.log_flush_tx
{
let _ = tx.send(crate::RoleEvent::LogFlushed {
durable_index: new_durable,
});
}
}
pub fn remove_range(
&self,
range: RangeInclusive<u64>,
) {
let (start, end) = range.into_inner();
let mut affected_terms: HashMap<u64, (Option<u64>, Option<u64>)> = HashMap::new();
let mut current = start;
while current <= end {
if let Some(entry) = self.entries.range(current..=end).next() {
let key = *entry.key();
let term = entry.value().term;
let (min_idx, max_idx) = affected_terms.entry(term).or_insert((None, None));
if min_idx.is_none() || key < min_idx.unwrap() {
*min_idx = Some(key);
}
if max_idx.is_none() || key > max_idx.unwrap() {
*max_idx = Some(key);
}
self.entries.remove(&key);
current = key + 1;
} else {
break;
}
}
for (term, (removed_min, removed_max)) in affected_terms {
if let Some(term_first) = self.term_first_index.get(&term) {
let current_first = term_first.value().load(Ordering::Acquire);
if removed_min.is_some() && current_first >= removed_min.unwrap() {
let new_first =
self.entries.iter().find(|e| e.value().term == term).map(|e| *e.key());
if let Some(idx) = new_first {
term_first.value().store(idx, Ordering::Release);
} else {
self.term_first_index.remove(&term);
}
}
}
if let Some(term_last) = self.term_last_index.get(&term) {
let current_last = term_last.value().load(Ordering::Acquire);
if removed_max.is_some() && current_last <= removed_max.unwrap() {
let new_last = self
.entries
.iter()
.rev()
.find(|e| e.value().term == term)
.map(|e| *e.key());
if let Some(idx) = new_last {
term_last.value().store(idx, Ordering::Release);
} else {
self.term_last_index.remove(&term);
}
}
}
}
let new_min = self.entries.front().map(|e| *e.key()).unwrap_or(0);
let new_max = self.entries.back().map(|e| *e.key()).unwrap_or(0);
self.min_index.store(new_min, Ordering::Release);
self.max_index.store(new_max, Ordering::Release);
}
/// Widens each entry's term range in `term_first_index` / `term_last_index`
/// to include that entry's index, creating missing slots on demand.
fn update_term_indexes(
    &self,
    entries: &[Entry],
) {
    entries.iter().for_each(|entry| {
        let (term, index) = (entry.term, entry.index);
        let first_slot = self
            .term_first_index
            .get_or_insert(term, AtomicU64::new(u64::MAX));
        first_slot.value().fetch_min(index, Ordering::AcqRel);
        let last_slot = self.term_last_index.get_or_insert(term, AtomicU64::new(0));
        last_slot.value().fetch_max(index, Ordering::AcqRel);
    });
}
/// Number of entries currently buffered in memory (test / test-support builds only).
#[cfg(any(test, feature = "__test_support"))]
#[allow(dead_code)]
pub fn len(&self) -> usize {
self.entries.len()
}
/// Direct access to the ID allocator counter, for tests.
#[cfg(test)]
pub fn next_id(&self) -> &std::sync::atomic::AtomicU64 {
&self.next_id
}
/// True when no entries are buffered in memory (test / test-support builds only).
#[cfg(any(test, feature = "__test_support"))]
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
impl<T> Drop for BufferedRaftLog<T>
where
    T: TypeConfig,
{
    /// Best-effort `Shutdown` signal to the IO thread; a closed channel is only
    /// logged at debug level.
    ///
    /// NOTE(review): `batch_processor` holds its own `Arc` while it runs, so
    /// this presumably executes only after that loop has exited — confirm.
    fn drop(&mut self) {
        if let Err(e) = self.command_sender.send(IOTask::Shutdown) {
            debug!(
                "Shutdown command send failed (receiver already closed): {:?}",
                e
            );
        }
    }
}
impl<T> std::fmt::Debug for BufferedRaftLog<T>
where
    T: TypeConfig,
{
    /// Opaque rendering: prints just the type name and no internal state.
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        f.write_str("BufferedRaftLog")
    }
}