use super::ballot_leader_election::Ballot;
#[cfg(feature = "unicache")]
use crate::{unicache::*, util::NodeId};
use crate::{
util::{AcceptedMetaData, IndexEntry, LogEntry, SnapshottedEntry},
ClusterConfig, CompactionErr,
};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
error::Error,
fmt::Debug,
marker::PhantomData,
ops::{Bound, RangeBounds},
};
pub trait Entry: Clone + Debug {
#[cfg(not(feature = "serde"))]
type Snapshot: Snapshot<Self>;
#[cfg(feature = "serde")]
type Snapshot: Snapshot<Self> + Serialize + for<'a> Deserialize<'a>;
#[cfg(feature = "unicache")]
type Encoded: Encoded;
#[cfg(feature = "unicache")]
type Encodable: Encodable;
#[cfg(feature = "unicache")]
type NotEncodable: NotEncodable;
#[cfg(all(feature = "unicache", not(feature = "serde")))]
type EncodeResult: Clone + Debug;
#[cfg(all(feature = "unicache", feature = "serde"))]
type EncodeResult: Clone + Debug + Serialize + for<'a> Deserialize<'a>;
#[cfg(all(feature = "unicache", not(feature = "serde")))]
type UniCache: UniCache<T = Self>;
#[cfg(all(feature = "unicache", feature = "serde"))]
type UniCache: UniCache<T = Self> + Serialize + for<'a> Deserialize<'a>;
}
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct StopSign {
pub next_config: ClusterConfig,
pub metadata: Option<Vec<u8>>,
}
impl StopSign {
pub fn with(next_config: ClusterConfig, metadata: Option<Vec<u8>>) -> Self {
StopSign {
next_config,
metadata,
}
}
}
#[allow(missing_docs)]
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SnapshotType<T>
where
T: Entry,
{
Complete(T::Snapshot),
Delta(T::Snapshot),
}
pub trait Snapshot<T>: Clone + Debug
where
T: Entry,
{
fn create(entries: &[T]) -> Self;
fn merge(&mut self, delta: Self);
fn use_snapshots() -> bool;
}
pub type StorageResult<T> = Result<T, Box<dyn Error>>;
pub trait Storage<T>
where
T: Entry,
{
fn append_entry(&mut self, entry: T) -> StorageResult<u64>;
fn append_entries(&mut self, entries: Vec<T>) -> StorageResult<u64>;
fn append_on_prefix(&mut self, from_idx: u64, entries: Vec<T>) -> StorageResult<u64>;
fn set_promise(&mut self, n_prom: Ballot) -> StorageResult<()>;
fn set_decided_idx(&mut self, ld: u64) -> StorageResult<()>;
fn get_decided_idx(&self) -> StorageResult<u64>;
fn set_accepted_round(&mut self, na: Ballot) -> StorageResult<()>;
fn get_accepted_round(&self) -> StorageResult<Option<Ballot>>;
fn get_entries(&self, from: u64, to: u64) -> StorageResult<Vec<T>>;
fn get_log_len(&self) -> StorageResult<u64>;
fn get_suffix(&self, from: u64) -> StorageResult<Vec<T>>;
fn get_promise(&self) -> StorageResult<Option<Ballot>>;
fn set_stopsign(&mut self, s: Option<StopSign>) -> StorageResult<()>;
fn get_stopsign(&self) -> StorageResult<Option<StopSign>>;
fn trim(&mut self, idx: u64) -> StorageResult<()>;
fn set_compacted_idx(&mut self, idx: u64) -> StorageResult<()>;
fn get_compacted_idx(&self) -> StorageResult<u64>;
fn set_snapshot(&mut self, snapshot: Option<T::Snapshot>) -> StorageResult<()>;
fn get_snapshot(&self) -> StorageResult<Option<T::Snapshot>>;
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NoSnapshot;
impl<T: Entry> Snapshot<T> for NoSnapshot {
fn create(_entries: &[T]) -> Self {
panic!("NoSnapshot should not be created");
}
fn merge(&mut self, _delta: Self) {
panic!("NoSnapshot should not be merged");
}
fn use_snapshots() -> bool {
false
}
}
pub(crate) enum RollbackValue<T: Entry> {
DecidedIdx(u64),
AcceptedRound(Ballot),
Log(Vec<T>),
Snapshot(u64, Option<T::Snapshot>),
}
struct StateCache<T>
where
T: Entry,
{
#[cfg(feature = "unicache")]
pid: NodeId,
batch_size: usize,
batched_entries: Vec<T>,
promise: Ballot,
accepted_round: Ballot,
decided_idx: u64,
compacted_idx: u64,
real_log_len: u64,
stopsign: Option<StopSign>,
#[cfg(feature = "unicache")]
batched_processed_by_leader: Vec<T::EncodeResult>,
#[cfg(feature = "unicache")]
unicache: T::UniCache,
}
impl<T> StateCache<T>
where
T: Entry,
{
pub fn new(config: InternalStorageConfig, #[cfg(feature = "unicache")] pid: NodeId) -> Self {
StateCache {
#[cfg(feature = "unicache")]
pid,
batch_size: config.batch_size,
batched_entries: Vec::with_capacity(config.batch_size),
promise: Ballot::default(),
accepted_round: Ballot::default(),
decided_idx: 0,
compacted_idx: 0,
real_log_len: 0,
stopsign: None,
#[cfg(feature = "unicache")]
batched_processed_by_leader: Vec::with_capacity(config.batch_size),
#[cfg(feature = "unicache")]
unicache: T::UniCache::new(),
}
}
fn get_accepted_idx(&self) -> u64 {
let log_len = self.compacted_idx + self.real_log_len;
if self.stopsign.is_some() {
log_len + 1
} else {
log_len
}
}
fn stopsign_is_decided(&self) -> bool {
self.stopsign.is_some() && self.decided_idx == self.get_accepted_idx()
}
fn append_entry(&mut self, entry: T) -> Option<Vec<T>> {
#[cfg(feature = "unicache")]
{
let processed = self.unicache.try_encode(&entry);
self.batched_processed_by_leader.push(processed);
}
self.batched_entries.push(entry);
self.take_entries_if_batch_is_full()
}
fn append_entries(&mut self, entries: Vec<T>) -> Option<Vec<T>> {
#[cfg(feature = "unicache")]
{
if self.promise.pid == self.pid {
for entry in &entries {
let processed = self.unicache.try_encode(entry);
self.batched_processed_by_leader.push(processed);
}
}
}
self.batched_entries.extend(entries);
self.take_entries_if_batch_is_full()
}
fn take_entries_if_batch_is_full(&mut self) -> Option<Vec<T>> {
if self.batched_entries.len() >= self.batch_size {
Some(self.take_batched_entries())
} else {
None
}
}
fn take_batched_entries(&mut self) -> Vec<T> {
std::mem::take(&mut self.batched_entries)
}
#[cfg(feature = "unicache")]
fn take_batched_processed(&mut self) -> Vec<T::EncodeResult> {
std::mem::take(&mut self.batched_processed_by_leader)
}
}
pub(crate) struct InternalStorageConfig {
pub(crate) batch_size: usize,
}
pub(crate) struct InternalStorage<I, T>
where
I: Storage<T>,
T: Entry,
{
storage: I,
state_cache: StateCache<T>,
_t: PhantomData<T>,
}
impl<I, T> InternalStorage<I, T>
where
I: Storage<T>,
T: Entry,
{
pub(crate) fn with(
storage: I,
config: InternalStorageConfig,
#[cfg(feature = "unicache")] pid: NodeId,
) -> Self {
let mut internal_store = InternalStorage {
storage,
state_cache: StateCache::new(
config,
#[cfg(feature = "unicache")]
pid,
),
_t: Default::default(),
};
internal_store.load_cache();
internal_store
}
pub(crate) fn single_rollback(&mut self, value: RollbackValue<T>) {
match value {
RollbackValue::DecidedIdx(idx) => self
.set_decided_idx(idx)
.expect("storage error while trying to write decided_idx"),
RollbackValue::AcceptedRound(b) => self
.set_accepted_round(b)
.expect("storage error while trying to write accepted_round"),
RollbackValue::Log(entries) => {
self.rollback_log(entries);
}
RollbackValue::Snapshot(compacted_idx, snapshot) => {
self.rollback_snapshot(compacted_idx, snapshot);
}
}
}
pub(crate) fn rollback(&mut self, values: Vec<RollbackValue<T>>) {
for value in values {
self.single_rollback(value);
}
}
pub(crate) fn rollback_and_panic(&mut self, values: Vec<RollbackValue<T>>, msg: &str) {
for value in values {
self.single_rollback(value);
}
panic!("{}", msg);
}
pub(crate) fn rollback_and_panic_if_err<R>(
&mut self,
result: &StorageResult<R>,
values: Vec<RollbackValue<T>>,
msg: &str,
) where
R: Debug,
{
if result.is_err() {
self.rollback(values);
panic!("{}: {}", msg, result.as_ref().unwrap_err());
}
}
fn rollback_log(&mut self, entries: Vec<T>) {
self.try_trim(self.get_accepted_idx())
.expect("storage error while trying to trim log entries before rolling back");
self.append_entries_without_batching(entries)
.expect("storage error while trying to rollback log entries");
}
fn rollback_snapshot(&mut self, compacted_idx: u64, snapshot: Option<T::Snapshot>) {
if let Some(old_snapshot) = snapshot {
self.set_snapshot(compacted_idx, old_snapshot)
.expect("storage error while trying to rollback snapshot");
} else {
self.set_compacted_idx(compacted_idx)
.expect("storage error while trying to rollback compacted index");
self.reset_snapshot()
.expect("storage error while trying to reset snapshot");
}
}
fn get_entry_type(
&self,
idx: u64,
compacted_idx: u64,
virtual_log_len: u64,
) -> StorageResult<Option<IndexEntry>> {
if idx < compacted_idx {
Ok(Some(IndexEntry::Compacted))
} else if idx < virtual_log_len {
Ok(Some(IndexEntry::Entry))
} else if idx == virtual_log_len {
match self.get_stopsign() {
Some(ss) => Ok(Some(IndexEntry::StopSign(ss))),
_ => Ok(None),
}
} else {
Ok(None)
}
}
pub(crate) fn read<R>(&self, r: R) -> StorageResult<Option<Vec<LogEntry<T>>>>
where
R: RangeBounds<u64>,
{
let from_idx = match r.start_bound() {
Bound::Included(i) => *i,
Bound::Excluded(e) => *e + 1,
Bound::Unbounded => 0,
};
let to_idx = match r.end_bound() {
Bound::Included(i) => *i + 1,
Bound::Excluded(e) => *e,
Bound::Unbounded => self.get_accepted_idx(),
};
if to_idx == 0 {
return Ok(None);
}
let compacted_idx = self.get_compacted_idx();
let log_len = self.state_cache.real_log_len + self.state_cache.compacted_idx;
let to_type = match self.get_entry_type(to_idx - 1, compacted_idx, log_len)? {
Some(IndexEntry::Compacted) => {
return Ok(Some(vec![self.create_compacted_entry(compacted_idx)?]))
}
Some(from_type) => from_type,
_ => return Ok(None),
};
let from_type = match self.get_entry_type(from_idx, compacted_idx, log_len)? {
Some(from_type) => from_type,
_ => return Ok(None),
};
let decided_idx = self.get_decided_idx();
match (from_type, to_type) {
(IndexEntry::Entry, IndexEntry::Entry) => {
let from_suffix_idx = from_idx - compacted_idx;
let to_suffix_idx = to_idx - compacted_idx;
Ok(Some(self.create_read_log_entries_with_real_idx(
from_suffix_idx,
to_suffix_idx,
compacted_idx,
decided_idx,
)?))
}
(IndexEntry::Entry, IndexEntry::StopSign(ss)) => {
let from_suffix_idx = from_idx - compacted_idx;
let to_suffix_idx = to_idx - compacted_idx - 1;
let mut entries = self.create_read_log_entries_with_real_idx(
from_suffix_idx,
to_suffix_idx,
compacted_idx,
decided_idx,
)?;
entries.push(LogEntry::StopSign(ss, self.stopsign_is_decided()));
Ok(Some(entries))
}
(IndexEntry::Compacted, IndexEntry::Entry) => {
let from_suffix_idx = 0;
let to_suffix_idx = to_idx - compacted_idx;
let mut entries = Vec::with_capacity((to_suffix_idx + 1) as usize);
let compacted = self.create_compacted_entry(compacted_idx)?;
entries.push(compacted);
let mut e = self.create_read_log_entries_with_real_idx(
from_suffix_idx,
to_suffix_idx,
compacted_idx,
decided_idx,
)?;
entries.append(&mut e);
Ok(Some(entries))
}
(IndexEntry::Compacted, IndexEntry::StopSign(ss)) => {
let from_suffix_idx = 0;
let to_suffix_idx = to_idx - compacted_idx - 1;
let mut entries = Vec::with_capacity((to_suffix_idx + 1) as usize);
let compacted = self.create_compacted_entry(compacted_idx)?;
entries.push(compacted);
let mut e = self.create_read_log_entries_with_real_idx(
from_suffix_idx,
to_suffix_idx,
compacted_idx,
decided_idx,
)?;
entries.append(&mut e);
entries.push(LogEntry::StopSign(ss, self.stopsign_is_decided()));
Ok(Some(entries))
}
(IndexEntry::StopSign(ss), IndexEntry::StopSign(_)) => {
Ok(Some(vec![LogEntry::StopSign(
ss,
self.stopsign_is_decided(),
)]))
}
e => {
unimplemented!("{}", format!("Unexpected read combination: {:?}", e))
}
}
}
fn create_read_log_entries_with_real_idx(
&self,
from_sfx_idx: u64,
to_sfx_idx: u64,
compacted_idx: u64,
decided_idx: u64,
) -> StorageResult<Vec<LogEntry<T>>> {
let entries = self
.get_entries_with_real_idx(from_sfx_idx, to_sfx_idx)?
.into_iter()
.enumerate()
.map(|(idx, e)| {
let log_idx = idx as u64 + compacted_idx;
if log_idx > decided_idx {
LogEntry::Undecided(e)
} else {
LogEntry::Decided(e)
}
})
.collect();
Ok(entries)
}
pub(crate) fn read_decided_suffix(
&self,
from_idx: u64,
) -> StorageResult<Option<Vec<LogEntry<T>>>> {
let decided_idx = self.get_decided_idx();
if from_idx < decided_idx {
self.read(from_idx..decided_idx)
} else {
Ok(None)
}
}
fn create_compacted_entry(&self, compacted_idx: u64) -> StorageResult<LogEntry<T>> {
self.storage.get_snapshot().map(|snap| match snap {
Some(s) => LogEntry::Snapshotted(SnapshottedEntry::with(compacted_idx, s)),
None => LogEntry::Trimmed(compacted_idx),
})
}
fn load_cache(&mut self) {
if let Some(promise) = self
.storage
.get_promise()
.expect("failed to load cache from storage")
{
self.state_cache.promise = promise;
self.state_cache.decided_idx = self.storage.get_decided_idx().unwrap();
self.state_cache.accepted_round = self
.storage
.get_accepted_round()
.unwrap()
.unwrap_or_default();
self.state_cache.compacted_idx = self.storage.get_compacted_idx().unwrap();
self.state_cache.real_log_len = self.storage.get_log_len().unwrap();
self.state_cache.stopsign = self.storage.get_stopsign().unwrap();
}
}
pub(crate) fn append_entry_with_batching(
&mut self,
entry: T,
) -> StorageResult<Option<AcceptedMetaData<T>>> {
let append_res = self.state_cache.append_entry(entry);
self.flush_if_full_batch(append_res)
}
pub(crate) fn append_entries_with_batching(
&mut self,
entries: Vec<T>,
) -> StorageResult<Option<AcceptedMetaData<T>>> {
let append_res = self.state_cache.append_entries(entries);
self.flush_if_full_batch(append_res)
}
fn flush_if_full_batch(
&mut self,
append_res: Option<Vec<T>>,
) -> StorageResult<Option<AcceptedMetaData<T>>> {
if let Some(flushed_entries) = append_res {
let accepted_idx = self.append_entries_without_batching(flushed_entries.clone())?;
Ok(Some(AcceptedMetaData {
accepted_idx,
#[cfg(not(feature = "unicache"))]
flushed_entries,
#[cfg(feature = "unicache")]
flushed_processed: self.state_cache.take_batched_processed(),
}))
} else {
Ok(None)
}
}
pub(crate) fn append_entries_and_get_accepted_idx(
&mut self,
entries: Vec<T>,
) -> StorageResult<Option<u64>> {
let append_res = self.state_cache.append_entries(entries);
if let Some(flushed_entries) = append_res {
let accepted_idx = self.append_entries_without_batching(flushed_entries)?;
Ok(Some(accepted_idx))
} else {
Ok(None)
}
}
#[cfg(feature = "unicache")]
pub(crate) fn get_unicache(&self) -> T::UniCache {
self.state_cache.unicache.clone()
}
#[cfg(feature = "unicache")]
pub(crate) fn set_unicache(&mut self, unicache: T::UniCache) {
self.state_cache.unicache = unicache;
}
#[cfg(feature = "unicache")]
pub(crate) fn append_encoded_entries_and_get_accepted_idx(
&mut self,
encoded_entries: Vec<<T as Entry>::EncodeResult>,
) -> StorageResult<Option<u64>> {
let entries = encoded_entries
.into_iter()
.map(|x| self.state_cache.unicache.decode(x))
.collect();
self.append_entries_and_get_accepted_idx(entries)
}
pub(crate) fn flush_batch(&mut self) -> StorageResult<u64> {
#[cfg(feature = "unicache")]
{
self.state_cache.batched_processed_by_leader.clear();
}
let flushed_entries = self.state_cache.take_batched_entries();
self.append_entries_without_batching(flushed_entries)
}
pub(crate) fn append_entries_without_batching(
&mut self,
entries: Vec<T>,
) -> StorageResult<u64> {
self.state_cache.real_log_len = self.storage.append_entries(entries)?;
Ok(self.get_accepted_idx())
}
pub(crate) fn append_on_decided_prefix(&mut self, entries: Vec<T>) -> StorageResult<u64> {
let decided_idx = self.get_decided_idx();
let compacted_idx = self.get_compacted_idx();
self.state_cache.real_log_len = self
.storage
.append_on_prefix(decided_idx - compacted_idx, entries)?;
Ok(self.get_accepted_idx())
}
pub(crate) fn append_on_prefix(
&mut self,
from_idx: u64,
entries: Vec<T>,
) -> StorageResult<u64> {
let compacted_idx = self.get_compacted_idx();
self.state_cache.real_log_len = self
.storage
.append_on_prefix(from_idx - compacted_idx, entries)?;
Ok(self.get_accepted_idx())
}
pub(crate) fn set_promise(&mut self, n_prom: Ballot) -> StorageResult<()> {
self.state_cache.promise = n_prom;
self.storage.set_promise(n_prom)
}
pub(crate) fn set_decided_idx(&mut self, ld: u64) -> StorageResult<()> {
self.state_cache.decided_idx = ld;
self.storage.set_decided_idx(ld)
}
pub(crate) fn get_decided_idx(&self) -> u64 {
self.state_cache.decided_idx
}
fn get_decided_idx_without_stopsign(&self) -> u64 {
match self.stopsign_is_decided() {
true => self.get_decided_idx() - 1,
false => self.get_decided_idx(),
}
}
pub(crate) fn set_accepted_round(&mut self, na: Ballot) -> StorageResult<()> {
self.state_cache.accepted_round = na;
self.storage.set_accepted_round(na)
}
pub(crate) fn get_accepted_round(&self) -> Ballot {
self.state_cache.accepted_round
}
pub(crate) fn get_entries(&self, from: u64, to: u64) -> StorageResult<Vec<T>> {
let compacted_idx = self.get_compacted_idx();
self.get_entries_with_real_idx(from - compacted_idx.min(from), to - compacted_idx.min(to))
}
fn get_entries_with_real_idx(
&self,
from_sfx_idx: u64,
to_sfx_idx: u64,
) -> StorageResult<Vec<T>> {
self.storage.get_entries(from_sfx_idx, to_sfx_idx)
}
pub(crate) fn get_accepted_idx(&self) -> u64 {
self.state_cache.get_accepted_idx()
}
pub(crate) fn get_suffix(&self, from: u64) -> StorageResult<Vec<T>> {
let compacted_idx = self.get_compacted_idx();
self.storage.get_suffix(from - compacted_idx.min(from))
}
pub(crate) fn get_promise(&self) -> Ballot {
self.state_cache.promise
}
pub(crate) fn set_stopsign(&mut self, s: Option<StopSign>) -> StorageResult<()> {
self.state_cache.stopsign = s.clone();
self.storage.set_stopsign(s)
}
pub(crate) fn get_stopsign(&self) -> Option<StopSign> {
self.state_cache.stopsign.clone()
}
pub(crate) fn stopsign_is_decided(&self) -> bool {
self.state_cache.stopsign_is_decided()
}
pub(crate) fn create_snapshot(&mut self, compact_idx: u64) -> StorageResult<T::Snapshot> {
let compacted_idx = self.get_compacted_idx();
if compact_idx < compacted_idx {
Err(CompactionErr::TrimmedIndex(compacted_idx))?
}
let entries = self.storage.get_entries(0, compact_idx - compacted_idx)?;
let delta = T::Snapshot::create(entries.as_slice());
match self.storage.get_snapshot()? {
Some(mut s) => {
s.merge(delta);
Ok(s)
}
None => Ok(delta),
}
}
fn create_decided_snapshot(&mut self) -> StorageResult<T::Snapshot> {
let log_decided_idx = self.get_decided_idx_without_stopsign();
self.create_snapshot(log_decided_idx)
}
pub(crate) fn get_snapshot(&self) -> StorageResult<Option<T::Snapshot>> {
self.storage.get_snapshot()
}
pub(crate) fn create_diff_snapshot(
&mut self,
from_idx: u64,
) -> StorageResult<(Option<SnapshotType<T>>, u64)> {
let log_decided_idx = self.get_decided_idx_without_stopsign();
let compacted_idx = self.get_compacted_idx();
let snapshot = if from_idx <= compacted_idx {
if compacted_idx < log_decided_idx {
Some(SnapshotType::Complete(
self.create_snapshot(log_decided_idx)?,
))
} else {
self.get_snapshot()?.map(|s| SnapshotType::Complete(s))
}
} else {
let diff_entries = self.get_entries(from_idx, log_decided_idx)?;
Some(SnapshotType::Delta(T::Snapshot::create(
diff_entries.as_slice(),
)))
};
Ok((snapshot, log_decided_idx))
}
pub(crate) fn reset_snapshot(&mut self) -> StorageResult<()> {
self.storage.set_snapshot(None)
}
pub(crate) fn set_snapshot(&mut self, idx: u64, snapshot: T::Snapshot) -> StorageResult<()> {
let old_compacted_idx = self.get_compacted_idx();
let old_snapshot = self.storage.get_snapshot()?;
if idx > old_compacted_idx {
self.set_compacted_idx(idx)?;
if let Err(e) = self.storage.set_snapshot(Some(snapshot)) {
self.set_compacted_idx(old_compacted_idx)?;
return Err(e);
}
let old_log_len = self.state_cache.real_log_len;
if let Err(e) = self.storage.trim(idx - old_compacted_idx) {
self.set_compacted_idx(old_compacted_idx)?;
self.storage.set_snapshot(old_snapshot)?;
return Err(e);
}
self.state_cache.real_log_len =
old_log_len - (idx - old_compacted_idx).min(old_log_len);
}
Ok(())
}
pub(crate) fn merge_snapshot(&mut self, idx: u64, delta: T::Snapshot) -> StorageResult<()> {
let mut snapshot = if let Some(snap) = self.storage.get_snapshot()? {
snap
} else {
self.create_decided_snapshot()?
};
snapshot.merge(delta);
self.set_snapshot(idx, snapshot)
}
pub(crate) fn try_trim(&mut self, idx: u64) -> StorageResult<()> {
let compacted_idx = self.get_compacted_idx();
if idx <= compacted_idx {
Ok(()) } else {
let decided_idx = self.get_decided_idx();
if idx <= decided_idx {
self.set_compacted_idx(idx)?;
if let Err(e) = self.storage.trim(idx - compacted_idx) {
self.set_compacted_idx(compacted_idx)?;
Err(e)
} else {
self.state_cache.real_log_len = self.storage.get_log_len()?;
Ok(())
}
} else {
Err(CompactionErr::UndecidedIndex(decided_idx))?
}
}
}
pub(crate) fn set_compacted_idx(&mut self, idx: u64) -> StorageResult<()> {
self.state_cache.compacted_idx = idx;
self.storage.set_compacted_idx(idx)
}
pub(crate) fn get_compacted_idx(&self) -> u64 {
self.state_cache.compacted_idx
}
pub(crate) fn try_snapshot(&mut self, snapshot_idx: Option<u64>) -> StorageResult<()> {
let decided_idx = self.get_decided_idx();
let log_decided_idx = self.get_decided_idx_without_stopsign();
let idx = match snapshot_idx {
Some(i) => match i.cmp(&decided_idx) {
Ordering::Less => i,
Ordering::Equal => log_decided_idx,
Ordering::Greater => Err(CompactionErr::UndecidedIndex(decided_idx))?,
},
None => log_decided_idx,
};
if idx > self.get_compacted_idx() {
let snapshot = self.create_snapshot(idx)?;
self.set_snapshot(idx, snapshot)?;
}
Ok(())
}
}