Skip to main content

page_store/
blockpagestg.rs

1use crate::{Data, Limits, PageStorage, PageStorageInfo, Storage, dividedstg, util};
2use dividedstg::{DividedStg, FD, FD_SIZE};
3use std::collections::BTreeSet;
4use std::sync::Arc;
5
6/*
7
File 0 (PN_FILE) has a header ( allocation info and FDs ) followed by an 8-byte entry for each numbered page: a 48-bit index into the page's sub-file, then a 16-bit size.
9
10First word of allocated page is 64-bit page number ( to allow relocation ).
11
12*/
13
/// Implementation of [PageStorage].
///
/// BlockPageStg first divides the file into a small number of variable size files ( each built
/// out of fixed size blocks ), and then allocates pages from these files, with each file being
/// used to store pages of a particular size. So this is implemented as three structs: BlockStg
/// which manages fixed size blocks, DividedStg which implements multiple files out of fixed size
/// blocks and finally BlockPageStg which uses the files to manage variable size pages. Since
/// blocks are fixed size, when there is a free block, the last block in the file can be relocated
/// to fill the gap. Similarly, when there is a free page, the last page in the sub-file can be
/// relocated to fill the gap. Thus the amount of unused file storage is minimal.
///
/// The choice of block size and page sizes is not fixed and can be configured. There is some
/// advantage in choosing a block size which can be divided exactly by a range of integers - this
/// means that when a page is read, the read does not cross a block boundary, so there is a single
/// read to the underlying file system ( disregarding "small" reads which are required to establish
/// the page location, which can be buffered ). For example, a block size of 27,720
/// ( = 5 x 7 x 8 x 9 x 11 ) can be divided evenly by integers in the range 6 to 12.
pub struct BlockPageStg {
    /// Underlying Divided Storage.
    pub ds: DividedStg,
    /// Next page number to hand out when no free page number is available ( saved in the header ).
    alloc_pn: u64,
    /// Head of the saved list of free page numbers, NOT_PN if the list is empty.
    first_free_pn: u64,
    /// Sub-file descriptors: fd[PN_FILE] is the page number file, fd[1..] hold the pages of each size.
    fd: Vec<FD>,
    /// Temporary set of free page numbers ( dropped pages, linked into the saved list by save ).
    free_pn: BTreeSet<u64>,
    /// True if the header needs to be written by the next save.
    header_dirty: bool,
    /// True if the underlying storage was empty when this was constructed.
    is_new: bool,
    /// Page size parameters ( block capacity, max_div, number of sizes ).
    psi: SizeInfo,
    /// Size in bytes of the PN_FILE header ( HEADER_SIZE plus one FD per page size ).
    header_size: u64,
    /// Zero bytes ( max page size long ) used to clear the unused tail of a page.
    zbytes: Data,
}
32
/// Page number sub-file, has header and info (size,index) for each numbered page.
const PN_FILE: usize = 0;
/// Special value to denote end of list of free page numbers
/// ( the largest value that fits in the 6-byte index field of a page entry ).
const NOT_PN: u64 = u64::MAX >> 16;
/// Space for 64-bit page number to allow page to be relocated.
const PAGE_HSIZE: usize = 8;
/// Space in PN_FILE for storing alloc_pn, first_free_pn ( 8 bytes each ), max_div, sizes ( 4 bytes each ).
const HEADER_SIZE: usize = 24;
37
38impl BlockPageStg {
39    /// Construct from specified Storage and limits.
40    pub fn new(stg: Box<dyn Storage>, lim: &Limits) -> Box<Self> {
41        let is_new = stg.size() == 0;
42
43        let sizes = lim.page_sizes;
44        let max_div = lim.max_div;
45        let ds = DividedStg::new(stg, lim.blk_cap);
46        let blk_cap = ds.blk_cap as usize;
47
48        let mut s = Self {
49            ds,
50            alloc_pn: 0,
51            first_free_pn: NOT_PN,
52            fd: Vec::new(),
53            free_pn: BTreeSet::default(),
54            header_dirty: true,
55            is_new,
56            psi: SizeInfo {
57                blk_cap,
58                max_div,
59                sizes,
60            },
61            header_size: 0,
62            zbytes: Data::default(),
63        };
64
65        if is_new {
66            for _i in 0..sizes + 1 {
67                s.fd.push(s.ds.new_file());
68            }
69            s.ds.set_root(&s.fd[0]);
70        } else {
71            s.read_header();
72        }
73
74        // Page sizes are assumed to fit in u16.
75        assert!(
76            s.psi.max_size_page() <= u16::MAX as usize,
77            "Max page size is 65535"
78        );
79
80        s.header_size = (HEADER_SIZE + s.psi.sizes * FD_SIZE) as u64;
81        s.zbytes = Arc::new(vec![0; s.psi.max_size_page()]);
82
83        #[cfg(feature = "log")]
84        println!("bps new alloc={:?}", &s.allocs());
85
86        Box::new(s)
87    }
88
89    /// Read page number file header.
90    fn read_header(&mut self) {
91        self.fd.clear();
92        self.fd.push(self.ds.get_root());
93
94        let mut buf = [0; HEADER_SIZE];
95        self.read(PN_FILE, 0, &mut buf);
96        self.alloc_pn = util::getu64(&buf, 0);
97        self.first_free_pn = util::getu64(&buf, 8);
98
99        self.psi.max_div = util::get(&buf, 16, 4) as usize;
100        self.psi.sizes = util::get(&buf, 20, 4) as usize;
101
102        let sizes = self.psi.sizes;
103        let mut buf = vec![0; FD_SIZE * sizes];
104        self.read(PN_FILE, HEADER_SIZE as u64, &mut buf);
105
106        for fx in 0..sizes {
107            let off = fx * FD_SIZE;
108            self.fd.push(self.ds.load_fd(&buf[off..]));
109        }
110        self.header_dirty = false;
111    }
112
113    /// Write page number file header.
114    fn write_header(&mut self) {
115        let mut buf = vec![0; self.header_size as usize];
116        util::setu64(&mut buf, self.alloc_pn);
117        util::setu64(&mut buf[8..], self.first_free_pn);
118        util::set(&mut buf, 16, self.psi.max_div as u64, 4);
119        util::set(&mut buf, 20, self.psi.sizes as u64, 4);
120
121        for fx in 0..self.psi.sizes {
122            let off = HEADER_SIZE + fx * FD_SIZE;
123            self.ds.save_fd(&self.fd[fx + 1], &mut buf[off..]);
124        }
125        self.write(PN_FILE, 0, &buf);
126        self.header_dirty = false;
127
128        #[cfg(feature = "log")]
129        println!("bps write_header allocs={:?}", &self.allocs());
130    }
131
132    #[cfg(feature = "log")]
133    fn allocs(&self) -> Vec<u64> {
134        (0..self.psi.sizes() + 1).map(|x| self.alloc(x)).collect()
135    }
136
137    /// Get page size of sub-file ( fx > 0 ).
138    fn page_size(&self, fx: usize) -> u64 {
139        (self.psi.size(fx) + PAGE_HSIZE) as u64
140    }
141
142    /// Get sub-file size.
143    fn fsize(&self, fx: usize) -> u64 {
144        let size = self.fd[fx].size();
145        if fx == 0 && size < self.header_size {
146            self.header_size
147        } else {
148            size
149        }
150    }
151
152    /// Use sub-file size to calculate allocation index.
153    fn alloc(&self, fx: usize) -> u64 {
154        let size = self.fsize(fx);
155        if fx == 0 {
156            (size - self.header_size) / 8
157        } else {
158            let ps = self.page_size(fx);
159            size.div_ceil(ps)
160        }
161    }
162
163    /// Free page by relocating last page in sub-file to fill gap and truncating.
164    fn free_page(&mut self, fx: usize, ix: u64) {
165        if fx != 0 {
166            let last = self.alloc(fx) - 1;
167            let ps = self.page_size(fx);
168            if last != ix {
169                let mut buf = vec![0; ps as usize];
170                self.read(fx, last * ps, &mut buf);
171                let pn = util::getu64(&buf, 0);
172                let (fx1, _size, ix1) = self.get_pn_info(pn);
173                assert!(fx1 == fx && ix1 == last);
174                self.update_ix(pn, ix);
175                self.write_data(fx, ix * ps, Arc::new(buf));
176            }
177            self.truncate(fx, last * ps);
178        }
179    }
180
181    /// Set numbered page info.
182    fn set_pn_info(&mut self, pn: u64, size: usize, ix: u64) {
183        let off = self.header_size + pn * 8;
184        let eof = self.fsize(PN_FILE);
185        if off > eof {
186            self.clear(PN_FILE, eof, off - eof);
187        }
188        let mut buf = [0; 8];
189        util::set(&mut buf, 0, ix, 6);
190        util::set(&mut buf, 6, size as u64, 2);
191        self.write(PN_FILE, off, &buf);
192    }
193
194    /// Get info about numbered page ( file index, size, index ).
195    fn get_pn_info(&self, pn: u64) -> (usize, usize, u64) {
196        let off = self.header_size + pn * 8;
197        if off >= self.fsize(0) {
198            return (0, 0, 0);
199        }
200        let mut buf = [0; 8];
201        self.read(PN_FILE, off, &mut buf);
202        let ix = util::get(&buf, 0, 6);
203        let size = util::get(&buf, 6, 2) as usize;
204        let fx = if size == 0 { 0 } else { self.psi.index(size) };
205        (fx, size, ix)
206    }
207
208    /// Update ix for numbered page ( for relocation ).
209    fn update_ix(&mut self, pn: u64, ix: u64) {
210        let off = self.header_size + pn * 8;
211        self.write(PN_FILE, off, &ix.to_le_bytes()[0..6]);
212    }
213
214    /// Clear sub-file region.
215    fn clear(&mut self, fx: usize, off: u64, n: u64) {
216        let z = Arc::new(vec![0; n as usize]);
217        self.write_data(fx, off, z);
218    }
219
220    /// Write sub-file.
221    fn write(&mut self, fx: usize, off: u64, data: &[u8]) {
222        let data = Arc::new(data.to_vec());
223        self.write_data(fx, off, data);
224    }
225
226    /// Write sub-file Data.
227    fn write_data(&mut self, fx: usize, off: u64, data: Data) {
228        let n = data.len();
229        self.write_data_n(fx, off, data, n);
230    }
231
232    /// Write sub-file Data up to n bytes.
233    fn write_data_n(&mut self, fx: usize, off: u64, data: Data, n: usize) {
234        self.ds.write_data(&mut self.fd[fx], off, data, n);
235        self.save_fd(fx);
236    }
237
238    /// Truncate sub-file.
239    fn truncate(&mut self, fx: usize, off: u64) {
240        self.ds.truncate(&mut self.fd[fx], off);
241        self.save_fd(fx);
242    }
243
244    /// Save sub-file descriptor after write or truncate operation.
245    fn save_fd(&mut self, fx: usize) {
246        let fd = &mut self.fd[fx];
247        if fd.changed {
248            fd.changed = false;
249            self.header_dirty = true;
250            if fx == 0 {
251                self.ds.set_root(fd);
252            }
253        }
254    }
255
256    /// Read sub-file.
257    fn read(&self, fx: usize, off: u64, data: &mut [u8]) {
258        self.ds.read(&self.fd[fx], off, data);
259    }
260}
261
262impl PageStorage for BlockPageStg {
    /// True if the underlying storage was empty ( size zero ) when this was constructed.
    fn is_new(&self) -> bool {
        self.is_new
    }
266
267    fn new_page(&mut self) -> u64 {
268        if let Some(pn) = self.free_pn.pop_first() {
269            pn
270        } else {
271            self.header_dirty = true;
272            let pn = self.first_free_pn;
273            if pn != NOT_PN {
274                let (_fx, _size, next) = self.get_pn_info(pn);
275                self.first_free_pn = next;
276                pn
277            } else {
278                let pn = self.alloc_pn;
279                self.alloc_pn += 1;
280                pn
281            }
282        }
283    }
284
    /// Record `pn` in the temporary set of free page numbers.
    /// The page's storage is released and `pn` is linked into the saved free list by `save`.
    fn drop_page(&mut self, pn: u64) {
        self.free_pn.insert(pn);
    }
288
289    fn info(&self) -> Box<dyn PageStorageInfo> {
290        Box::new(self.psi.clone())
291    }
292
293    fn set_page(&mut self, pn: u64, data: Data) {
294        let size = data.len();
295        let fx = self.psi.index(size);
296        let ps = self.page_size(fx);
297        let (old_fx, mut old_size, mut ix) = self.get_pn_info(pn);
298        if size != old_size {
299            if fx != old_fx {
300                self.free_page(old_fx, ix);
301                ix = self.alloc(fx);
302                old_size = ps as usize - PAGE_HSIZE;
303                self.write(fx, ix * ps, &pn.to_le_bytes());
304                self.set_pn_info(pn, size, ix);
305            }
306            self.set_pn_info(pn, size, ix);
307        }
308
309        let off = PAGE_HSIZE as u64 + ix * ps;
310        self.write_data_n(fx, off, data, size);
311
312        // Clear unused space in page.
313        if old_size > size {
314            self.write_data_n(fx, off + size as u64, self.zbytes.clone(), old_size - size);
315        }
316    }
317
318    fn get_page(&self, pn: u64) -> Data {
319        let (fx, size, ix) = self.get_pn_info(pn);
320        if fx == 0 {
321            return Data::default();
322        }
323        let mut data = vec![0; size];
324        let off = PAGE_HSIZE as u64 + ix * self.page_size(fx);
325        self.read(fx, off, &mut data);
326        Arc::new(data)
327    }
328
329    fn size(&self, pn: u64) -> usize {
330        self.get_pn_info(pn).1
331    }
332
333    fn save(&mut self) {
334        // Free the temporary set of free logical pages.
335        let flist = std::mem::take(&mut self.free_pn);
336        for pn in flist.iter().rev() {
337            let pn = *pn;
338            let (fx, _size, ix) = self.get_pn_info(pn);
339            self.free_page(fx, ix);
340            self.set_pn_info(pn, 0, self.first_free_pn);
341            self.first_free_pn = pn;
342            self.header_dirty = true;
343        }
344        if self.header_dirty {
345            self.write_header();
346        }
347        self.ds.save();
348    }
349
350    fn rollback(&mut self) {
351        self.free_pn.clear();
352        self.read_header();
353    }
354
    /// Delegates to the wait_complete method of the underlying divided storage.
    fn wait_complete(&self) {
        self.ds.wait_complete();
    }
358
359    #[cfg(feature = "verify")]
360    fn get_free(&mut self) -> (crate::HashSet<u64>, u64) {
361        let mut free = crate::HashSet::default();
362        let mut pn = self.first_free_pn;
363        while pn != NOT_PN {
364            assert!(free.insert(pn));
365            let (_fx, _size, next) = self.get_pn_info(pn);
366            pn = next;
367        }
368        (free, self.alloc_pn)
369    }
370
371    #[cfg(feature = "renumber")]
372    fn load_free_pages(&mut self) -> Option<u64> {
373        let mut pn = self.first_free_pn;
374        if pn == NOT_PN {
375            return None;
376        }
377        while pn != NOT_PN {
378            let (_sx, _size, next) = self.get_pn_info(pn);
379            self.drop_page(pn);
380            pn = next;
381        }
382        self.first_free_pn = NOT_PN;
383        self.header_dirty = true;
384        Some(self.alloc_pn - self.free_pn.len() as u64)
385    }
386
387    #[cfg(feature = "renumber")]
388    fn renumber(&mut self, pn: u64) -> u64 {
389        let new_pn = self.new_page();
390        let (fx, size, ix) = self.get_pn_info(pn);
391        if fx != 0 {
392            let off = ix * self.page_size(fx);
393            self.write(fx, off, &new_pn.to_le_bytes());
394        }
395        self.set_pn_info(new_pn, size, ix);
396        self.set_pn_info(pn, 0, 0);
397        self.drop_page(pn);
398        new_pn
399    }
400
401    #[cfg(feature = "renumber")]
402    fn set_alloc_pn(&mut self, target: u64) {
403        assert!(self.first_free_pn == NOT_PN);
404        self.alloc_pn = target;
405        self.header_dirty = true;
406        self.free_pn.clear();
407        self.truncate(PN_FILE, self.header_size + target * 8);
408    }
409}
410
/// Parameters which determine the available page sizes.
#[derive(Clone)]
struct SizeInfo {
    /// Block capacity ( taken from the underlying DividedStg ).
    blk_cap: usize,
    /// Divisor for the smallest page size: size index 1 has pages of blk_cap / max_div bytes
    /// ( including the page number prefix ).
    max_div: usize,
    /// The number of different page sizes.
    sizes: usize,
}
417
418impl PageStorageInfo for SizeInfo {
419    /// The number of different page sizes.
420    fn sizes(&self) -> usize {
421        self.sizes
422    }
423
424    /// Size index for given page size.
425    fn index(&self, size: usize) -> usize {
426        let r = self.blk_cap / (size + PAGE_HSIZE);
427        if r >= self.max_div {
428            1
429        } else {
430            1 + self.max_div - r
431        }
432    }
433
434    /// Page size for given index.
435    fn size(&self, ix: usize) -> usize {
436        debug_assert!(ix > 0 && ix <= self.sizes);
437        let size = self.blk_cap / (1 + self.max_div - ix);
438        size - PAGE_HSIZE
439    }
440}
441
/// Round trip: a page written and saved through one BlockPageStg can be read back
/// through a second BlockPageStg opened on the same storage.
#[test]
fn test_block_page_stg() {
    use atom_file::MemFile;
    let stg = MemFile::new();
    let limits = Limits::default();

    // Write one page and save.
    let mut writer = BlockPageStg::new(stg.clone(), &limits);
    let pn = writer.new_page();
    let data = Arc::new(b"hello george".to_vec());
    writer.set_page(pn, data.clone());
    writer.save();

    // Re-open the same storage and check the page contents.
    let mut reader = BlockPageStg::new(stg, &limits);
    let data1 = reader.get_page(pn);
    assert!(data == data1);

    reader.save();
}