1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use super::{Ballot, Entry};
use crate::Index;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};

pub struct Storage {
    entries: Arc<RwLock<BTreeMap<u64, super::Entry>>>,
    ballot: Arc<Mutex<Ballot>>,
    snapshot_index: AtomicU64,
    tags: Arc<RwLock<BTreeMap<u64, crate::SnapshotTag>>>,
}
impl Storage {
    pub fn new() -> Self {
        Self {
            entries: Arc::new(RwLock::new(BTreeMap::new())),
            ballot: Arc::new(Mutex::new(Ballot::new())),
            snapshot_index: AtomicU64::new(0),
            tags: Arc::new(RwLock::new(BTreeMap::new())),
        }
    }
}
use anyhow::Result;
#[async_trait::async_trait]
impl super::RaftStorage for Storage {
    async fn delete_tag(&self, i: Index) -> Result<()> {
        self.tags.write().await.remove(&i);
        Ok(())
    }
    async fn list_tags(&self) -> Result<BTreeSet<Index>> {
        let mut r = BTreeSet::new();
        for k in self.tags.read().await.keys() {
            r.insert(*k);
        }
        Ok(r)
    }
    async fn get_tag(&self, i: Index) -> Result<Option<crate::SnapshotTag>> {
        let r = self.tags.read().await.get(&i).cloned();
        Ok(r)
    }
    async fn put_tag(&self, i: Index, x: crate::SnapshotTag) -> Result<()> {
        self.tags.write().await.insert(i, x);
        Ok(())
    }
    async fn get_last_index(&self) -> Result<Index> {
        let x = self.entries.read().await;
        let r = match x.iter().next_back() {
            Some((k, _)) => *k,
            None => 0,
        };
        Ok(r)
    }
    async fn delete_before(&self, r: u64) -> Result<()> {
        let ls: Vec<u64> = self.entries.read().await.range(..r).map(|x| *x.0).collect();
        for i in ls {
            self.entries.write().await.remove(&i);
        }
        let ls: Vec<u64> = self.tags.read().await.range(..r).map(|x| *x.0).collect();
        for i in ls {
            self.tags.write().await.remove(&i);
        }
        Ok(())
    }
    async fn insert_snapshot(&self, i: Index, e: Entry) -> Result<()> {
        self.entries.write().await.insert(i, e);
        self.snapshot_index.fetch_max(i, Ordering::SeqCst);
        Ok(())
    }
    async fn insert_entry(&self, i: Index, e: Entry) -> Result<()> {
        self.entries.write().await.insert(i, e);
        Ok(())
    }
    async fn get_entry(&self, i: Index) -> Result<Option<Entry>> {
        let r = self.entries.read().await.get(&i).cloned();
        Ok(r)
    }
    async fn get_snapshot_index(&self) -> Result<Index> {
        let r = self.snapshot_index.load(Ordering::SeqCst);
        Ok(r)
    }
    async fn save_ballot(&self, v: Ballot) -> Result<()> {
        *self.ballot.lock().await = v;
        Ok(())
    }
    async fn load_ballot(&self) -> Result<Ballot> {
        let r = self.ballot.lock().await.clone();
        Ok(r)
    }
}

#[tokio::test]
async fn test_mem_storage() -> Result<()> {
    let s = Storage::new();
    super::test_storage(s).await?;
    Ok(())
}