1use super::ballot_leader_election::Ballot;
2#[cfg(feature = "unicache")]
3use crate::{unicache::*, util::NodeId};
4use crate::{
5 util::{AcceptedMetaData, IndexEntry, LogEntry, SnapshottedEntry},
6 ClusterConfig, CompactionErr,
7};
8#[cfg(feature = "serde")]
9use serde::{Deserialize, Serialize};
10use std::{
11 cmp::Ordering,
12 error::Error,
13 fmt::Debug,
14 marker::PhantomData,
15 ops::{Bound, RangeBounds},
16};
17
18pub trait Entry: Clone + Debug {
20 #[cfg(not(feature = "serde"))]
21 type Snapshot: Snapshot<Self>;
23
24 #[cfg(feature = "serde")]
25 type Snapshot: Snapshot<Self> + Serialize + for<'a> Deserialize<'a>;
27
28 #[cfg(feature = "unicache")]
29 type Encoded: Encoded;
31 #[cfg(feature = "unicache")]
32 type Encodable: Encodable;
34 #[cfg(feature = "unicache")]
35 type NotEncodable: NotEncodable;
37
38 #[cfg(all(feature = "unicache", not(feature = "serde")))]
39 type EncodeResult: Clone + Debug;
41
42 #[cfg(all(feature = "unicache", feature = "serde"))]
43 type EncodeResult: Clone + Debug + Serialize + for<'a> Deserialize<'a>;
45
46 #[cfg(all(feature = "unicache", not(feature = "serde")))]
47 type UniCache: UniCache<T = Self>;
49 #[cfg(all(feature = "unicache", feature = "serde"))]
50 type UniCache: UniCache<T = Self> + Serialize + for<'a> Deserialize<'a>;
52}
53
54#[derive(Clone, Debug, PartialEq)]
56#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
57pub struct StopSign {
58 pub next_config: ClusterConfig,
60 pub metadata: Option<Vec<u8>>,
62}
63
64impl StopSign {
65 pub fn with(next_config: ClusterConfig, metadata: Option<Vec<u8>>) -> Self {
67 StopSign {
68 next_config,
69 metadata,
70 }
71 }
72}
73
74#[allow(missing_docs)]
76#[derive(Clone, Debug)]
77#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
78pub enum SnapshotType<T>
79where
80 T: Entry,
81{
82 Complete(T::Snapshot),
83 Delta(T::Snapshot),
84}
85
86pub trait Snapshot<T>: Clone + Debug
88where
89 T: Entry,
90{
91 fn create(entries: &[T]) -> Self;
93
94 fn merge(&mut self, delta: Self);
96
97 fn use_snapshots() -> bool;
99
100 }
102
103pub type StorageResult<T> = Result<T, Box<dyn Error>>;
105
106pub trait Storage<T>
108where
109 T: Entry,
110{
111 fn append_entry(&mut self, entry: T) -> StorageResult<u64>;
113
114 fn append_entries(&mut self, entries: Vec<T>) -> StorageResult<u64>;
116
117 fn append_on_prefix(&mut self, from_idx: u64, entries: Vec<T>) -> StorageResult<u64>;
119
120 fn set_promise(&mut self, n_prom: Ballot) -> StorageResult<()>;
122
123 fn set_decided_idx(&mut self, ld: u64) -> StorageResult<()>;
125
126 fn get_decided_idx(&self) -> StorageResult<u64>;
128
129 fn set_accepted_round(&mut self, na: Ballot) -> StorageResult<()>;
131
132 fn get_accepted_round(&self) -> StorageResult<Option<Ballot>>;
135
136 fn get_entries(&self, from: u64, to: u64) -> StorageResult<Vec<T>>;
139
140 fn get_log_len(&self) -> StorageResult<u64>;
142
143 fn get_suffix(&self, from: u64) -> StorageResult<Vec<T>>;
145
146 fn get_promise(&self) -> StorageResult<Option<Ballot>>;
148
149 fn set_stopsign(&mut self, s: Option<StopSign>) -> StorageResult<()>;
151
152 fn get_stopsign(&self) -> StorageResult<Option<StopSign>>;
154
155 fn trim(&mut self, idx: u64) -> StorageResult<()>;
157
158 fn set_compacted_idx(&mut self, idx: u64) -> StorageResult<()>;
160
161 fn get_compacted_idx(&self) -> StorageResult<u64>;
163
164 fn set_snapshot(&mut self, snapshot: Option<T::Snapshot>) -> StorageResult<()>;
166
167 fn get_snapshot(&self) -> StorageResult<Option<T::Snapshot>>;
169}
170
171#[derive(Copy, Clone, Debug, Eq, PartialEq)]
173#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
174pub struct NoSnapshot;
175
176impl<T: Entry> Snapshot<T> for NoSnapshot {
177 fn create(_entries: &[T]) -> Self {
178 panic!("NoSnapshot should not be created");
179 }
180
181 fn merge(&mut self, _delta: Self) {
182 panic!("NoSnapshot should not be merged");
183 }
184
185 fn use_snapshots() -> bool {
186 false
187 }
188}
189
190pub(crate) enum RollbackValue<T: Entry> {
193 DecidedIdx(u64),
194 AcceptedRound(Ballot),
195 Log(Vec<T>),
196 Snapshot(u64, Option<T::Snapshot>),
198}
199
200struct StateCache<T>
202where
203 T: Entry,
204{
205 #[cfg(feature = "unicache")]
206 pid: NodeId,
208 batch_size: usize,
210 batched_entries: Vec<T>,
212 promise: Ballot,
214 accepted_round: Ballot,
216 decided_idx: u64,
218 compacted_idx: u64,
220 real_log_len: u64,
222 stopsign: Option<StopSign>,
224 #[cfg(feature = "unicache")]
225 batched_processed_by_leader: Vec<T::EncodeResult>,
227 #[cfg(feature = "unicache")]
228 unicache: T::UniCache,
229}
230
231impl<T> StateCache<T>
232where
233 T: Entry,
234{
235 pub fn new(config: InternalStorageConfig, #[cfg(feature = "unicache")] pid: NodeId) -> Self {
236 StateCache {
237 #[cfg(feature = "unicache")]
238 pid,
239 batch_size: config.batch_size,
240 batched_entries: Vec::with_capacity(config.batch_size),
241 promise: Ballot::default(),
242 accepted_round: Ballot::default(),
243 decided_idx: 0,
244 compacted_idx: 0,
245 real_log_len: 0,
246 stopsign: None,
247 #[cfg(feature = "unicache")]
248 batched_processed_by_leader: Vec::with_capacity(config.batch_size),
249 #[cfg(feature = "unicache")]
250 unicache: T::UniCache::new(),
251 }
252 }
253
254 fn get_accepted_idx(&self) -> u64 {
256 let log_len = self.compacted_idx + self.real_log_len;
257 if self.stopsign.is_some() {
258 log_len + 1
259 } else {
260 log_len
261 }
262 }
263
264 fn stopsign_is_decided(&self) -> bool {
266 self.stopsign.is_some() && self.decided_idx == self.get_accepted_idx()
267 }
268
269 fn append_entry(&mut self, entry: T) -> Option<Vec<T>> {
272 #[cfg(feature = "unicache")]
273 {
274 let processed = self.unicache.try_encode(&entry);
275 self.batched_processed_by_leader.push(processed);
276 }
277 self.batched_entries.push(entry);
278 self.take_entries_if_batch_is_full()
279 }
280
281 fn append_entries(&mut self, entries: Vec<T>) -> Option<Vec<T>> {
284 #[cfg(feature = "unicache")]
285 {
286 if self.promise.pid == self.pid {
287 for entry in &entries {
289 let processed = self.unicache.try_encode(entry);
290 self.batched_processed_by_leader.push(processed);
291 }
292 }
293 }
294 self.batched_entries.extend(entries);
295 self.take_entries_if_batch_is_full()
296 }
297
298 fn take_entries_if_batch_is_full(&mut self) -> Option<Vec<T>> {
300 if self.batched_entries.len() >= self.batch_size {
301 Some(self.take_batched_entries())
302 } else {
303 None
304 }
305 }
306
307 fn take_batched_entries(&mut self) -> Vec<T> {
310 std::mem::take(&mut self.batched_entries)
311 }
312
313 #[cfg(feature = "unicache")]
314 fn take_batched_processed(&mut self) -> Vec<T::EncodeResult> {
315 std::mem::take(&mut self.batched_processed_by_leader)
316 }
317}
318
319pub(crate) struct InternalStorageConfig {
320 pub(crate) batch_size: usize,
321}
322
323pub(crate) struct InternalStorage<I, T>
326where
327 I: Storage<T>,
328 T: Entry,
329{
330 storage: I,
331 state_cache: StateCache<T>,
332 _t: PhantomData<T>,
333}
334
335impl<I, T> InternalStorage<I, T>
336where
337 I: Storage<T>,
338 T: Entry,
339{
340 pub(crate) fn with(
341 storage: I,
342 config: InternalStorageConfig,
343 #[cfg(feature = "unicache")] pid: NodeId,
344 ) -> Self {
345 let mut internal_store = InternalStorage {
346 storage,
347 state_cache: StateCache::new(
348 config,
349 #[cfg(feature = "unicache")]
350 pid,
351 ),
352 _t: Default::default(),
353 };
354 internal_store.load_cache();
355 internal_store
356 }
357
358 pub(crate) fn single_rollback(&mut self, value: RollbackValue<T>) {
360 match value {
361 RollbackValue::DecidedIdx(idx) => self
362 .set_decided_idx(idx)
363 .expect("storage error while trying to write decided_idx"),
364 RollbackValue::AcceptedRound(b) => self
365 .set_accepted_round(b)
366 .expect("storage error while trying to write accepted_round"),
367 RollbackValue::Log(entries) => {
368 self.rollback_log(entries);
369 }
370 RollbackValue::Snapshot(compacted_idx, snapshot) => {
371 self.rollback_snapshot(compacted_idx, snapshot);
372 }
373 }
374 }
375
376 pub(crate) fn rollback(&mut self, values: Vec<RollbackValue<T>>) {
378 for value in values {
379 self.single_rollback(value);
380 }
381 }
382
383 pub(crate) fn rollback_and_panic(&mut self, values: Vec<RollbackValue<T>>, msg: &str) {
384 for value in values {
385 self.single_rollback(value);
386 }
387 panic!("{}", msg);
388 }
389
390 pub(crate) fn rollback_and_panic_if_err<R>(
394 &mut self,
395 result: &StorageResult<R>,
396 values: Vec<RollbackValue<T>>,
397 msg: &str,
398 ) where
399 R: Debug,
400 {
401 if result.is_err() {
402 self.rollback(values);
403 panic!("{}: {}", msg, result.as_ref().unwrap_err());
404 }
405 }
406
407 fn rollback_log(&mut self, entries: Vec<T>) {
409 self.try_trim(self.get_accepted_idx())
410 .expect("storage error while trying to trim log entries before rolling back");
411 self.append_entries_without_batching(entries)
412 .expect("storage error while trying to rollback log entries");
413 }
414
415 fn rollback_snapshot(&mut self, compacted_idx: u64, snapshot: Option<T::Snapshot>) {
417 if let Some(old_snapshot) = snapshot {
418 self.set_snapshot(compacted_idx, old_snapshot)
419 .expect("storage error while trying to rollback snapshot");
420 } else {
421 self.set_compacted_idx(compacted_idx)
422 .expect("storage error while trying to rollback compacted index");
423 self.reset_snapshot()
424 .expect("storage error while trying to reset snapshot");
425 }
426 }
427
428 fn get_entry_type(
429 &self,
430 idx: u64,
431 compacted_idx: u64,
432 virtual_log_len: u64,
433 ) -> StorageResult<Option<IndexEntry>> {
434 if idx < compacted_idx {
435 Ok(Some(IndexEntry::Compacted))
436 } else if idx < virtual_log_len {
437 Ok(Some(IndexEntry::Entry))
438 } else if idx == virtual_log_len {
439 match self.get_stopsign() {
440 Some(ss) => Ok(Some(IndexEntry::StopSign(ss))),
441 _ => Ok(None),
442 }
443 } else {
444 Ok(None)
445 }
446 }
447
448 pub(crate) fn read<R>(&self, r: R) -> StorageResult<Option<Vec<LogEntry<T>>>>
450 where
451 R: RangeBounds<u64>,
452 {
453 let from_idx = match r.start_bound() {
454 Bound::Included(i) => *i,
455 Bound::Excluded(e) => *e + 1,
456 Bound::Unbounded => 0,
457 };
458 let to_idx = match r.end_bound() {
459 Bound::Included(i) => *i + 1,
460 Bound::Excluded(e) => *e,
461 Bound::Unbounded => self.get_accepted_idx(),
462 };
463 if to_idx == 0 {
464 return Ok(None);
465 }
466 let compacted_idx = self.get_compacted_idx();
467 let log_len = self.state_cache.real_log_len + self.state_cache.compacted_idx;
468 let to_type = match self.get_entry_type(to_idx - 1, compacted_idx, log_len)? {
469 Some(IndexEntry::Compacted) => {
471 return Ok(Some(vec![self.create_compacted_entry(compacted_idx)?]))
472 }
473 Some(from_type) => from_type,
474 _ => return Ok(None),
475 };
476 let from_type = match self.get_entry_type(from_idx, compacted_idx, log_len)? {
477 Some(from_type) => from_type,
478 _ => return Ok(None),
479 };
480 let decided_idx = self.get_decided_idx();
481 match (from_type, to_type) {
482 (IndexEntry::Entry, IndexEntry::Entry) => {
483 let from_suffix_idx = from_idx - compacted_idx;
484 let to_suffix_idx = to_idx - compacted_idx;
485 Ok(Some(self.create_read_log_entries_with_real_idx(
486 from_suffix_idx,
487 to_suffix_idx,
488 compacted_idx,
489 decided_idx,
490 )?))
491 }
492 (IndexEntry::Entry, IndexEntry::StopSign(ss)) => {
493 let from_suffix_idx = from_idx - compacted_idx;
494 let to_suffix_idx = to_idx - compacted_idx - 1;
495 let mut entries = self.create_read_log_entries_with_real_idx(
496 from_suffix_idx,
497 to_suffix_idx,
498 compacted_idx,
499 decided_idx,
500 )?;
501 entries.push(LogEntry::StopSign(ss, self.stopsign_is_decided()));
502 Ok(Some(entries))
503 }
504 (IndexEntry::Compacted, IndexEntry::Entry) => {
505 let from_suffix_idx = 0;
506 let to_suffix_idx = to_idx - compacted_idx;
507 let mut entries = Vec::with_capacity((to_suffix_idx + 1) as usize);
508 let compacted = self.create_compacted_entry(compacted_idx)?;
509 entries.push(compacted);
510 let mut e = self.create_read_log_entries_with_real_idx(
511 from_suffix_idx,
512 to_suffix_idx,
513 compacted_idx,
514 decided_idx,
515 )?;
516 entries.append(&mut e);
517 Ok(Some(entries))
518 }
519 (IndexEntry::Compacted, IndexEntry::StopSign(ss)) => {
520 let from_suffix_idx = 0;
521 let to_suffix_idx = to_idx - compacted_idx - 1;
522 let mut entries = Vec::with_capacity((to_suffix_idx + 1) as usize);
523 let compacted = self.create_compacted_entry(compacted_idx)?;
524 entries.push(compacted);
525 let mut e = self.create_read_log_entries_with_real_idx(
526 from_suffix_idx,
527 to_suffix_idx,
528 compacted_idx,
529 decided_idx,
530 )?;
531 entries.append(&mut e);
532 entries.push(LogEntry::StopSign(ss, self.stopsign_is_decided()));
533 Ok(Some(entries))
534 }
535 (IndexEntry::StopSign(ss), IndexEntry::StopSign(_)) => {
536 Ok(Some(vec![LogEntry::StopSign(
537 ss,
538 self.stopsign_is_decided(),
539 )]))
540 }
541 e => {
542 unimplemented!("{}", format!("Unexpected read combination: {:?}", e))
543 }
544 }
545 }
546
547 fn create_read_log_entries_with_real_idx(
548 &self,
549 from_sfx_idx: u64,
550 to_sfx_idx: u64,
551 compacted_idx: u64,
552 decided_idx: u64,
553 ) -> StorageResult<Vec<LogEntry<T>>> {
554 let entries = self
555 .get_entries_with_real_idx(from_sfx_idx, to_sfx_idx)?
556 .into_iter()
557 .enumerate()
558 .map(|(idx, e)| {
559 let log_idx = idx as u64 + compacted_idx;
560 if log_idx > decided_idx {
561 LogEntry::Undecided(e)
562 } else {
563 LogEntry::Decided(e)
564 }
565 })
566 .collect();
567 Ok(entries)
568 }
569
570 pub(crate) fn read_decided_suffix(
572 &self,
573 from_idx: u64,
574 ) -> StorageResult<Option<Vec<LogEntry<T>>>> {
575 let decided_idx = self.get_decided_idx();
576 if from_idx < decided_idx {
577 self.read(from_idx..decided_idx)
578 } else {
579 Ok(None)
580 }
581 }
582
583 fn create_compacted_entry(&self, compacted_idx: u64) -> StorageResult<LogEntry<T>> {
584 self.storage.get_snapshot().map(|snap| match snap {
585 Some(s) => LogEntry::Snapshotted(SnapshottedEntry::with(compacted_idx, s)),
586 None => LogEntry::Trimmed(compacted_idx),
587 })
588 }
589
590 fn load_cache(&mut self) {
591 if let Some(promise) = self
593 .storage
594 .get_promise()
595 .expect("failed to load cache from storage")
596 {
597 self.state_cache.promise = promise;
598 self.state_cache.decided_idx = self.storage.get_decided_idx().unwrap();
599 self.state_cache.accepted_round = self
600 .storage
601 .get_accepted_round()
602 .unwrap()
603 .unwrap_or_default();
604 self.state_cache.compacted_idx = self.storage.get_compacted_idx().unwrap();
605 self.state_cache.real_log_len = self.storage.get_log_len().unwrap();
606 self.state_cache.stopsign = self.storage.get_stopsign().unwrap();
607 }
608 }
609
610 pub(crate) fn append_entry_with_batching(
614 &mut self,
615 entry: T,
616 ) -> StorageResult<Option<AcceptedMetaData<T>>> {
617 let append_res = self.state_cache.append_entry(entry);
618 self.flush_if_full_batch(append_res)
619 }
620
621 pub(crate) fn append_entries_with_batching(
624 &mut self,
625 entries: Vec<T>,
626 ) -> StorageResult<Option<AcceptedMetaData<T>>> {
627 let append_res = self.state_cache.append_entries(entries);
628 self.flush_if_full_batch(append_res)
629 }
630
631 fn flush_if_full_batch(
632 &mut self,
633 append_res: Option<Vec<T>>,
634 ) -> StorageResult<Option<AcceptedMetaData<T>>> {
635 if let Some(flushed_entries) = append_res {
636 let accepted_idx = self.append_entries_without_batching(flushed_entries.clone())?;
637 Ok(Some(AcceptedMetaData {
638 accepted_idx,
639 #[cfg(not(feature = "unicache"))]
640 flushed_entries,
641 #[cfg(feature = "unicache")]
642 flushed_processed: self.state_cache.take_batched_processed(),
643 }))
644 } else {
645 Ok(None)
646 }
647 }
648
649 pub(crate) fn append_entries_and_get_accepted_idx(
652 &mut self,
653 entries: Vec<T>,
654 ) -> StorageResult<Option<u64>> {
655 let append_res = self.state_cache.append_entries(entries);
656 if let Some(flushed_entries) = append_res {
657 let accepted_idx = self.append_entries_without_batching(flushed_entries)?;
658 Ok(Some(accepted_idx))
659 } else {
660 Ok(None)
661 }
662 }
663
664 #[cfg(feature = "unicache")]
665 pub(crate) fn get_unicache(&self) -> T::UniCache {
666 self.state_cache.unicache.clone()
667 }
668
669 #[cfg(feature = "unicache")]
670 pub(crate) fn set_unicache(&mut self, unicache: T::UniCache) {
671 self.state_cache.unicache = unicache;
672 }
673
674 #[cfg(feature = "unicache")]
675 pub(crate) fn append_encoded_entries_and_get_accepted_idx(
676 &mut self,
677 encoded_entries: Vec<<T as Entry>::EncodeResult>,
678 ) -> StorageResult<Option<u64>> {
679 let entries = encoded_entries
680 .into_iter()
681 .map(|x| self.state_cache.unicache.decode(x))
682 .collect();
683 self.append_entries_and_get_accepted_idx(entries)
684 }
685
686 pub(crate) fn flush_batch(&mut self) -> StorageResult<u64> {
687 #[cfg(feature = "unicache")]
688 {
689 self.state_cache.batched_processed_by_leader.clear();
691 }
692 let flushed_entries = self.state_cache.take_batched_entries();
693 self.append_entries_without_batching(flushed_entries)
694 }
695
696 pub(crate) fn append_entries_without_batching(
698 &mut self,
699 entries: Vec<T>,
700 ) -> StorageResult<u64> {
701 self.state_cache.real_log_len = self.storage.append_entries(entries)?;
702 Ok(self.get_accepted_idx())
703 }
704
705 pub(crate) fn append_on_decided_prefix(&mut self, entries: Vec<T>) -> StorageResult<u64> {
706 let decided_idx = self.get_decided_idx();
707 let compacted_idx = self.get_compacted_idx();
708 self.state_cache.real_log_len = self
709 .storage
710 .append_on_prefix(decided_idx - compacted_idx, entries)?;
711 Ok(self.get_accepted_idx())
712 }
713
714 pub(crate) fn append_on_prefix(
715 &mut self,
716 from_idx: u64,
717 entries: Vec<T>,
718 ) -> StorageResult<u64> {
719 let compacted_idx = self.get_compacted_idx();
720 self.state_cache.real_log_len = self
721 .storage
722 .append_on_prefix(from_idx - compacted_idx, entries)?;
723 Ok(self.get_accepted_idx())
724 }
725
726 pub(crate) fn set_promise(&mut self, n_prom: Ballot) -> StorageResult<()> {
727 self.state_cache.promise = n_prom;
728 self.storage.set_promise(n_prom)
729 }
730
731 pub(crate) fn set_decided_idx(&mut self, ld: u64) -> StorageResult<()> {
732 self.state_cache.decided_idx = ld;
733 self.storage.set_decided_idx(ld)
734 }
735
736 pub(crate) fn get_decided_idx(&self) -> u64 {
737 self.state_cache.decided_idx
738 }
739
740 fn get_decided_idx_without_stopsign(&self) -> u64 {
741 match self.stopsign_is_decided() {
742 true => self.get_decided_idx() - 1,
743 false => self.get_decided_idx(),
744 }
745 }
746
747 pub(crate) fn set_accepted_round(&mut self, na: Ballot) -> StorageResult<()> {
748 self.state_cache.accepted_round = na;
749 self.storage.set_accepted_round(na)
750 }
751
752 pub(crate) fn get_accepted_round(&self) -> Ballot {
753 self.state_cache.accepted_round
754 }
755
756 pub(crate) fn get_entries(&self, from: u64, to: u64) -> StorageResult<Vec<T>> {
757 let compacted_idx = self.get_compacted_idx();
758 self.get_entries_with_real_idx(from - compacted_idx.min(from), to - compacted_idx.min(to))
759 }
760
761 fn get_entries_with_real_idx(
763 &self,
764 from_sfx_idx: u64,
765 to_sfx_idx: u64,
766 ) -> StorageResult<Vec<T>> {
767 self.storage.get_entries(from_sfx_idx, to_sfx_idx)
768 }
769
770 pub(crate) fn get_accepted_idx(&self) -> u64 {
772 self.state_cache.get_accepted_idx()
773 }
774
775 pub(crate) fn get_suffix(&self, from: u64) -> StorageResult<Vec<T>> {
776 let compacted_idx = self.get_compacted_idx();
777 self.storage.get_suffix(from - compacted_idx.min(from))
778 }
779
780 pub(crate) fn get_promise(&self) -> Ballot {
781 self.state_cache.promise
782 }
783
784 pub(crate) fn set_stopsign(&mut self, s: Option<StopSign>) -> StorageResult<()> {
786 self.state_cache.stopsign = s.clone();
787 self.storage.set_stopsign(s)
788 }
789
790 pub(crate) fn get_stopsign(&self) -> Option<StopSign> {
791 self.state_cache.stopsign.clone()
792 }
793
794 pub(crate) fn stopsign_is_decided(&self) -> bool {
796 self.state_cache.stopsign_is_decided()
797 }
798
799 pub(crate) fn create_snapshot(&mut self, compact_idx: u64) -> StorageResult<T::Snapshot> {
800 let compacted_idx = self.get_compacted_idx();
801 if compact_idx < compacted_idx {
802 Err(CompactionErr::TrimmedIndex(compacted_idx))?
803 }
804 let entries = self.storage.get_entries(0, compact_idx - compacted_idx)?;
805 let delta = T::Snapshot::create(entries.as_slice());
806 match self.storage.get_snapshot()? {
807 Some(mut s) => {
808 s.merge(delta);
809 Ok(s)
810 }
811 None => Ok(delta),
812 }
813 }
814
815 fn create_decided_snapshot(&mut self) -> StorageResult<T::Snapshot> {
816 let log_decided_idx = self.get_decided_idx_without_stopsign();
817 self.create_snapshot(log_decided_idx)
818 }
819
820 pub(crate) fn get_snapshot(&self) -> StorageResult<Option<T::Snapshot>> {
821 self.storage.get_snapshot()
822 }
823
824 pub(crate) fn create_diff_snapshot(
829 &mut self,
830 from_idx: u64,
831 ) -> StorageResult<(Option<SnapshotType<T>>, u64)> {
832 let log_decided_idx = self.get_decided_idx_without_stopsign();
833 let compacted_idx = self.get_compacted_idx();
834 let snapshot = if from_idx <= compacted_idx {
835 if compacted_idx < log_decided_idx {
837 Some(SnapshotType::Complete(
838 self.create_snapshot(log_decided_idx)?,
839 ))
840 } else {
841 self.get_snapshot()?.map(|s| SnapshotType::Complete(s))
843 }
844 } else {
845 let diff_entries = self.get_entries(from_idx, log_decided_idx)?;
846 Some(SnapshotType::Delta(T::Snapshot::create(
847 diff_entries.as_slice(),
848 )))
849 };
850 Ok((snapshot, log_decided_idx))
851 }
852
853 pub(crate) fn reset_snapshot(&mut self) -> StorageResult<()> {
854 self.storage.set_snapshot(None)
855 }
856
857 pub(crate) fn set_snapshot(&mut self, idx: u64, snapshot: T::Snapshot) -> StorageResult<()> {
859 let old_compacted_idx = self.get_compacted_idx();
860 let old_snapshot = self.storage.get_snapshot()?;
861 if idx > old_compacted_idx {
862 self.set_compacted_idx(idx)?;
863 if let Err(e) = self.storage.set_snapshot(Some(snapshot)) {
864 self.set_compacted_idx(old_compacted_idx)?;
865 return Err(e);
866 }
867 let old_log_len = self.state_cache.real_log_len;
868 if let Err(e) = self.storage.trim(idx - old_compacted_idx) {
869 self.set_compacted_idx(old_compacted_idx)?;
870 self.storage.set_snapshot(old_snapshot)?;
871 return Err(e);
872 }
873 self.state_cache.real_log_len =
874 old_log_len - (idx - old_compacted_idx).min(old_log_len);
875 }
876 Ok(())
877 }
878
879 pub(crate) fn merge_snapshot(&mut self, idx: u64, delta: T::Snapshot) -> StorageResult<()> {
880 let mut snapshot = if let Some(snap) = self.storage.get_snapshot()? {
881 snap
882 } else {
883 self.create_decided_snapshot()?
884 };
885 snapshot.merge(delta);
886 self.set_snapshot(idx, snapshot)
887 }
888
889 pub(crate) fn try_trim(&mut self, idx: u64) -> StorageResult<()> {
890 let compacted_idx = self.get_compacted_idx();
891 if idx <= compacted_idx {
892 Ok(()) } else {
894 let decided_idx = self.get_decided_idx();
895 if idx <= decided_idx {
896 self.set_compacted_idx(idx)?;
897 if let Err(e) = self.storage.trim(idx - compacted_idx) {
898 self.set_compacted_idx(compacted_idx)?;
899 Err(e)
900 } else {
901 self.state_cache.real_log_len = self.storage.get_log_len()?;
902 Ok(())
903 }
904 } else {
905 Err(CompactionErr::UndecidedIndex(decided_idx))?
906 }
907 }
908 }
909
910 pub(crate) fn set_compacted_idx(&mut self, idx: u64) -> StorageResult<()> {
911 self.state_cache.compacted_idx = idx;
912 self.storage.set_compacted_idx(idx)
913 }
914
915 pub(crate) fn get_compacted_idx(&self) -> u64 {
916 self.state_cache.compacted_idx
917 }
918
919 pub(crate) fn try_snapshot(&mut self, snapshot_idx: Option<u64>) -> StorageResult<()> {
920 let decided_idx = self.get_decided_idx();
921 let log_decided_idx = self.get_decided_idx_without_stopsign();
922 let idx = match snapshot_idx {
923 Some(i) => match i.cmp(&decided_idx) {
924 Ordering::Less => i,
925 Ordering::Equal => log_decided_idx,
926 Ordering::Greater => Err(CompactionErr::UndecidedIndex(decided_idx))?,
927 },
928 None => log_decided_idx,
929 };
930 if idx > self.get_compacted_idx() {
931 let snapshot = self.create_snapshot(idx)?;
932 self.set_snapshot(idx, snapshot)?;
933 }
934 Ok(())
935 }
936}