rustdb/
atomfile.rs

1use crate::{
2    buf::ReadBufStg, wmap::WMap, Arc, BasicAtomicFile, Data, Limits, Mutex, RwLock, Storage,
3};
4
5/// Based on [BasicAtomicFile] which makes sure that database updates are all-or-nothing.
6/// Provides read buffering for small reads, and a thread to perform commit asyncronously.
7pub struct AtomicFile {
8    map: WMap,
9    cf: Arc<RwLock<CommitFile>>,
10    size: u64,
11    tx: std::sync::mpsc::Sender<(u64, WMap)>,
12    busy: Arc<Mutex<()>>,
13    map_lim: usize,
14}
15
16impl AtomicFile {
17    /// Construct AtomicFile with default limits. stg is the main underlying storage, upd is temporary storage for updates during commit.
18    pub fn new(stg: Box<dyn Storage>, upd: Box<dyn Storage>) -> Box<Self> {
19        Self::new_with_limits(stg, upd, &Limits::default())
20    }
21
22    /// Construct Atomic file with specified limits.
23    pub fn new_with_limits(
24        stg: Box<dyn Storage>,
25        upd: Box<dyn Storage>,
26        lim: &Limits,
27    ) -> Box<Self> {
28        let size = stg.size();
29        let mut baf = BasicAtomicFile::new(stg.clone(), upd, lim);
30        let (tx, rx) = std::sync::mpsc::channel::<(u64, WMap)>();
31        let cf = Arc::new(RwLock::new(CommitFile::new(stg, lim.rbuf_mem)));
32        let busy = Arc::new(Mutex::new(())); // Lock held while async save thread is active.
33
34        // Start the thread which does save asyncronously.
35        let (cf1, busy1) = (cf.clone(), busy.clone());
36        std::thread::spawn(move || {
37            while let Ok((size, map)) = rx.recv() {
38                let _lock = busy1.lock();
39                baf.map = map;
40                baf.commit(size);
41                cf1.write().unwrap().done_one();
42            }
43        });
44        Box::new(Self {
45            map: WMap::default(),
46            cf,
47            size,
48            tx,
49            busy,
50            map_lim: lim.map_lim,
51        })
52    }
53}
54
55impl Storage for AtomicFile {
56    fn commit(&mut self, size: u64) {
57        self.size = size;
58        if self.map.is_empty() {
59            return;
60        }
61        if self.cf.read().unwrap().map.len() > self.map_lim {
62            self.wait_complete();
63        }
64        let map = std::mem::take(&mut self.map);
65        let cf = &mut *self.cf.write().unwrap();
66        cf.todo += 1;
67        map.to_storage(cf);
68        self.tx.send((size, map)).unwrap();
69    }
70
71    fn size(&self) -> u64 {
72        self.size
73    }
74
75    fn read(&self, start: u64, data: &mut [u8]) {
76        self.map.read(start, data, &*self.cf.read().unwrap());
77    }
78
79    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
80        self.map.write(start, data, off, len);
81    }
82
83    fn write(&mut self, start: u64, data: &[u8]) {
84        let len = data.len();
85        let d = Arc::new(data.to_vec());
86        self.write_data(start, d, 0, len);
87    }
88
89    fn wait_complete(&self) {
90        while self.cf.read().unwrap().todo != 0 {
91            #[cfg(feature = "log")]
92            println!("AtomicFile::wait_complete - waiting for writer process");
93            let _x = self.busy.lock();
94        }
95    }
96}
97
98struct CommitFile {
99    stg: Box<dyn Storage>,
100    map: WMap,
101    todo: usize,
102}
103
104impl CommitFile {
105    fn new(stg: Box<dyn Storage>, buf_mem: usize) -> Self {
106        Self {
107            stg: ReadBufStg::<256>::new(stg, 50, buf_mem / 256),
108            map: WMap::default(),
109            todo: 0,
110        }
111    }
112
113    fn done_one(&mut self) {
114        self.todo -= 1;
115        if self.todo == 0 {
116            self.map = WMap::default();
117            self.stg.reset();
118        }
119    }
120}
121
122impl Storage for CommitFile {
123    fn commit(&mut self, _size: u64) {
124        panic!()
125    }
126
127    fn size(&self) -> u64 {
128        panic!()
129    }
130
131    fn read(&self, start: u64, data: &mut [u8]) {
132        self.map.read(start, data, &*self.stg);
133    }
134
135    fn write_data(&mut self, start: u64, data: Data, off: usize, len: usize) {
136        self.map.write(start, data, off, len);
137    }
138
139    fn write(&mut self, _start: u64, _data: &[u8]) {
140        panic!()
141    }
142}
143
#[test]
pub fn test() {
    use crate::stg::MemFile;
    use rand::Rng;
    // An AtomicFile and a plain MemFile are given the same random writes; every random
    // read must then return the same bytes from both.
    // NOTE(review): `commit` is never called here, so only uncommitted writes are exercised.

    let ta = crate::test::test_amount();
    let mut rng = rand::thread_rng();

    for _round in 0..1000 {
        let mut atomic = AtomicFile::new(MemFile::new(), MemFile::new());
        let mut reference = MemFile::new();

        for _step in 0..1000 * ta {
            let off = (rng.gen::<usize>() % 100) as u64;
            let len = 1 + rng.gen::<usize>() % 20;
            if rng.gen::<bool>() {
                // Write the same random bytes to both storages.
                let bytes: Vec<u8> = (0..len).map(|_| rng.gen::<u8>()).collect();
                atomic.write(off, &bytes);
                reference.write(off, &bytes);
            } else {
                // Read the same range from both storages and compare.
                let mut got = vec![0; len];
                let mut want = vec![0; len];
                atomic.read(off, &mut got);
                reference.read(off, &mut want);
                assert!(got == want);
            }
        }
    }
}
180}