use std::io;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use chunked_wal::ChunkPersistedFn;
use chunked_wal::ChunkedWal;
use log::info;
use crate::ChunkId;
use crate::Config;
use crate::RaftLogRecord;
use crate::RaftWalTypes;
use crate::Types;
use crate::WALRecord;
use crate::api::raft_log_writer::RaftLogWriter;
use crate::api::state_machine::StateMachine;
use crate::api::wal::WAL;
use crate::errors::LogIndexNotFound;
use crate::errors::RaftLogStateError;
use crate::raft_log::access_state::AccessStat;
use crate::raft_log::dump::RefDump;
use crate::raft_log::dump_raft_log::DumpRaftLog;
use crate::raft_log::raft_log_action::RaftLogAction;
use crate::raft_log::stat::Stat;
use crate::raft_log::state_machine::RaftLogStateMachine;
use crate::raft_log::state_machine::raft_log_state::RaftLogState;
use crate::types::Segment;
/// A Raft log store that pairs a chunked write-ahead log with an in-memory
/// state machine.
///
/// Records are appended to `wal` and then applied to `state_machine`; reads
/// are served from the state machine's payload cache, falling back to the WAL.
#[derive(Debug)]
pub struct RaftLog<T: Types> {
/// Shared configuration; its `wal` section is also used to configure the WAL.
pub(crate) config: Arc<Config>,
/// The chunked write-ahead log that stores every record.
pub(crate) wal: ChunkedWal<RaftWalTypes<T>>,
/// In-memory state built by applying the records written to `wal`.
pub(crate) state_machine: RaftLogStateMachine<T>,
/// Ids of chunks drained by `purge` that are waiting to be handed to the WAL
/// for removal on a later `flush`.
removed_chunks: Vec<ChunkId>,
/// Payload-cache hit/miss counters updated by `read`.
access_stat: AccessStat,
}
impl<T: Types> RaftLogWriter<T> for RaftLog<T> {
    /// Persist `user_data` by writing a full checkpoint record.
    ///
    /// Takes the state machine's current checkpoint, replaces its `user_data`
    /// with the given value and appends the result as a `Checkpoint` record.
    ///
    /// # Errors
    ///
    /// Returns any error produced while appending to the WAL or applying the
    /// record to the state machine.
    fn save_user_data(
        &mut self,
        user_data: Option<T::UserData>,
    ) -> Result<Segment, io::Error> {
        let mut state = self.state_machine.checkpoint();
        state.user_data = user_data;
        let record = RaftLogRecord::Checkpoint(state);
        self.append_and_apply(&record)
    }

    /// Append a `SaveVote` action carrying `vote`.
    ///
    /// # Errors
    ///
    /// Returns any error produced while appending or applying the record.
    fn save_vote(&mut self, vote: T::Vote) -> Result<Segment, io::Error> {
        let record = RaftLogRecord::Action(RaftLogAction::SaveVote(vote));
        self.append_and_apply(&record)
    }

    /// Append every `(log_id, payload)` pair as its own `Append` record.
    ///
    /// Returns the WAL's last segment after all entries are written; for an
    /// empty iterator nothing is written and the current last segment is
    /// returned.
    ///
    /// # Errors
    ///
    /// Stops at the first entry that fails to append or apply and returns that
    /// error; entries processed before the failure are not rolled back here.
    fn append<I>(&mut self, entries: I) -> Result<Segment, io::Error>
    where I: IntoIterator<Item = (T::LogId, T::LogPayload)> {
        for (log_id, payload) in entries {
            let record =
                RaftLogRecord::Action(RaftLogAction::Append(log_id, payload));
            self.append_and_apply(&record)?;
        }
        Ok(self.wal.last_segment())
    }

