Skip to main content

page_store/
blockpagestg.rs

1use crate::{Data, Limits, PageStorage, PageStorageInfo, Storage, dividedstg, util};
2use dividedstg::{DividedStg, FD, FD_SIZE};
3use std::collections::BTreeSet;
4use std::sync::Arc;
5
6/*
7
8File 0 (PN_FILE) has a header ( allocation info and FDs ) then info for each numbered page, a 16-bit size and index into sub-file.
9
10First word of allocated page is 64-bit page number ( to allow relocation ).
11
12*/
13
14/// Implementation of [PageStorage].
15pub struct BlockPageStg {
16    /// Underlying Divided Storage.
17    pub ds: DividedStg,
18    alloc_pn: u64,
19    first_free_pn: u64,
20    fd: Vec<FD>,
21    free_pn: BTreeSet<u64>, // Temporary set of free page numbers.
22    header_dirty: bool,
23    is_new: bool,
24    psi: SizeInfo,
25    header_size: u64,
26    zbytes: Data,
27}
28
29const PN_FILE: usize = 0; // Page number sub-file, has header and info (size,index) for each numbered page.
30const NOT_PN: u64 = u64::MAX >> 16; // Special value to denote end of list of free page numbers.
31const PAGE_HSIZE: usize = 8; // Space for 64-bit page number to allow page to be relocated.
32const HEADER_SIZE: usize = 24; // Space in PN_FILE for storing alloc_pn, first_free_pn, max_div, sizes.
33
34impl BlockPageStg {
35    /// Construct from specified Storage and limits.
36    pub fn new(stg: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
37        let is_new = stg.size() == 0;
38
39        let sizes = lim.page_sizes;
40        let max_div = lim.max_div;
41        let ds = DividedStg::new(stg, lim.blk_cap);
42        let blk_cap = ds.blk_cap as usize;
43
44        let mut s = Self {
45            ds,
46            alloc_pn: 0,
47            first_free_pn: NOT_PN,
48            fd: Vec::new(),
49            free_pn: BTreeSet::default(),
50            header_dirty: true,
51            is_new,
52            psi: SizeInfo {
53                blk_cap,
54                max_div,
55                sizes,
56            },
57            header_size: 0,
58            zbytes: Data::default(),
59        };
60
61        if is_new {
62            for _i in 0..sizes + 1 {
63                s.fd.push(s.ds.new_file());
64            }
65            s.ds.set_root(&s.fd[0]);
66        } else {
67            s.read_header();
68        }
69
70        // Page sizes are assumed to fit in u16.
71        assert!(
72            s.psi.max_size_page() <= u16::MAX as usize,
73            "Max page size is 65535"
74        );
75
76        s.header_size = (HEADER_SIZE + s.psi.sizes * FD_SIZE) as u64;
77        s.zbytes = Arc::new(vec![0; s.psi.max_size_page()]);
78
79        #[cfg(feature = "log")]
80        println!("bps new alloc={:?}", &s.allocs());
81
82        Box::new(s)
83    }
84
85    /// Read page number file header.
86    fn read_header(&mut self) {
87        self.fd.clear();
88        self.fd.push(self.ds.get_root());
89
90        let mut buf = [0; HEADER_SIZE];
91        self.read(PN_FILE, 0, &mut buf);
92        self.alloc_pn = util::getu64(&buf, 0);
93        self.first_free_pn = util::getu64(&buf, 8);
94
95        self.psi.max_div = util::get(&buf, 16, 4) as usize;
96        self.psi.sizes = util::get(&buf, 20, 4) as usize;
97
98        let sizes = self.psi.sizes;
99        let mut buf = vec![0; FD_SIZE * sizes];
100        self.read(PN_FILE, HEADER_SIZE as u64, &mut buf);
101
102        for fx in 0..sizes {
103            let off = fx * FD_SIZE;
104            self.fd.push(self.ds.load_fd(&buf[off..]));
105        }
106        self.header_dirty = false;
107    }
108
109    /// Write page number file header.
110    fn write_header(&mut self) {
111        let mut buf = vec![0; self.header_size as usize];
112        util::setu64(&mut buf, self.alloc_pn);
113        util::setu64(&mut buf[8..], self.first_free_pn);
114        util::set(&mut buf, 16, self.psi.max_div as u64, 4);
115        util::set(&mut buf, 20, self.psi.sizes as u64, 4);
116
117        for fx in 0..self.psi.sizes {
118            let off = HEADER_SIZE + fx * FD_SIZE;
119            self.ds.save_fd(&self.fd[fx + 1], &mut buf[off..]);
120        }
121        self.write(PN_FILE, 0, &buf);
122        self.header_dirty = false;
123
124        #[cfg(feature = "log")]
125        println!("bps write_header allocs={:?}", &self.allocs());
126    }
127
128    #[cfg(feature = "log")]
129    fn allocs(&self) -> Vec<u64> {
130        (0..self.psi.sizes() + 1).map(|x| self.alloc(x)).collect()
131    }
132
133    /// Get page size of sub-file ( fx > 0 ).
134    fn page_size(&self, fx: usize) -> u64 {
135        (self.psi.size(fx) + PAGE_HSIZE) as u64
136    }
137
138    /// Get sub-file size.
139    fn fsize(&self, fx: usize) -> u64 {
140        let size = self.fd[fx].size();
141        if fx == 0 && size < self.header_size {
142            self.header_size
143        } else {
144            size
145        }
146    }
147
148    /// Use sub-file size to calculate allocation index.
149    fn alloc(&self, fx: usize) -> u64 {
150        let size = self.fsize(fx);
151        if fx == 0 {
152            (size - self.header_size) / 8
153        } else {
154            let ps = self.page_size(fx);
155            size.div_ceil(ps)
156        }
157    }
158
159    /// Free page by relocating last page in sub-file to fill gap and truncating.
160    fn free_page(&mut self, fx: usize, ix: u64) {
161        if fx != 0 {
162            let last = self.alloc(fx) - 1;
163            let ps = self.page_size(fx);
164            if last != ix {
165                let mut buf = vec![0; ps as usize];
166                self.read(fx, last * ps, &mut buf);
167                let pn = util::getu64(&buf, 0);
168                let (fx1, _size, ix1) = self.get_pn_info(pn);
169                assert!(fx1 == fx && ix1 == last);
170                self.update_ix(pn, ix);
171                self.write_data(fx, ix * ps, Arc::new(buf));
172            }
173            self.truncate(fx, last * ps);
174        }
175    }
176
177    /// Set numbered page info.
178    fn set_pn_info(&mut self, pn: u64, size: usize, ix: u64) {
179        let off = self.header_size + pn * 8;
180        let eof = self.fsize(PN_FILE);
181        if off > eof {
182            self.clear(PN_FILE, eof, off - eof);
183        }
184        let mut buf = [0; 8];
185        util::set(&mut buf, 0, ix, 6);
186        util::set(&mut buf, 6, size as u64, 2);
187        self.write(PN_FILE, off, &buf);
188    }
189
190    /// Get info about numbered page ( file index, size, index ).
191    fn get_pn_info(&self, pn: u64) -> (usize, usize, u64) {
192        let off = self.header_size + pn * 8;
193        if off >= self.fsize(0) {
194            return (0, 0, 0);
195        }
196        let mut buf = [0; 8];
197        self.read(PN_FILE, off, &mut buf);
198        let ix = util::get(&buf, 0, 6);
199        let size = util::get(&buf, 6, 2) as usize;
200        let fx = if size == 0 { 0 } else { self.psi.index(size) };
201        (fx, size, ix)
202    }
203
204    /// Update ix for numbered page ( for relocation ).
205    fn update_ix(&mut self, pn: u64, ix: u64) {
206        let off = self.header_size + pn * 8;
207        self.write(PN_FILE, off, &ix.to_le_bytes()[0..6]);
208    }
209
210    /// Clear sub-file region.
211    fn clear(&mut self, fx: usize, off: u64, n: u64) {
212        let z = Arc::new(vec![0; n as usize]);
213        self.write_data(fx, off, z);
214    }
215
216    /// Write sub-file.
217    fn write(&mut self, fx: usize, off: u64, data: &[u8]) {
218        let data = Arc::new(data.to_vec());
219        self.write_data(fx, off, data);
220    }
221
222    /// Write sub-file Data.
223    fn write_data(&mut self, fx: usize, off: u64, data: Data) {
224        let n = data.len();
225        self.write_data_n(fx, off, data, n);
226    }
227
228    /// Write sub-file Data up to n bytes.
229    fn write_data_n(&mut self, fx: usize, off: u64, data: Data, n: usize) {
230        self.ds.write_data(&mut self.fd[fx], off, data, n);
231        self.save_fd(fx);
232    }
233
234    /// Truncate sub-file.
235    fn truncate(&mut self, fx: usize, off: u64) {
236        self.ds.truncate(&mut self.fd[fx], off);
237        self.save_fd(fx);
238    }
239
240    /// Save sub-file descriptor after write or truncate operation.
241    fn save_fd(&mut self, fx: usize) {
242        let fd = &mut self.fd[fx];
243        if fd.changed {
244            fd.changed = false;
245            self.header_dirty = true;
246            if fx == 0 {
247                self.ds.set_root(fd);
248            }
249        }
250    }
251
252    /// Read sub-file.
253    fn read(&self, fx: usize, off: u64, data: &mut [u8]) {
254        self.ds.read(&self.fd[fx], off, data);
255    }
256}
257
258impl PageStorage for BlockPageStg {
259    fn is_new(&self) -> bool {
260        self.is_new
261    }
262
263    fn new_page(&mut self) -> u64 {
264        if let Some(pn) = self.free_pn.pop_first() {
265            pn
266        } else {
267            self.header_dirty = true;
268            let pn = self.first_free_pn;
269            if pn != NOT_PN {
270                let (_fx, _size, next) = self.get_pn_info(pn);
271                self.first_free_pn = next;
272                pn
273            } else {
274                let pn = self.alloc_pn;
275                self.alloc_pn += 1;
276                pn
277            }
278        }
279    }
280
281    fn drop_page(&mut self, pn: u64) {
282        self.free_pn.insert(pn);
283    }
284
285    fn info(&self) -> Box<dyn PageStorageInfo> {
286        Box::new(self.psi.clone())
287    }
288
289    fn set_page(&mut self, pn: u64, data: Data) {
290        let size = data.len();
291        let fx = self.psi.index(size);
292        let ps = self.page_size(fx);
293        let (old_fx, mut old_size, mut ix) = self.get_pn_info(pn);
294        if size != old_size {
295            if fx != old_fx {
296                self.free_page(old_fx, ix);
297                ix = self.alloc(fx);
298                old_size = ps as usize - PAGE_HSIZE;
299                self.write(fx, ix * ps, &pn.to_le_bytes());
300                self.set_pn_info(pn, size, ix);
301            }
302            self.set_pn_info(pn, size, ix);
303        }
304
305        let off = PAGE_HSIZE as u64 + ix * ps;
306        self.write_data_n(fx, off, data, size);
307
308        // Clear unused space in page.
309        if old_size > size {
310            self.write_data_n(fx, off + size as u64, self.zbytes.clone(), old_size - size);
311        }
312    }
313
314    fn get_page(&self, pn: u64) -> Data {
315        let (fx, size, ix) = self.get_pn_info(pn);
316        if fx == 0 {
317            return Data::default();
318        }
319        let mut data = vec![0; size];
320        let off = PAGE_HSIZE as u64 + ix * self.page_size(fx);
321        self.read(fx, off, &mut data);
322        Arc::new(data)
323    }
324
325    fn size(&self, pn: u64) -> usize {
326        self.get_pn_info(pn).1
327    }
328
329    fn save(&mut self) {
330        // Free the temporary set of free logical pages.
331        let flist = std::mem::take(&mut self.free_pn);
332        for pn in flist.iter().rev() {
333            let pn = *pn;
334            let (fx, _size, ix) = self.get_pn_info(pn);
335            self.free_page(fx, ix);
336            self.set_pn_info(pn, 0, self.first_free_pn);
337            self.first_free_pn = pn;
338            self.header_dirty = true;
339        }
340        if self.header_dirty {
341            self.write_header();
342        }
343        self.ds.save();
344    }
345
346    fn rollback(&mut self) {
347        self.free_pn.clear();
348        self.read_header();
349    }
350
351    fn wait_complete(&self) {
352        self.ds.wait_complete();
353    }
354
355    #[cfg(feature = "verify")]
356    fn get_free(&mut self) -> (crate::HashSet<u64>, u64) {
357        let mut free = crate::HashSet::default();
358        let mut pn = self.first_free_pn;
359        while pn != NOT_PN {
360            assert!(free.insert(pn));
361            let (_fx, _size, next) = self.get_pn_info(pn);
362            pn = next;
363        }
364        (free, self.alloc_pn)
365    }
366
367    #[cfg(feature = "renumber")]
368    fn load_free_pages(&mut self) -> Option<u64> {
369        let mut pn = self.first_free_pn;
370        if pn == NOT_PN {
371            return None;
372        }
373        while pn != NOT_PN {
374            let (_sx, _size, next) = self.get_pn_info(pn);
375            self.drop_page(pn);
376            pn = next;
377        }
378        self.first_free_pn = NOT_PN;
379        self.header_dirty = true;
380        Some(self.alloc_pn - self.free_pn.len() as u64)
381    }
382
383    #[cfg(feature = "renumber")]
384    fn renumber(&mut self, pn: u64) -> u64 {
385        let new_pn = self.new_page();
386        let (fx, size, ix) = self.get_pn_info(pn);
387        if fx != 0 {
388            let off = ix * self.page_size(fx);
389            self.write(fx, off, &new_pn.to_le_bytes());
390        }
391        self.set_pn_info(new_pn, size, ix);
392        self.set_pn_info(pn, 0, 0);
393        self.drop_page(pn);
394        new_pn
395    }
396
397    #[cfg(feature = "renumber")]
398    fn set_alloc_pn(&mut self, target: u64) {
399        assert!(self.first_free_pn == NOT_PN);
400        self.alloc_pn = target;
401        self.header_dirty = true;
402        self.free_pn.clear();
403        self.truncate(PN_FILE, self.header_size + target * 8);
404    }
405}
406
407#[derive(Clone)]
408struct SizeInfo {
409    blk_cap: usize,
410    max_div: usize,
411    sizes: usize,
412}
413
414impl PageStorageInfo for SizeInfo {
415    /// The number of different page sizes.
416    fn sizes(&self) -> usize {
417        self.sizes
418    }
419
420    /// Size index for given page size.
421    fn index(&self, size: usize) -> usize {
422        let r = self.blk_cap / (size + PAGE_HSIZE);
423        if r >= self.max_div {
424            1
425        } else {
426            1 + self.max_div - r
427        }
428    }
429
430    /// Page size for given index.
431    fn size(&self, ix: usize) -> usize {
432        debug_assert!(ix > 0 && ix <= self.sizes);
433        let size = self.blk_cap / (1 + self.max_div - ix);
434        size - PAGE_HSIZE
435    }
436}
437
438#[test]
439fn test_block_page_stg() {
440    use atom_file::MemFile;
441    let stg = MemFile::new();
442    let limits = Limits::default();
443    let mut bps = BlockPageStg::new(stg.clone(), &limits);
444
445    let pn = bps.new_page();
446    let data = Arc::new(b"hello george".to_vec());
447
448    bps.set_page(pn, data.clone());
449
450    bps.save();
451    let mut bps = BlockPageStg::new(stg, &limits);
452
453    let data1 = bps.get_page(pn);
454    assert!(data == data1);
455
456    bps.save();
457}