use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use display_more::DisplayOptionExt;
use openraft_macros::since;
use validit::Valid;
use crate::LogIdOptionExt;
use crate::RaftLogReader;
use crate::RaftSnapshotBuilder;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::base::shared_id_generator::SharedIdGenerator;
use crate::engine::LogIdList;
use crate::entry::RaftEntry;
use crate::entry::RaftPayload;
use crate::errors::StorageIOResult;
use crate::raft_state::IOState;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::storage::log_reader_ext::RaftLogReaderExt;
use crate::type_config::TypeConfigExt;
use crate::type_config::alias::CommittedLeaderIdOf;
use crate::type_config::alias::EffectiveMembershipOf;
use crate::type_config::alias::LeaderIdOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::MembershipStateOf;
use crate::type_config::alias::StoredMembershipOf;
use crate::type_config::alias::TermOf;
use crate::type_config::alias::VoteOf;
use crate::utime::Leased;
use crate::vote::RaftLeaderId;
use crate::vote::RaftVote;
/// Helper that reads persisted Raft state through a borrowed log store and state machine.
///
/// It holds mutable borrows of both stores for the lifetime `'a`, so the stores cannot be
/// used elsewhere while the helper is alive. An optional node id can be attached with
/// `with_id`; it is needed by `get_initial_state` when no vote has been stored yet.
pub struct StorageHelper<'a, C, LS, SM>
where
C: RaftTypeConfig,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
/// Borrowed log storage: source of the vote, committed log id, log state and log entries.
pub(crate) log_store: &'a mut LS,
/// Borrowed state machine: source of the applied state and snapshots, and the target
/// when re-applying log entries or installing a snapshot.
pub(crate) state_machine: &'a mut SM,
/// Id of the local node; `None` until `with_id` is called.
id: Option<C::NodeId>,
/// String form of `id`, or the placeholder `"xx"` when no id has been set.
id_str: String,
/// Marker that ties the helper to the type configuration `C` without storing a value.
_p: PhantomData<C>,
}
impl<'a, C, LS, SM> StorageHelper<'a, C, LS, SM>
where
C: RaftTypeConfig,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
pub fn new(sto: &'a mut LS, sm: &'a mut SM) -> Self {
Self {
log_store: sto,
state_machine: sm,
id: None,
id_str: "xx".to_string(),
_p: Default::default(),
}
}
/// Attach the local node id to this helper, builder style.
///
/// Stores both the id and its string rendering, replacing the placeholder set by `new`.
#[since(version = "0.10.0")]
pub fn with_id(mut self, id: C::NodeId) -> Self {
    // Render the id before it is moved into the `Option`.
    let rendered = id.to_string();
    self.id_str = rendered;
    self.id = Some(id);
    self
}
/// Load the persisted state from the log store and the state machine and assemble the
/// initial [`RaftState`].
///
/// The steps run in this order:
/// 1. Read the vote; if none is stored, build one from the default term and this node's id.
/// 2. Read the committed log id, the log state and the applied log id.
/// 3. Raise `committed` to `last_applied` if it lags behind.
/// 4. Install the current snapshot if it is ahead of the state machine
///    (`restore_from_snapshot`), then re-read the applied log id.
/// 5. Re-apply log entries in `(last_applied, committed]` to the state machine.
/// 6. Load the membership state.
/// 7. If the log ends before `last_applied`, purge the log up to `last_applied`.
/// 8. Load the key log ids for `(last_purged, last_log_id]`.
/// 9. Load the current snapshot; if there is none but logs have been purged, build one.
///
/// # Errors
///
/// Returns a [`StorageError`] if any read from or write to the stores fails, or if the
/// entries that must be re-applied have already been purged.
///
/// # Panics
///
/// - If no vote is stored and no node id was provided through `with_id`.
/// - If a snapshot has to be rebuilt and `try_create_snapshot_builder(true)` does not
///   yield a builder.
pub async fn get_initial_state(&mut self) -> Result<RaftState<C>, StorageError<C>> {
    let mut log_reader = self.log_store.get_log_reader().await;
    let vote = log_reader.read_vote().await.sto_read_vote()?;

    // Nothing persisted yet: fall back to a vote built from the default term and our own id.
    let vote = vote.unwrap_or_else(|| {
        let node_id = self
            .id
            .clone()
            .expect("StorageHelper::with_id() must be called before get_initial_state() when no vote is stored");
        let leader_id = LeaderIdOf::<C>::new(TermOf::<C>::default(), node_id);
        VoteOf::<C>::from_leader_id(leader_id, false)
    });

    let mut committed = self.log_store.read_committed().await.sto_read_logs()?;

    let st = self.log_store.get_log_state().await.sto_read_logs()?;
    let mut last_purged_log_id = st.last_purged_log_id;
    let mut last_log_id = st.last_log_id;

    let (last_applied, _) = self.state_machine.applied_state().await.sto_read_sm()?;

    tracing::info!(
        "get_initial_state: vote: {}, last_purged_log_id: {}, last_applied: {}, committed: {}, last_log_id: {}",
        vote,
        last_purged_log_id.display(),
        last_applied.display(),
        committed.display(),
        last_log_id.display()
    );

    // The stored committed id may lag behind what the state machine has already applied.
    if committed < last_applied {
        committed = last_applied.clone();
    }

    // Installing a snapshot may advance the applied position, so read it again afterwards.
    self.restore_from_snapshot().await?;
    let (mut last_applied, _) = self.state_machine.applied_state().await.sto_read_sm()?;

    // Bring the state machine up to `committed` by replaying log entries.
    if last_applied < committed {
        let start = last_applied.next_index();
        let end = committed.next_index();

        // The entries to replay must still be present in the log.
        if start < last_purged_log_id.next_index() {
            let err = C::err_from_string(format!(
                "Cannot re-apply logs: need logs from index {}, but purged up to {}",
                start,
                last_purged_log_id.display()
            ));
            return Err(StorageError::read_log_at_index(start, err));
        }

        tracing::info!(
            "Re-applying committed logs to restore state machine to latest state: start: {}, end: {}",
            start,
            end
        );

        self.reapply_committed(start, end).await?;
        last_applied = committed.clone();
    }

    let mem_state = self.get_membership().await?;

    // The log ends before the applied position: purge up to `last_applied` so that the
    // log and the state machine line up again.
    if last_log_id < last_applied {
        tracing::info!(
            "Clean the hole between last_log_id({}) and last_applied({}) by purging logs to {}",
            last_log_id.display(),
            last_applied.display(),
            last_applied.display(),
        );

        // `None` never compares greater than anything, so `last_applied` is `Some` here.
        let purge_upto = last_applied
            .clone()
            .expect("last_applied is greater than last_log_id, thus it must be Some");
        self.log_store.purge(purge_upto).await.sto_write_logs()?;
        last_log_id = last_applied.clone();
        last_purged_log_id = last_applied.clone();
    }

    tracing::info!(
        "load key log ids from ({},{}]",
        last_purged_log_id.display(),
        last_log_id.display()
    );

    let log_id_list = self.get_key_log_ids(last_purged_log_id.clone(), last_log_id.clone()).await?;

    let snapshot = self.state_machine.get_current_snapshot().await.sto_read_snapshot(None)?;

    // No snapshot although logs have been purged: build one from the state machine.
    let snapshot = match snapshot {
        None => {
            if last_purged_log_id.is_some() {
                let mut b = self
                    .state_machine
                    .try_create_snapshot_builder(true)
                    .await
                    .expect("try_create_snapshot_builder(true) did not yield a snapshot builder");
                let s = b.build_snapshot().await.sto_write_snapshot(None)?;
                Some(s)
            } else {
                None
            }
        }
        s @ Some(_) => s,
    };

    let snapshot_meta = snapshot.map(|x| x.meta).unwrap_or_default();

    let io_state = IOState::new(
        &self.id_str,
        &vote,
        last_applied.clone(),
        snapshot_meta.last_log_id.clone(),
        last_purged_log_id.clone(),
    );

    let now = C::now();

    Ok(RaftState {
        // The vote is wrapped in a lease that starts now with a zero duration.
        vote: Leased::new(now, Duration::default(), vote),
        log_ids: log_id_list,
        membership_state: mem_state,
        snapshot_meta,
        last_inflight_id: 0,
        server_state: Default::default(),
        io_state: Valid::new(io_state),
        purge_upto: last_purged_log_id,
        progress_id_gen: SharedIdGenerator::new(),
    })
}
/// Install the state machine's current snapshot if it is ahead of the applied position.
///
/// Does nothing when there is no snapshot, or when the snapshot's last log id is not
/// greater than the state machine's last applied log id.
///
/// # Errors
///
/// Returns a [`StorageError`] if reading the applied state, reading the snapshot, or
/// installing the snapshot fails.
async fn restore_from_snapshot(&mut self) -> Result<(), StorageError<C>> {
    let (applied, _) = self.state_machine.applied_state().await.sto_read_sm()?;
    let current = self.state_machine.get_current_snapshot().await.sto_read_snapshot(None)?;

    let snap = match current {
        Some(found) => found,
        None => return Ok(()),
    };

    if snap.meta.last_log_id > applied {
        tracing::info!(
            "Installing snapshot to restore transient state machine: snapshot_last_log_id: {}, last_applied: {}",
            snap.meta.last_log_id.display(),
            applied.display()
        );

        // Only the snapshot payload is moved out; the meta stays usable for error
        // reporting and logging below.
        let installed = self.state_machine.install_snapshot(&snap.meta, snap.snapshot).await;
        installed.sto_write_snapshot(Some(snap.meta.signature()))?;

        tracing::info!(
            "Snapshot installed, state machine restored to snapshot position: new_last_applied: {}",
            snap.meta.last_log_id.display()
        );
    }

    Ok(())
}
/// Read the log entries in `[start, end)` and apply them to the state machine.
///
/// Entries are read and applied in chunks of at most 64. Every chunk must be complete:
/// its first entry must sit at the chunk's start index and its last entry at the chunk's
/// end index minus one.
///
/// # Errors
///
/// Returns a [`StorageError`] if reading the log fails, if a chunk is incomplete, or if
/// applying to the state machine fails.
pub(crate) async fn reapply_committed(&mut self, mut start: u64, end: u64) -> Result<(), StorageError<C>> {
    const CHUNK_SIZE: u64 = 64;

    tracing::info!(
        "re-apply log [{}..{}) in {} item chunks to state machine",
        start,
        end,
        CHUNK_SIZE,
    );

    let mut reader = self.log_store.get_log_reader().await;

    while start < end {
        let chunk_end = (start + CHUNK_SIZE).min(end);
        let entries = reader.try_get_log_entries(start..chunk_end).await.sto_read_logs()?;

        let first = entries.first().map(|ent| ent.index());
        let last = entries.last().map(|ent| ent.index());

        // Index to report if the chunk does not cover exactly `[start, chunk_end)`.
        let mismatch_at = if first != Some(start) {
            Some(start)
        } else if last != Some(chunk_end - 1) {
            Some(chunk_end - 1)
        } else {
            None
        };

        if let Some(index) = mismatch_at {
            let err = C::err_from_string(format!(
                "Failed to get log entries, expected index: [{}, {}), got [{:?}, {:?})",
                start, chunk_end, first, last
            ));
            tracing::error!("{}", err);
            return Err(StorageError::read_log_at_index(index, err));
        }

        tracing::info!(
            "re-apply {} log entries: [{}, {}),",
            chunk_end - start,
            start,
            chunk_end
        );

        // The checks above guarantee the chunk is non-empty.
        let tail_log_id = entries.last().map(|e| e.log_id()).unwrap();
        let apply_stream = futures_util::stream::iter(entries.into_iter().map(|entry| Ok((entry, None))));
        self.state_machine.apply(apply_stream).await.sto_apply(tail_log_id)?;

        start = chunk_end;
    }

    Ok(())
}
/// Build the membership state from the state machine and the not-yet-applied log entries.
///
/// The log is searched from the index right after the state machine's last applied log id:
/// - two memberships found in the log: the older becomes the first argument of the
///   membership state and the newer the second;
/// - none found: the state machine's membership is used for both;
/// - otherwise: the state machine's membership is the first and the first membership
///   found in the log is the second.
///
/// # Errors
///
/// Returns a [`StorageError`] if reading the state machine or the log fails.
pub async fn get_membership(&mut self) -> Result<MembershipStateOf<C>, StorageError<C>> {
    let (last_applied, sm_mem) = self.state_machine.applied_state().await.sto_read_sm()?;
    let log_mem = self.last_membership_in_log(last_applied.next_index()).await?;

    tracing::debug!(
        "{}: membership_in_sm={:?}, membership_in_log={:?}",
        func_name!(),
        sm_mem,
        log_mem
    );

    let (first_mem, second_mem) = match log_mem.as_slice() {
        [older, newer] => (older.clone(), newer.clone()),
        [] => (sm_mem.clone(), sm_mem),
        [head, ..] => (sm_mem, head.clone()),
    };

    Ok(MembershipStateOf::<C>::new(
        Arc::new(EffectiveMembershipOf::<C>::new_from_stored_membership(first_mem)),
        Arc::new(EffectiveMembershipOf::<C>::new_from_stored_membership(second_mem)),
    ))
}
/// Collect the last memberships stored in log entries at or after `since_index`.
///
/// The log is scanned backwards from its end in steps of 64 entries, never going below
/// `since_index` or the first index after the last purged log id. The scan stops as soon
/// as two memberships are found. The result holds at most two items, ordered from the
/// lower log index to the higher one.
///
/// # Errors
///
/// Returns a [`StorageError`] if reading the log state or the log entries fails.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn last_membership_in_log(
    &mut self,
    since_index: u64,
) -> Result<Vec<StoredMembershipOf<C>>, StorageError<C>> {
    const STEP: u64 = 64;

    let log_state = self.log_store.get_log_state().await.sto_read_logs()?;
    let mut end = log_state.last_log_id.next_index();

    tracing::info!("load membership from log: [{}..{})", since_index, end);

    let start = log_state.last_purged_log_id.next_index().max(since_index);

    let mut found = Vec::new();
    let mut reader = self.log_store.get_log_reader().await;

    while start < end {
        let step_start = start.max(end.saturating_sub(STEP));
        let entries = reader.try_get_log_entries(step_start..end).await.sto_read_logs()?;

        // Walk the chunk from the newest entry to the oldest, keeping membership entries only.
        let memberships = entries.iter().rev().filter_map(|ent| {
            ent.get_membership().map(|mem| StoredMembershipOf::<C>::new(Some(ent.log_id()), mem))
        });

        for stored in memberships {
            // Prepend so that the result stays ordered by ascending log index.
            found.insert(0, stored);
            if found.len() == 2 {
                return Ok(found);
            }
        }

        end = end.saturating_sub(STEP);
    }

    Ok(found)
}
/// Build the [`LogIdList`] for the log range `(purged, last]`.
///
/// - `last` is `None`: the list is built from `purged` and no further log ids.
/// - `purged` and `last` have the same index: the list is built from `last` alone.
/// - otherwise: the log id right after `purged` is looked up and the key log ids between
///   it and `last` (inclusive) are read from the log reader.
///
/// The log reader is only acquired when the store really has to be read, so the two
/// early-return paths do not touch the log store at all.
///
/// # Errors
///
/// Returns a [`StorageError`] if reading the first log id or the key log ids fails.
async fn get_key_log_ids(
    &mut self,
    purged: Option<LogIdOf<C>>,
    last: Option<LogIdOf<C>>,
) -> Result<LogIdList<CommittedLeaderIdOf<C>>, StorageError<C>> {
    let Some(last) = last else {
        return Ok(LogIdList::new(purged, vec![]));
    };

    // Nothing is stored after the purged position.
    if purged.index() == Some(last.index()) {
        return Ok(LogIdList::new(Some(last), vec![]));
    }

    let mut log_reader = self.log_store.get_log_reader().await;

    let first = log_reader.get_log_id(purged.next_index()).await?;
    let log_ids = log_reader.get_key_log_ids(first..=last).await.sto_read_logs()?;

    Ok(LogIdList::new(purged, log_ids))
}
}