1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
use super::{Ballot, Entry}; use crate::Index; use std::collections::{BTreeMap, BTreeSet}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::{Mutex, RwLock}; pub struct Storage { entries: Arc<RwLock<BTreeMap<u64, super::Entry>>>, ballot: Arc<Mutex<Ballot>>, snapshot_index: AtomicU64, tags: Arc<RwLock<BTreeMap<u64, crate::SnapshotTag>>>, } impl Storage { pub fn new() -> Self { Self { entries: Arc::new(RwLock::new(BTreeMap::new())), ballot: Arc::new(Mutex::new(Ballot::new())), snapshot_index: AtomicU64::new(0), tags: Arc::new(RwLock::new(BTreeMap::new())), } } } use anyhow::Result; #[async_trait::async_trait] impl super::RaftStorage for Storage { async fn delete_tag(&self, i: Index) -> Result<()> { self.tags.write().await.remove(&i); Ok(()) } async fn list_tags(&self) -> Result<BTreeSet<Index>> { let mut r = BTreeSet::new(); for k in self.tags.read().await.keys() { r.insert(*k); } Ok(r) } async fn get_tag(&self, i: Index) -> Result<Option<crate::SnapshotTag>> { let r = self.tags.read().await.get(&i).cloned(); Ok(r) } async fn put_tag(&self, i: Index, x: crate::SnapshotTag) -> Result<()> { self.tags.write().await.insert(i, x); Ok(()) } async fn get_last_index(&self) -> Result<Index> { let x = self.entries.read().await; let r = match x.iter().next_back() { Some((k, _)) => *k, None => 0, }; Ok(r) } async fn delete_before(&self, r: u64) -> Result<()> { let ls: Vec<u64> = self.entries.read().await.range(..r).map(|x| *x.0).collect(); for i in ls { self.entries.write().await.remove(&i); } let ls: Vec<u64> = self.tags.read().await.range(..r).map(|x| *x.0).collect(); for i in ls { self.tags.write().await.remove(&i); } Ok(()) } async fn insert_snapshot(&self, i: Index, e: Entry) -> Result<()> { self.entries.write().await.insert(i, e); self.snapshot_index.fetch_max(i, Ordering::SeqCst); Ok(()) } async fn insert_entry(&self, i: Index, e: Entry) -> Result<()> { self.entries.write().await.insert(i, e); Ok(()) } async fn get_entry(&self, i: Index) -> Result<Option<Entry>> { let r = self.entries.read().await.get(&i).cloned(); Ok(r) } async fn get_snapshot_index(&self) -> Result<Index> { let r = self.snapshot_index.load(Ordering::SeqCst); Ok(r) } async fn save_ballot(&self, v: Ballot) -> Result<()> { *self.ballot.lock().await = v; Ok(()) } async fn load_ballot(&self) -> Result<Ballot> { let r = self.ballot.lock().await.clone(); Ok(r) } } #[tokio::test] async fn test_mem_storage() -> Result<()> { let s = Storage::new(); super::test_storage(s).await?; Ok(()) }