omnipaxos/
storage.rs

1use super::ballot_leader_election::Ballot;
2#[cfg(feature = "unicache")]
3use crate::{unicache::*, util::NodeId};
4use crate::{
5    util::{AcceptedMetaData, IndexEntry, LogEntry, SnapshottedEntry},
6    ClusterConfig, CompactionErr,
7};
8#[cfg(feature = "serde")]
9use serde::{Deserialize, Serialize};
10use std::{
11    cmp::Ordering,
12    error::Error,
13    fmt::Debug,
14    marker::PhantomData,
15    ops::{Bound, RangeBounds},
16};
17
18/// Type of the entries stored in the log.
19pub trait Entry: Clone + Debug {
20    #[cfg(not(feature = "serde"))]
21    /// The snapshot type for this entry type.
22    type Snapshot: Snapshot<Self>;
23
24    #[cfg(feature = "serde")]
25    /// The snapshot type for this entry type.
26    type Snapshot: Snapshot<Self> + Serialize + for<'a> Deserialize<'a>;
27
28    #[cfg(feature = "unicache")]
29    /// The encoded type of some data. If there is a cache hit in UniCache, the data will be replaced and get sent over the network as this type instead. E.g., if `u8` then the cached `Entry` (or field of it) will be sent as `u8` instead.
30    type Encoded: Encoded;
31    #[cfg(feature = "unicache")]
32    /// The type representing the encodable parts of an `Entry`. It can be set to `Self` if the whole `Entry` is cachable. See docs of `pre_process()` for an example of deriving `Encodable` from an `Entry`.
33    type Encodable: Encodable;
34    #[cfg(feature = "unicache")]
35    /// The type representing the **NOT** encodable parts of an `Entry`. Any `NotEncodable` data will be transmitted in its original form, without encoding. It can be set to `()` if the whole `Entry` is cachable. See docs of `pre_process()` for an example.
36    type NotEncodable: NotEncodable;
37
38    #[cfg(all(feature = "unicache", not(feature = "serde")))]
39    /// The type that represents if there was a cache hit or miss in UniCache.
40    type EncodeResult: Clone + Debug;
41
42    #[cfg(all(feature = "unicache", feature = "serde"))]
43    /// The type that represents the results of trying to encode i.e., if there was a cache hit or miss in UniCache.
44    type EncodeResult: Clone + Debug + Serialize + for<'a> Deserialize<'a>;
45
46    #[cfg(all(feature = "unicache", not(feature = "serde")))]
47    /// The type that represents the results of trying to encode i.e., if there was a cache hit or miss in UniCache.
48    type UniCache: UniCache<T = Self>;
49    #[cfg(all(feature = "unicache", feature = "serde"))]
50    /// The unicache type for caching popular/re-occurring fields of an entry.
51    type UniCache: UniCache<T = Self> + Serialize + for<'a> Deserialize<'a>;
52}
53
54/// A StopSign entry that marks the end of a configuration. Used for reconfiguration.
55#[derive(Clone, Debug, PartialEq)]
56#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
57pub struct StopSign {
58    /// The new `Omnipaxos` cluster configuration
59    pub next_config: ClusterConfig,
60    /// Metadata for the reconfiguration.
61    pub metadata: Option<Vec<u8>>,
62}
63
64impl StopSign {
65    /// Creates a [`StopSign`].
66    pub fn with(next_config: ClusterConfig, metadata: Option<Vec<u8>>) -> Self {
67        StopSign {
68            next_config,
69            metadata,
70        }
71    }
72}
73
74/// Snapshot type. A `Complete` snapshot contains all snapshotted data while `Delta` has snapshotted changes since an earlier snapshot.
75#[allow(missing_docs)]
76#[derive(Clone, Debug)]
77#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
78pub enum SnapshotType<T>
79where
80    T: Entry,
81{
82    Complete(T::Snapshot),
83    Delta(T::Snapshot),
84}
85
86/// Trait for implementing snapshot operations for log entries of type `T` in OmniPaxos.
87pub trait Snapshot<T>: Clone + Debug
88where
89    T: Entry,
90{
91    /// Create a snapshot from the log `entries`.
92    fn create(entries: &[T]) -> Self;
93
94    /// Merge another snapshot `delta` into self.
95    fn merge(&mut self, delta: Self);
96
97    /// Whether `T` is snapshottable. If not, simply return `false` and leave the other functions `unimplemented!()`.
98    fn use_snapshots() -> bool;
99
100    //fn size_hint() -> u64;  // TODO: To let the system know trade-off of using entries vs snapshot?
101}
102
/// Result type returned by every fallible call of the storage API; errors are boxed trait objects.
pub type StorageResult<T> = std::result::Result<T, Box<dyn Error>>;
105
106/// Trait for implementing the storage backend of Sequence Paxos.
107pub trait Storage<T>
108where
109    T: Entry,
110{
111    /// Appends an entry to the end of the log and returns the log length.
112    fn append_entry(&mut self, entry: T) -> StorageResult<u64>;
113
114    /// Appends the entries of `entries` to the end of the log and returns the log length.
115    fn append_entries(&mut self, entries: Vec<T>) -> StorageResult<u64>;
116
117    /// Appends the entries of `entries` to the prefix from index `from_index` in the log and returns the log length.
118    fn append_on_prefix(&mut self, from_idx: u64, entries: Vec<T>) -> StorageResult<u64>;
119
120    /// Sets the round that has been promised.
121    fn set_promise(&mut self, n_prom: Ballot) -> StorageResult<()>;
122
123    /// Sets the decided index in the log.
124    fn set_decided_idx(&mut self, ld: u64) -> StorageResult<()>;
125
126    /// Returns the decided index in the log.
127    fn get_decided_idx(&self) -> StorageResult<u64>;
128
129    /// Sets the latest accepted round.
130    fn set_accepted_round(&mut self, na: Ballot) -> StorageResult<()>;
131
132    /// Returns the latest round in which entries have been accepted, returns `None` if no
133    /// entries have been accepted.
134    fn get_accepted_round(&self) -> StorageResult<Option<Ballot>>;
135
136    /// Returns the entries in the log in the index interval of [from, to).
137    /// If entries **do not exist for the complete interval**, an empty Vector should be returned.
138    fn get_entries(&self, from: u64, to: u64) -> StorageResult<Vec<T>>;
139
140    /// Returns the current length of the log.
141    fn get_log_len(&self) -> StorageResult<u64>;
142
143    /// Returns the suffix of entries in the log from index `from`.
144    fn get_suffix(&self, from: u64) -> StorageResult<Vec<T>>;
145
146    /// Returns the round that has been promised.
147    fn get_promise(&self) -> StorageResult<Option<Ballot>>;
148
149    /// Sets the StopSign used for reconfiguration.
150    fn set_stopsign(&mut self, s: Option<StopSign>) -> StorageResult<()>;
151
152    /// Returns the stored StopSign, returns `None` if no StopSign has been stored.
153    fn get_stopsign(&self) -> StorageResult<Option<StopSign>>;
154
155    /// Removes elements up to the given [`idx`] from storage.
156    fn trim(&mut self, idx: u64) -> StorageResult<()>;
157
158    /// Sets the compacted (i.e. trimmed or snapshotted) index.
159    fn set_compacted_idx(&mut self, idx: u64) -> StorageResult<()>;
160
161    /// Returns the garbage collector index from storage.
162    fn get_compacted_idx(&self) -> StorageResult<u64>;
163
164    /// Sets the snapshot.
165    fn set_snapshot(&mut self, snapshot: Option<T::Snapshot>) -> StorageResult<()>;
166
167    /// Returns the stored snapshot.
168    fn get_snapshot(&self) -> StorageResult<Option<T::Snapshot>>;
169}
170
171/// A place holder type for when not using snapshots. You should not use this type, it is only internally when deriving the Entry implementation.
172#[derive(Copy, Clone, Debug, Eq, PartialEq)]
173#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
174pub struct NoSnapshot;
175
176impl<T: Entry> Snapshot<T> for NoSnapshot {
177    fn create(_entries: &[T]) -> Self {
178        panic!("NoSnapshot should not be created");
179    }
180
181    fn merge(&mut self, _delta: Self) {
182        panic!("NoSnapshot should not be merged");
183    }
184
185    fn use_snapshots() -> bool {
186        false
187    }
188}
189
190/// Used to perform convenient rollbacks of storage operations on internal storage.
191/// Represents only values that can and will actually be rolled back from outside internal storage.
192pub(crate) enum RollbackValue<T: Entry> {
193    DecidedIdx(u64),
194    AcceptedRound(Ballot),
195    Log(Vec<T>),
196    /// compacted index and snapshot
197    Snapshot(u64, Option<T::Snapshot>),
198}
199
200/// A simple in-memory storage for simple state values of OmniPaxos.
201struct StateCache<T>
202where
203    T: Entry,
204{
205    #[cfg(feature = "unicache")]
206    /// Id of this node
207    pid: NodeId,
208    /// The maximum number of entries to batch.
209    batch_size: usize,
210    /// Vector which contains all the logged entries in-memory.
211    batched_entries: Vec<T>,
212    /// Last promised round.
213    promise: Ballot,
214    /// Last accepted round.
215    accepted_round: Ballot,
216    /// Length of the decided log.
217    decided_idx: u64,
218    /// Garbage collected index.
219    compacted_idx: u64,
220    /// Real length of logs in the storage.
221    real_log_len: u64,
222    /// Stopsign entry.
223    stopsign: Option<StopSign>,
224    #[cfg(feature = "unicache")]
225    /// Batch of entries that are processed (i.e., maybe encoded). Only used by the leader.
226    batched_processed_by_leader: Vec<T::EncodeResult>,
227    #[cfg(feature = "unicache")]
228    unicache: T::UniCache,
229}
230
231impl<T> StateCache<T>
232where
233    T: Entry,
234{
235    pub fn new(config: InternalStorageConfig, #[cfg(feature = "unicache")] pid: NodeId) -> Self {
236        StateCache {
237            #[cfg(feature = "unicache")]
238            pid,
239            batch_size: config.batch_size,
240            batched_entries: Vec::with_capacity(config.batch_size),
241            promise: Ballot::default(),
242            accepted_round: Ballot::default(),
243            decided_idx: 0,
244            compacted_idx: 0,
245            real_log_len: 0,
246            stopsign: None,
247            #[cfg(feature = "unicache")]
248            batched_processed_by_leader: Vec::with_capacity(config.batch_size),
249            #[cfg(feature = "unicache")]
250            unicache: T::UniCache::new(),
251        }
252    }
253
254    // Returns the index of the last accepted entry.
255    fn get_accepted_idx(&self) -> u64 {
256        let log_len = self.compacted_idx + self.real_log_len;
257        if self.stopsign.is_some() {
258            log_len + 1
259        } else {
260            log_len
261        }
262    }
263
264    // Returns whether a stopsign is decided
265    fn stopsign_is_decided(&self) -> bool {
266        self.stopsign.is_some() && self.decided_idx == self.get_accepted_idx()
267    }
268
269    // Appends an entry to the end of the `batched_entries`. If the batch is full, the
270    // batch is flushed and return flushed entries. Else, return None.
271    fn append_entry(&mut self, entry: T) -> Option<Vec<T>> {
272        #[cfg(feature = "unicache")]
273        {
274            let processed = self.unicache.try_encode(&entry);
275            self.batched_processed_by_leader.push(processed);
276        }
277        self.batched_entries.push(entry);
278        self.take_entries_if_batch_is_full()
279    }
280
281    // Appends entries to the end of the `batched_entries`. If the batch is full, the
282    // batch is flushed and return flushed entries. Else, return None.
283    fn append_entries(&mut self, entries: Vec<T>) -> Option<Vec<T>> {
284        #[cfg(feature = "unicache")]
285        {
286            if self.promise.pid == self.pid {
287                // only try encoding if we're the leader
288                for entry in &entries {
289                    let processed = self.unicache.try_encode(entry);
290                    self.batched_processed_by_leader.push(processed);
291                }
292            }
293        }
294        self.batched_entries.extend(entries);
295        self.take_entries_if_batch_is_full()
296    }
297
298    // Return batched entries if the batch is full that need to be flushed in to storage.
299    fn take_entries_if_batch_is_full(&mut self) -> Option<Vec<T>> {
300        if self.batched_entries.len() >= self.batch_size {
301            Some(self.take_batched_entries())
302        } else {
303            None
304        }
305    }
306
307    // Clears the batched entries and returns the cleared entries. If the batch is empty,
308    // return an empty vector.
309    fn take_batched_entries(&mut self) -> Vec<T> {
310        std::mem::take(&mut self.batched_entries)
311    }
312
313    #[cfg(feature = "unicache")]
314    fn take_batched_processed(&mut self) -> Vec<T::EncodeResult> {
315        std::mem::take(&mut self.batched_processed_by_leader)
316    }
317}
318
/// Configuration of the internal storage layer.
pub(crate) struct InternalStorageConfig {
    /// Number of batched entries at which the in-memory batch is flushed to storage.
    pub(crate) batch_size: usize,
}
322
323/// Internal representation of storage. Hides all complexities with the compacted index
324/// such that Sequence Paxos accesses the log with the uncompacted index.
325pub(crate) struct InternalStorage<I, T>
326where
327    I: Storage<T>,
328    T: Entry,
329{
330    storage: I,
331    state_cache: StateCache<T>,
332    _t: PhantomData<T>,
333}
334
335impl<I, T> InternalStorage<I, T>
336where
337    I: Storage<T>,
338    T: Entry,
339{
340    pub(crate) fn with(
341        storage: I,
342        config: InternalStorageConfig,
343        #[cfg(feature = "unicache")] pid: NodeId,
344    ) -> Self {
345        let mut internal_store = InternalStorage {
346            storage,
347            state_cache: StateCache::new(
348                config,
349                #[cfg(feature = "unicache")]
350                pid,
351            ),
352            _t: Default::default(),
353        };
354        internal_store.load_cache();
355        internal_store
356    }
357
358    /// Writes the value.
359    pub(crate) fn single_rollback(&mut self, value: RollbackValue<T>) {
360        match value {
361            RollbackValue::DecidedIdx(idx) => self
362                .set_decided_idx(idx)
363                .expect("storage error while trying to write decided_idx"),
364            RollbackValue::AcceptedRound(b) => self
365                .set_accepted_round(b)
366                .expect("storage error while trying to write accepted_round"),
367            RollbackValue::Log(entries) => {
368                self.rollback_log(entries);
369            }
370            RollbackValue::Snapshot(compacted_idx, snapshot) => {
371                self.rollback_snapshot(compacted_idx, snapshot);
372            }
373        }
374    }
375
376    /// Writes the values.
377    pub(crate) fn rollback(&mut self, values: Vec<RollbackValue<T>>) {
378        for value in values {
379            self.single_rollback(value);
380        }
381    }
382
383    pub(crate) fn rollback_and_panic(&mut self, values: Vec<RollbackValue<T>>, msg: &str) {
384        for value in values {
385            self.single_rollback(value);
386        }
387        panic!("{}", msg);
388    }
389
390    /// This function is useful to handle `StorageResult::Error`.
391    /// If `result` is an error, this function tries to write the `values` and then panics with `msg`.
392    /// Otherwise it returns.
393    pub(crate) fn rollback_and_panic_if_err<R>(
394        &mut self,
395        result: &StorageResult<R>,
396        values: Vec<RollbackValue<T>>,
397        msg: &str,
398    ) where
399        R: Debug,
400    {
401        if result.is_err() {
402            self.rollback(values);
403            panic!("{}: {}", msg, result.as_ref().unwrap_err());
404        }
405    }
406
407    /// Rollback the log in the storage using given log entries.
408    fn rollback_log(&mut self, entries: Vec<T>) {
409        self.try_trim(self.get_accepted_idx())
410            .expect("storage error while trying to trim log entries before rolling back");
411        self.append_entries_without_batching(entries)
412            .expect("storage error while trying to rollback log entries");
413    }
414
415    /// Rollback the snapshot in the storage using given compacted_idx and snapshot.
416    fn rollback_snapshot(&mut self, compacted_idx: u64, snapshot: Option<T::Snapshot>) {
417        if let Some(old_snapshot) = snapshot {
418            self.set_snapshot(compacted_idx, old_snapshot)
419                .expect("storage error while trying to rollback snapshot");
420        } else {
421            self.set_compacted_idx(compacted_idx)
422                .expect("storage error while trying to rollback compacted index");
423            self.reset_snapshot()
424                .expect("storage error while trying to reset snapshot");
425        }
426    }
427
428    fn get_entry_type(
429        &self,
430        idx: u64,
431        compacted_idx: u64,
432        virtual_log_len: u64,
433    ) -> StorageResult<Option<IndexEntry>> {
434        if idx < compacted_idx {
435            Ok(Some(IndexEntry::Compacted))
436        } else if idx < virtual_log_len {
437            Ok(Some(IndexEntry::Entry))
438        } else if idx == virtual_log_len {
439            match self.get_stopsign() {
440                Some(ss) => Ok(Some(IndexEntry::StopSign(ss))),
441                _ => Ok(None),
442            }
443        } else {
444            Ok(None)
445        }
446    }
447
448    /// Read entries in the range `r` in the log. Returns `None` if `r` is out of bounds.
449    pub(crate) fn read<R>(&self, r: R) -> StorageResult<Option<Vec<LogEntry<T>>>>
450    where
451        R: RangeBounds<u64>,
452    {
453        let from_idx = match r.start_bound() {
454            Bound::Included(i) => *i,
455            Bound::Excluded(e) => *e + 1,
456            Bound::Unbounded => 0,
457        };
458        let to_idx = match r.end_bound() {
459            Bound::Included(i) => *i + 1,
460            Bound::Excluded(e) => *e,
461            Bound::Unbounded => self.get_accepted_idx(),
462        };
463        if to_idx == 0 {
464            return Ok(None);
465        }
466        let compacted_idx = self.get_compacted_idx();
467        let log_len = self.state_cache.real_log_len + self.state_cache.compacted_idx;
468        let to_type = match self.get_entry_type(to_idx - 1, compacted_idx, log_len)? {
469            // use to_idx-1 when getting the entry type as to_idx is exclusive
470            Some(IndexEntry::Compacted) => {
471                return Ok(Some(vec![self.create_compacted_entry(compacted_idx)?]))
472            }
473            Some(from_type) => from_type,
474            _ => return Ok(None),
475        };
476        let from_type = match self.get_entry_type(from_idx, compacted_idx, log_len)? {
477            Some(from_type) => from_type,
478            _ => return Ok(None),
479        };
480        let decided_idx = self.get_decided_idx();
481        match (from_type, to_type) {
482            (IndexEntry::Entry, IndexEntry::Entry) => {
483                let from_suffix_idx = from_idx - compacted_idx;
484                let to_suffix_idx = to_idx - compacted_idx;
485                Ok(Some(self.create_read_log_entries_with_real_idx(
486                    from_suffix_idx,
487                    to_suffix_idx,
488                    compacted_idx,
489                    decided_idx,
490                )?))
491            }
492            (IndexEntry::Entry, IndexEntry::StopSign(ss)) => {
493                let from_suffix_idx = from_idx - compacted_idx;
494                let to_suffix_idx = to_idx - compacted_idx - 1;
495                let mut entries = self.create_read_log_entries_with_real_idx(
496                    from_suffix_idx,
497                    to_suffix_idx,
498                    compacted_idx,
499                    decided_idx,
500                )?;
501                entries.push(LogEntry::StopSign(ss, self.stopsign_is_decided()));
502                Ok(Some(entries))
503            }
504            (IndexEntry::Compacted, IndexEntry::Entry) => {
505                let from_suffix_idx = 0;
506                let to_suffix_idx = to_idx - compacted_idx;
507                let mut entries = Vec::with_capacity((to_suffix_idx + 1) as usize);
508                let compacted = self.create_compacted_entry(compacted_idx)?;
509                entries.push(compacted);
510                let mut e = self.create_read_log_entries_with_real_idx(
511                    from_suffix_idx,
512                    to_suffix_idx,
513                    compacted_idx,
514                    decided_idx,
515                )?;
516                entries.append(&mut e);
517                Ok(Some(entries))
518            }
519            (IndexEntry::Compacted, IndexEntry::StopSign(ss)) => {
520                let from_suffix_idx = 0;
521                let to_suffix_idx = to_idx - compacted_idx - 1;
522                let mut entries = Vec::with_capacity((to_suffix_idx + 1) as usize);
523                let compacted = self.create_compacted_entry(compacted_idx)?;
524                entries.push(compacted);
525                let mut e = self.create_read_log_entries_with_real_idx(
526                    from_suffix_idx,
527                    to_suffix_idx,
528                    compacted_idx,
529                    decided_idx,
530                )?;
531                entries.append(&mut e);
532                entries.push(LogEntry::StopSign(ss, self.stopsign_is_decided()));
533                Ok(Some(entries))
534            }
535            (IndexEntry::StopSign(ss), IndexEntry::StopSign(_)) => {
536                Ok(Some(vec![LogEntry::StopSign(
537                    ss,
538                    self.stopsign_is_decided(),
539                )]))
540            }
541            e => {
542                unimplemented!("{}", format!("Unexpected read combination: {:?}", e))
543            }
544        }
545    }
546
547    fn create_read_log_entries_with_real_idx(
548        &self,
549        from_sfx_idx: u64,
550        to_sfx_idx: u64,
551        compacted_idx: u64,
552        decided_idx: u64,
553    ) -> StorageResult<Vec<LogEntry<T>>> {
554        let entries = self
555            .get_entries_with_real_idx(from_sfx_idx, to_sfx_idx)?
556            .into_iter()
557            .enumerate()
558            .map(|(idx, e)| {
559                let log_idx = idx as u64 + compacted_idx;
560                if log_idx > decided_idx {
561                    LogEntry::Undecided(e)
562                } else {
563                    LogEntry::Decided(e)
564                }
565            })
566            .collect();
567        Ok(entries)
568    }
569
570    /// Read all decided entries from `from_idx` in the log. Returns `None` if `from_idx` is out of bounds.
571    pub(crate) fn read_decided_suffix(
572        &self,
573        from_idx: u64,
574    ) -> StorageResult<Option<Vec<LogEntry<T>>>> {
575        let decided_idx = self.get_decided_idx();
576        if from_idx < decided_idx {
577            self.read(from_idx..decided_idx)
578        } else {
579            Ok(None)
580        }
581    }
582
583    fn create_compacted_entry(&self, compacted_idx: u64) -> StorageResult<LogEntry<T>> {
584        self.storage.get_snapshot().map(|snap| match snap {
585            Some(s) => LogEntry::Snapshotted(SnapshottedEntry::with(compacted_idx, s)),
586            None => LogEntry::Trimmed(compacted_idx),
587        })
588    }
589
590    fn load_cache(&mut self) {
591        // try to load from storage
592        if let Some(promise) = self
593            .storage
594            .get_promise()
595            .expect("failed to load cache from storage")
596        {
597            self.state_cache.promise = promise;
598            self.state_cache.decided_idx = self.storage.get_decided_idx().unwrap();
599            self.state_cache.accepted_round = self
600                .storage
601                .get_accepted_round()
602                .unwrap()
603                .unwrap_or_default();
604            self.state_cache.compacted_idx = self.storage.get_compacted_idx().unwrap();
605            self.state_cache.real_log_len = self.storage.get_log_len().unwrap();
606            self.state_cache.stopsign = self.storage.get_stopsign().unwrap();
607        }
608    }
609
610    /*** Writing ***/
611    // Append entry, if the batch size is reached, flush the batch and return the actual
612    // accepted index (not including the batched entries)
613    pub(crate) fn append_entry_with_batching(
614        &mut self,
615        entry: T,
616    ) -> StorageResult<Option<AcceptedMetaData<T>>> {
617        let append_res = self.state_cache.append_entry(entry);
618        self.flush_if_full_batch(append_res)
619    }
620
621    // Append entries in batch, if the batch size is reached, flush the batch and return the
622    // accepted index and the flushed entries. If the batch size is not reached, return None.
623    pub(crate) fn append_entries_with_batching(
624        &mut self,
625        entries: Vec<T>,
626    ) -> StorageResult<Option<AcceptedMetaData<T>>> {
627        let append_res = self.state_cache.append_entries(entries);
628        self.flush_if_full_batch(append_res)
629    }
630
631    fn flush_if_full_batch(
632        &mut self,
633        append_res: Option<Vec<T>>,
634    ) -> StorageResult<Option<AcceptedMetaData<T>>> {
635        if let Some(flushed_entries) = append_res {
636            let accepted_idx = self.append_entries_without_batching(flushed_entries.clone())?;
637            Ok(Some(AcceptedMetaData {
638                accepted_idx,
639                #[cfg(not(feature = "unicache"))]
640                flushed_entries,
641                #[cfg(feature = "unicache")]
642                flushed_processed: self.state_cache.take_batched_processed(),
643            }))
644        } else {
645            Ok(None)
646        }
647    }
648
649    // Append entries in batch, if the batch size is reached, flush the batch and return the
650    // accepted index. If the batch size is not reached, return None.
651    pub(crate) fn append_entries_and_get_accepted_idx(
652        &mut self,
653        entries: Vec<T>,
654    ) -> StorageResult<Option<u64>> {
655        let append_res = self.state_cache.append_entries(entries);
656        if let Some(flushed_entries) = append_res {
657            let accepted_idx = self.append_entries_without_batching(flushed_entries)?;
658            Ok(Some(accepted_idx))
659        } else {
660            Ok(None)
661        }
662    }
663
664    #[cfg(feature = "unicache")]
665    pub(crate) fn get_unicache(&self) -> T::UniCache {
666        self.state_cache.unicache.clone()
667    }
668
669    #[cfg(feature = "unicache")]
670    pub(crate) fn set_unicache(&mut self, unicache: T::UniCache) {
671        self.state_cache.unicache = unicache;
672    }
673
674    #[cfg(feature = "unicache")]
675    pub(crate) fn append_encoded_entries_and_get_accepted_idx(
676        &mut self,
677        encoded_entries: Vec<<T as Entry>::EncodeResult>,
678    ) -> StorageResult<Option<u64>> {
679        let entries = encoded_entries
680            .into_iter()
681            .map(|x| self.state_cache.unicache.decode(x))
682            .collect();
683        self.append_entries_and_get_accepted_idx(entries)
684    }
685
686    pub(crate) fn flush_batch(&mut self) -> StorageResult<u64> {
687        #[cfg(feature = "unicache")]
688        {
689            // clear the processed batch
690            self.state_cache.batched_processed_by_leader.clear();
691        }
692        let flushed_entries = self.state_cache.take_batched_entries();
693        self.append_entries_without_batching(flushed_entries)
694    }
695
696    // Append entries without batching, return the accepted index
697    pub(crate) fn append_entries_without_batching(
698        &mut self,
699        entries: Vec<T>,
700    ) -> StorageResult<u64> {
701        self.state_cache.real_log_len = self.storage.append_entries(entries)?;
702        Ok(self.get_accepted_idx())
703    }
704
705    pub(crate) fn append_on_decided_prefix(&mut self, entries: Vec<T>) -> StorageResult<u64> {
706        let decided_idx = self.get_decided_idx();
707        let compacted_idx = self.get_compacted_idx();
708        self.state_cache.real_log_len = self
709            .storage
710            .append_on_prefix(decided_idx - compacted_idx, entries)?;
711        Ok(self.get_accepted_idx())
712    }
713
714    pub(crate) fn append_on_prefix(
715        &mut self,
716        from_idx: u64,
717        entries: Vec<T>,
718    ) -> StorageResult<u64> {
719        let compacted_idx = self.get_compacted_idx();
720        self.state_cache.real_log_len = self
721            .storage
722            .append_on_prefix(from_idx - compacted_idx, entries)?;
723        Ok(self.get_accepted_idx())
724    }
725
726    pub(crate) fn set_promise(&mut self, n_prom: Ballot) -> StorageResult<()> {
727        self.state_cache.promise = n_prom;
728        self.storage.set_promise(n_prom)
729    }
730
731    pub(crate) fn set_decided_idx(&mut self, ld: u64) -> StorageResult<()> {
732        self.state_cache.decided_idx = ld;
733        self.storage.set_decided_idx(ld)
734    }
735
736    pub(crate) fn get_decided_idx(&self) -> u64 {
737        self.state_cache.decided_idx
738    }
739
740    fn get_decided_idx_without_stopsign(&self) -> u64 {
741        match self.stopsign_is_decided() {
742            true => self.get_decided_idx() - 1,
743            false => self.get_decided_idx(),
744        }
745    }
746
747    pub(crate) fn set_accepted_round(&mut self, na: Ballot) -> StorageResult<()> {
748        self.state_cache.accepted_round = na;
749        self.storage.set_accepted_round(na)
750    }
751
752    pub(crate) fn get_accepted_round(&self) -> Ballot {
753        self.state_cache.accepted_round
754    }
755
756    pub(crate) fn get_entries(&self, from: u64, to: u64) -> StorageResult<Vec<T>> {
757        let compacted_idx = self.get_compacted_idx();
758        self.get_entries_with_real_idx(from - compacted_idx.min(from), to - compacted_idx.min(to))
759    }
760
761    /// Get entries with real physical log indexes i.e. the index with the compacted offset.
762    fn get_entries_with_real_idx(
763        &self,
764        from_sfx_idx: u64,
765        to_sfx_idx: u64,
766    ) -> StorageResult<Vec<T>> {
767        self.storage.get_entries(from_sfx_idx, to_sfx_idx)
768    }
769
770    /// The length of the replicated log, as if log was never compacted.
771    pub(crate) fn get_accepted_idx(&self) -> u64 {
772        self.state_cache.get_accepted_idx()
773    }
774
775    pub(crate) fn get_suffix(&self, from: u64) -> StorageResult<Vec<T>> {
776        let compacted_idx = self.get_compacted_idx();
777        self.storage.get_suffix(from - compacted_idx.min(from))
778    }
779
780    pub(crate) fn get_promise(&self) -> Ballot {
781        self.state_cache.promise
782    }
783
784    /// Sets the stopsign. This function should not be used directly from sequence paxos. Instead, use accept_stopsign.
785    pub(crate) fn set_stopsign(&mut self, s: Option<StopSign>) -> StorageResult<()> {
786        self.state_cache.stopsign = s.clone();
787        self.storage.set_stopsign(s)
788    }
789
790    pub(crate) fn get_stopsign(&self) -> Option<StopSign> {
791        self.state_cache.stopsign.clone()
792    }
793
794    // Returns whether a stopsign is decided
795    pub(crate) fn stopsign_is_decided(&self) -> bool {
796        self.state_cache.stopsign_is_decided()
797    }
798
799    pub(crate) fn create_snapshot(&mut self, compact_idx: u64) -> StorageResult<T::Snapshot> {
800        let compacted_idx = self.get_compacted_idx();
801        if compact_idx < compacted_idx {
802            Err(CompactionErr::TrimmedIndex(compacted_idx))?
803        }
804        let entries = self.storage.get_entries(0, compact_idx - compacted_idx)?;
805        let delta = T::Snapshot::create(entries.as_slice());
806        match self.storage.get_snapshot()? {
807            Some(mut s) => {
808                s.merge(delta);
809                Ok(s)
810            }
811            None => Ok(delta),
812        }
813    }
814
815    fn create_decided_snapshot(&mut self) -> StorageResult<T::Snapshot> {
816        let log_decided_idx = self.get_decided_idx_without_stopsign();
817        self.create_snapshot(log_decided_idx)
818    }
819
820    pub(crate) fn get_snapshot(&self) -> StorageResult<Option<T::Snapshot>> {
821        self.storage.get_snapshot()
822    }
823
824    // Creates a Delta snapshot of entries from `from_idx` to the end of the decided log and also
825    // returns the compacted idx of the created snapshot. If the range of entries contains entries
826    // which have already been compacted a valid delta cannot be created, so creates a Complete
827    // snapshot of the entire decided log instead.
828    pub(crate) fn create_diff_snapshot(
829        &mut self,
830        from_idx: u64,
831    ) -> StorageResult<(Option<SnapshotType<T>>, u64)> {
832        let log_decided_idx = self.get_decided_idx_without_stopsign();
833        let compacted_idx = self.get_compacted_idx();
834        let snapshot = if from_idx <= compacted_idx {
835            // Some entries in range are compacted, snapshot entire decided log
836            if compacted_idx < log_decided_idx {
837                Some(SnapshotType::Complete(
838                    self.create_snapshot(log_decided_idx)?,
839                ))
840            } else {
841                // Entire decided log already snapshotted
842                self.get_snapshot()?.map(|s| SnapshotType::Complete(s))
843            }
844        } else {
845            let diff_entries = self.get_entries(from_idx, log_decided_idx)?;
846            Some(SnapshotType::Delta(T::Snapshot::create(
847                diff_entries.as_slice(),
848            )))
849        };
850        Ok((snapshot, log_decided_idx))
851    }
852
853    pub(crate) fn reset_snapshot(&mut self) -> StorageResult<()> {
854        self.storage.set_snapshot(None)
855    }
856
857    /// This operation is atomic, but non-reversible after completion
858    pub(crate) fn set_snapshot(&mut self, idx: u64, snapshot: T::Snapshot) -> StorageResult<()> {
859        let old_compacted_idx = self.get_compacted_idx();
860        let old_snapshot = self.storage.get_snapshot()?;
861        if idx > old_compacted_idx {
862            self.set_compacted_idx(idx)?;
863            if let Err(e) = self.storage.set_snapshot(Some(snapshot)) {
864                self.set_compacted_idx(old_compacted_idx)?;
865                return Err(e);
866            }
867            let old_log_len = self.state_cache.real_log_len;
868            if let Err(e) = self.storage.trim(idx - old_compacted_idx) {
869                self.set_compacted_idx(old_compacted_idx)?;
870                self.storage.set_snapshot(old_snapshot)?;
871                return Err(e);
872            }
873            self.state_cache.real_log_len =
874                old_log_len - (idx - old_compacted_idx).min(old_log_len);
875        }
876        Ok(())
877    }
878
879    pub(crate) fn merge_snapshot(&mut self, idx: u64, delta: T::Snapshot) -> StorageResult<()> {
880        let mut snapshot = if let Some(snap) = self.storage.get_snapshot()? {
881            snap
882        } else {
883            self.create_decided_snapshot()?
884        };
885        snapshot.merge(delta);
886        self.set_snapshot(idx, snapshot)
887    }
888
889    pub(crate) fn try_trim(&mut self, idx: u64) -> StorageResult<()> {
890        let compacted_idx = self.get_compacted_idx();
891        if idx <= compacted_idx {
892            Ok(()) // already trimmed or snapshotted this index.
893        } else {
894            let decided_idx = self.get_decided_idx();
895            if idx <= decided_idx {
896                self.set_compacted_idx(idx)?;
897                if let Err(e) = self.storage.trim(idx - compacted_idx) {
898                    self.set_compacted_idx(compacted_idx)?;
899                    Err(e)
900                } else {
901                    self.state_cache.real_log_len = self.storage.get_log_len()?;
902                    Ok(())
903                }
904            } else {
905                Err(CompactionErr::UndecidedIndex(decided_idx))?
906            }
907        }
908    }
909
910    pub(crate) fn set_compacted_idx(&mut self, idx: u64) -> StorageResult<()> {
911        self.state_cache.compacted_idx = idx;
912        self.storage.set_compacted_idx(idx)
913    }
914
915    pub(crate) fn get_compacted_idx(&self) -> u64 {
916        self.state_cache.compacted_idx
917    }
918
919    pub(crate) fn try_snapshot(&mut self, snapshot_idx: Option<u64>) -> StorageResult<()> {
920        let decided_idx = self.get_decided_idx();
921        let log_decided_idx = self.get_decided_idx_without_stopsign();
922        let idx = match snapshot_idx {
923            Some(i) => match i.cmp(&decided_idx) {
924                Ordering::Less => i,
925                Ordering::Equal => log_decided_idx,
926                Ordering::Greater => Err(CompactionErr::UndecidedIndex(decided_idx))?,
927            },
928            None => log_decided_idx,
929        };
930        if idx > self.get_compacted_idx() {
931            let snapshot = self.create_snapshot(idx)?;
932            self.set_snapshot(idx, snapshot)?;
933        }
934        Ok(())
935    }
936}