use std::collections::BTreeMap;
use std::marker::PhantomData;
use omnipaxos::ballot_leader_election::Ballot;
use omnipaxos::storage::{Entry, StopSign, Storage, StorageResult};
use parking_lot::Mutex;
pub struct MemStorage<T: Entry> {
inner: Mutex<MemInner<T>>,
_marker: PhantomData<T>,
}
struct MemInner<T: Entry> {
log: BTreeMap<u64, T>,
promise: Option<Ballot>,
accepted_round: Option<Ballot>,
decided_idx: u64,
compacted_idx: u64,
snapshot: Option<T::Snapshot>,
stopsign: Option<StopSign>,
}
impl<T: Entry> Default for MemStorage<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Entry> MemStorage<T> {
#[must_use]
pub fn new() -> Self {
Self {
inner: Mutex::new(MemInner {
log: BTreeMap::new(),
promise: None,
accepted_round: None,
decided_idx: 0,
compacted_idx: 0,
snapshot: None,
stopsign: None,
}),
_marker: PhantomData,
}
}
}
impl<T> Storage<T> for MemStorage<T>
where
T: Entry + Clone,
T::Snapshot: Clone,
{
fn append_entry(&mut self, entry: T) -> StorageResult<u64> {
let mut inner = self.inner.lock();
let next = inner
.log
.keys()
.next_back()
.map(|key| key + 1)
.unwrap_or(inner.compacted_idx);
inner.log.insert(next, entry);
let compacted = inner.compacted_idx;
Ok((next + 1).saturating_sub(compacted))
}
fn append_entries(&mut self, entries: Vec<T>) -> StorageResult<u64> {
let mut inner = self.inner.lock();
let start = inner
.log
.keys()
.next_back()
.map(|key| key + 1)
.unwrap_or(inner.compacted_idx);
let count = entries.len() as u64;
for (offset, entry) in entries.into_iter().enumerate() {
inner.log.insert(start + offset as u64, entry);
}
let compacted = inner.compacted_idx;
Ok((start + count).saturating_sub(compacted))
}
fn append_on_prefix(&mut self, from_idx: u64, entries: Vec<T>) -> StorageResult<u64> {
let mut inner = self.inner.lock();
inner.log.retain(|key, _| *key < from_idx);
let count = entries.len() as u64;
for (offset, entry) in entries.into_iter().enumerate() {
inner.log.insert(from_idx + offset as u64, entry);
}
let compacted = inner.compacted_idx;
Ok((from_idx + count).saturating_sub(compacted))
}
fn get_entries(&self, from: u64, to: u64) -> StorageResult<Vec<T>> {
if from >= to {
return Ok(Vec::new());
}
let inner = self.inner.lock();
let mut out = Vec::with_capacity((to - from) as usize);
for idx in from..to {
match inner.log.get(&idx) {
Some(entry) => out.push(entry.clone()),
None => return Ok(Vec::new()),
}
}
Ok(out)
}
fn get_log_len(&self) -> StorageResult<u64> {
let inner = self.inner.lock();
let next_abs = inner.log.keys().next_back().map(|key| key + 1).unwrap_or(0);
Ok(next_abs.saturating_sub(inner.compacted_idx))
}
fn get_suffix(&self, from: u64) -> StorageResult<Vec<T>> {
let inner = self.inner.lock();
Ok(inner
.log
.range(from..)
.map(|(_, entry)| entry.clone())
.collect())
}
fn set_promise(&mut self, ballot: Ballot) -> StorageResult<()> {
self.inner.lock().promise = Some(ballot);
Ok(())
}
fn get_promise(&self) -> StorageResult<Option<Ballot>> {
Ok(self.inner.lock().promise)
}
fn set_accepted_round(&mut self, ballot: Ballot) -> StorageResult<()> {
self.inner.lock().accepted_round = Some(ballot);
Ok(())
}
fn get_accepted_round(&self) -> StorageResult<Option<Ballot>> {
Ok(self.inner.lock().accepted_round)
}
fn set_decided_idx(&mut self, idx: u64) -> StorageResult<()> {
self.inner.lock().decided_idx = idx;
Ok(())
}
fn get_decided_idx(&self) -> StorageResult<u64> {
Ok(self.inner.lock().decided_idx)
}
fn trim(&mut self, idx: u64) -> StorageResult<()> {
self.inner.lock().log.retain(|key, _| *key >= idx);
Ok(())
}
fn set_compacted_idx(&mut self, idx: u64) -> StorageResult<()> {
self.inner.lock().compacted_idx = idx;
Ok(())
}
fn get_compacted_idx(&self) -> StorageResult<u64> {
Ok(self.inner.lock().compacted_idx)
}
fn set_snapshot(&mut self, snapshot: Option<T::Snapshot>) -> StorageResult<()> {
self.inner.lock().snapshot = snapshot;
Ok(())
}
fn get_snapshot(&self) -> StorageResult<Option<T::Snapshot>> {
Ok(self.inner.lock().snapshot.clone())
}
fn set_stopsign(&mut self, stopsign: Option<StopSign>) -> StorageResult<()> {
self.inner.lock().stopsign = stopsign;
Ok(())
}
fn get_stopsign(&self) -> StorageResult<Option<StopSign>> {
Ok(self.inner.lock().stopsign.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use omnipaxos::ballot_leader_election::Ballot;
use omnipaxos::storage::Storage;
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct Cmd(u64);
impl omnipaxos::storage::Entry for Cmd {
type Snapshot = Snap;
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct Snap(u64);
impl omnipaxos::storage::Snapshot<Cmd> for Snap {
fn create(entries: &[Cmd]) -> Self {
Self(entries.iter().map(|cmd| cmd.0).max().unwrap_or(0))
}
fn merge(&mut self, other: Self) {
self.0 = self.0.max(other.0);
}
fn use_snapshots() -> bool {
true
}
}
#[test]
fn empty_log_defaults() {
let storage: MemStorage<Cmd> = MemStorage::new();
assert_eq!(storage.get_log_len().unwrap(), 0);
assert_eq!(storage.get_decided_idx().unwrap(), 0);
assert_eq!(storage.get_compacted_idx().unwrap(), 0);
assert!(storage.get_promise().unwrap().is_none());
assert!(storage.get_accepted_round().unwrap().is_none());
assert!(storage.get_snapshot().unwrap().is_none());
assert!(storage.get_stopsign().unwrap().is_none());
}
#[test]
fn append_and_read_round_trip() {
let mut storage: MemStorage<Cmd> = MemStorage::new();
let len = storage
.append_entries(vec![Cmd(1), Cmd(2), Cmd(3)])
.unwrap();
assert_eq!(len, 3);
let suffix = storage.get_suffix(0).unwrap();
assert_eq!(suffix.len(), 3);
assert_eq!(suffix[0].0, 1);
assert_eq!(suffix[2].0, 3);
}
#[test]
fn get_entries_returns_empty_on_gap() {
let mut storage: MemStorage<Cmd> = MemStorage::new();
storage.append_entries(vec![Cmd(1), Cmd(2)]).unwrap();
assert!(storage.get_entries(0, 5).unwrap().is_empty());
let present = storage.get_entries(0, 2).unwrap();
assert_eq!(present.len(), 2);
}
#[test]
fn trim_then_compacted_yields_physical_remaining_length() {
let mut storage: MemStorage<Cmd> = MemStorage::new();
storage
.append_entries(vec![Cmd(1), Cmd(2), Cmd(3), Cmd(4)])
.unwrap();
storage.trim(2).unwrap();
storage.set_compacted_idx(2).unwrap();
assert_eq!(storage.get_log_len().unwrap(), 2);
let suffix = storage.get_suffix(2).unwrap();
assert_eq!(suffix.len(), 2);
assert_eq!(suffix[0].0, 3);
}
#[test]
fn append_entry_after_full_trim_writes_at_compacted_idx() {
let mut storage: MemStorage<Cmd> = MemStorage::new();
storage
.append_entries(vec![Cmd(1), Cmd(2), Cmd(3), Cmd(4)])
.unwrap();
storage.trim(4).unwrap();
storage.set_compacted_idx(4).unwrap();
let new_len = storage.append_entry(Cmd(99)).unwrap();
assert_eq!(new_len, 1);
assert_eq!(storage.get_log_len().unwrap(), 1);
let suffix = storage.get_suffix(4).unwrap();
assert_eq!(suffix.len(), 1, "entry must be reachable at absolute idx 4");
assert_eq!(suffix[0].0, 99);
let range = storage.get_entries(4, 5).unwrap();
assert_eq!(range.len(), 1);
assert_eq!(range[0].0, 99);
assert_eq!(storage.get_suffix(0).unwrap().len(), 1);
}
#[test]
fn append_entries_after_full_trim_writes_at_compacted_idx() {
let mut storage: MemStorage<Cmd> = MemStorage::new();
storage
.append_entries(vec![Cmd(1), Cmd(2), Cmd(3)])
.unwrap();
storage.trim(3).unwrap();
storage.set_compacted_idx(3).unwrap();
let new_len = storage.append_entries(vec![Cmd(77), Cmd(88)]).unwrap();
assert_eq!(new_len, 2);
assert_eq!(storage.get_log_len().unwrap(), 2);
let suffix = storage.get_suffix(3).unwrap();
assert_eq!(suffix.len(), 2, "entries must continue from absolute idx 3");
assert_eq!(suffix[0].0, 77);
assert_eq!(suffix[1].0, 88);
assert_eq!(storage.get_suffix(0).unwrap().len(), 2);
}
#[test]
fn promise_round_trip() {
let mut storage: MemStorage<Cmd> = MemStorage::new();
let ballot = Ballot {
config_id: 1,
n: 5,
priority: 0,
pid: 2,
};
storage.set_promise(ballot).unwrap();
let got = storage.get_promise().unwrap().expect("present");
assert_eq!(got.n, 5);
assert_eq!(got.pid, 2);
}
#[test]
fn snapshot_round_trip_and_clear() {
let mut storage: MemStorage<Cmd> = MemStorage::new();
storage.set_snapshot(Some(Snap(99))).unwrap();
assert_eq!(storage.get_snapshot().unwrap().expect("present").0, 99);
storage.set_snapshot(None).unwrap();
assert!(storage.get_snapshot().unwrap().is_none());
}
}