    /// Append a `TruncateAfter` action so that the log ends just before `index`.
    ///
    /// The record carries the id of the entry at `index - 1`. When `index` is
    /// the index right after the purged log id, the purged log id itself (which
    /// may be `None`) is used instead, because that entry is no longer stored.
    ///
    /// # Errors
    ///
    /// - `InvalidInput` if `index` is 0 and it is not the index right after the
    ///   purged log id: there is no preceding entry to truncate after.
    /// - The error from `get_log_id` if no entry exists at `index - 1`.
    /// - Any error produced while appending or applying the record.
    fn truncate(&mut self, index: u64) -> Result<Segment, io::Error> {
        let purged = self.log_state().purged.as_ref();
        let log_id = if index == T::next_log_index(purged) {
            purged.cloned()
        } else {
            // `index - 1` would underflow for index 0 (panic in debug builds,
            // wrap to u64::MAX in release); report it as a normal error instead.
            let prev_index = index.checked_sub(1).ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "truncate: index 0 has no preceding log entry to truncate after",
                )
            })?;
            Some(self.get_log_id(prev_index)?)
        };
        let record =
            RaftLogRecord::Action(RaftLogAction::TruncateAfter(log_id));
        self.append_and_apply(&record)
    }

    /// Append a `PurgeUpto` action and queue fully purged chunks for removal.
    ///
    /// If the index of `upto` is below the index following the currently purged
    /// log id, nothing is written and the WAL's last segment is returned.
    /// Otherwise the record is appended, and closed chunks are drained from the
    /// WAL using the predicate "chunk's last log id <= `upto`". The drained
    /// chunk ids are only buffered here; `flush` hands them to the WAL.
    ///
    /// # Errors
    ///
    /// Returns any error produced while appending or applying the record.
    fn purge(&mut self, upto: T::LogId) -> Result<Segment, io::Error> {
        let purged = self.log_state().purged.as_ref();
        info!(
            "RaftLog purge upto: {:?}; current purged: {:?}",
            upto, purged
        );
        if T::log_index(&upto) < T::next_log_index(purged) {
            return Ok(self.wal.last_segment());
        }
        let record =
            RaftLogRecord::Action(RaftLogAction::PurgeUpto(upto.clone()));
        let res = self.append_and_apply(&record)?;
        let chunk_ids = self.wal.drain_closed_chunks_while(|state| {
            state.last.as_ref() <= Some(&upto)
        });
        for chunk_id in chunk_ids {
            info!(
                "RaftLog: scheduled to remove chunk after next flush: {}",
                self.config.wal.chunk_path(chunk_id)
            );
            self.removed_chunks.push(chunk_id);
        }
        Ok(res)
    }

    /// Append a `Commit` action carrying `log_id`.
    ///
    /// # Errors
    ///
    /// Returns any error produced while appending or applying the record.
    fn commit(&mut self, log_id: T::LogId) -> Result<Segment, io::Error> {
        let record = RaftLogRecord::Action(RaftLogAction::Commit(log_id));
        self.append_and_apply(&record)
    }

    /// Send pending WAL data to be flushed, passing `sync` and `callback`
    /// through to the WAL.
    ///
    /// When `sync` is true and chunk removals were queued by `purge`, the
    /// queued chunk ids are then sent to the WAL for removal and the queue is
    /// left empty.
    ///
    /// # Errors
    ///
    /// Returns the error from sending the pending data or the removal request.
    fn flush(
        &mut self,
        sync: bool,
        callback: Option<T::Callback>,
    ) -> Result<(), io::Error> {
        self.wal.send_pending(sync, callback)?;
        // NOTE(review): removal is only requested together with a sync flush,
        // presumably so the purge record is durable before chunk files are
        // deleted; confirm against the WAL worker.
        if sync && !self.removed_chunks.is_empty() {
            // Move the queued ids out, leaving an empty queue behind.
            let chunk_ids = std::mem::take(&mut self.removed_chunks);
            self.wal.send_remove_chunks(chunk_ids)?;
        }
        Ok(())
    }
}
impl<T: Types> RaftLog<T> {
pub fn dump_data(&self) -> DumpRaftLog<T> {
let logs = self.state_machine.log.values().cloned().collect::<Vec<_>>();
let cache =
self.state_machine.payload_cache.read().unwrap().cache.clone();
let record_reader = self.wal.closed_chunk_reader();
DumpRaftLog {
state: self.state_machine.checkpoint(),
logs,
cache,
record_reader,
cache_hit: 0,
cache_miss: 0,
}
}
pub fn dump(&self) -> RefDump<'_, T> {
RefDump { raft_log: self }
}
pub fn config(&self) -> &Config {
self.config.as_ref()
}
pub fn open(config: Arc<Config>) -> Result<Self, io::Error> {
let wal_config = Arc::new(config.wal.clone());
let mut sm = RaftLogStateMachine::new(&config);
let cache = sm.payload_cache.clone();
let on_chunk_persisted: ChunkPersistedFn<RaftWalTypes<T>> =
Arc::new(move |_persisted, prev_chunk_checkpoint: Option<Arc<RaftLogState<T>>>| {
let Some(prev_chunk_checkpoint) = prev_chunk_checkpoint else {
return;
};
cache
.write()
.unwrap()
.set_last_evictable(prev_chunk_checkpoint.last().cloned());
});
let wal = ChunkedWal::open(wal_config, &mut sm, on_chunk_persisted)?;
let s = Self {
config,
state_machine: sm,
wal,
access_stat: Default::default(),
removed_chunks: vec![],
};
Ok(s)
}
pub fn update_state(
&mut self,
state: RaftLogState<T>,
) -> Result<Segment, io::Error> {
let record = RaftLogRecord::Checkpoint(state);
self.append_and_apply(&record)
}
pub fn read(
&self,
from: u64,
to: u64,
) -> impl Iterator<Item = Result<(T::LogId, T::LogPayload), io::Error>> + '_
{
self.state_machine.log.range(from..to).map(|(_, log_data)| {
let log_id = log_data.log_id.clone();
let payload =
self.state_machine.payload_cache.read().unwrap().get(&log_id);
let payload = if let Some(payload) = payload {
self.access_stat.cache_hit.fetch_add(1, Ordering::Relaxed);
payload
} else {
self.access_stat.cache_miss.fetch_add(1, Ordering::Relaxed);
let record = self
.wal
.load_record(&log_data.chunk_id, log_data.record_segment)?;
if let WALRecord::Action(RaftLogAction::Append(
log_id,
payload,
)) = record
{
debug_assert_eq!(log_id, log_data.log_id);
payload
} else {
panic!("Expect Record::Append but: {:?}", record);
}
};
Ok((log_id, payload))
})
}
pub fn log_state(&self) -> &RaftLogState<T> {
&self.state_machine.log_state
}
#[allow(dead_code)]
pub(crate) fn log_state_mut(&mut self) -> &mut RaftLogState<T> {
&mut self.state_machine.log_state
}
pub fn stat(&self) -> Stat<T> {
let closed = self.wal.closed_chunk_stats();
let open_stat =
self.wal.open_chunk_stat(self.state_machine.checkpoint());
let cache = self.state_machine.payload_cache.read().unwrap();
Stat {
closed_chunks: closed,
open_chunk: open_stat,
payload_cache_last_evictable: cache.last_evictable().cloned(),
payload_cache_item_count: cache.item_count() as u64,
payload_cache_max_item: cache.max_items() as u64,
payload_cache_size: cache.total_size() as u64,
payload_cache_capacity: cache.capacity() as u64,
payload_cache_miss: self
.access_stat
.cache_miss
.load(Ordering::Relaxed),
payload_cache_hit: self
.access_stat
.cache_hit
.load(Ordering::Relaxed),
flush_metrics: self.wal.flush_metrics(),
}
}
pub fn access_stat(&self) -> &AccessStat {
&self.access_stat
}
pub fn wait_worker_idle(&self) -> Result<(), io::Error> {
self.wal.wait_worker_idle()
}
pub fn drain_cache_evictable(&self) {
self.state_machine.payload_cache.write().unwrap().drain_evictable();
}
fn get_log_id(&self, index: u64) -> Result<T::LogId, RaftLogStateError<T>> {
let entry = self
.state_machine
.log
.get(&index)
.ok_or_else(|| LogIndexNotFound::new(index))?;
Ok(entry.log_id.clone())
}
fn append_and_apply(
&mut self,
rec: &RaftLogRecord<T>,
) -> Result<Segment, io::Error> {
WAL::append(&mut self.wal, rec)?;
StateMachine::apply(
&mut self.state_machine,
rec,
self.wal.open_chunk_id(),
self.wal.last_segment(),
)?;
self.wal.try_close_full_chunk(&self.state_machine)?;
Ok(self.wal.last_segment())
}
pub fn on_disk_size(&self) -> u64 {
self.wal.on_disk_size()
}
}