Skip to main content

rustdb/
compact.rs

1use crate::{Arc, Data, PageStorage, PageStorageInfo, Storage, nd, util};
2use std::cmp::min;
3use std::collections::BTreeSet;
4
5/// CompactFile stores logical pages in smaller regions of backing storage.
6///
7/// Each logical page has a fixed size "starter page".
8///
9/// A logical page that does not fit in the "starter page" has 1 or more "extension pages".
10///
11/// Each extension page starts with its logical page number, to allow extension pages to be relocated as required.
12///
13/// When a new extension page is needed, it is allocated from the end of the file.
14///
15/// When an extension page is freed, the last extension page in the file is relocated to fill it.
16///
17/// The starter page section is extended as required when a logical page is written by relocating the first extension page to the end of the file.
18///
19/// File layout: file header | starter pages | extension pages.
20///
21/// Layout of starter page: 2 byte logical page size | array of 8 byte page numbers | user data | unused data.
22///
23/// Layout of extension page: 8 byte logical page number | user data | unused data.
24///
25/// All pages ( whether allocated or not ) initially have size zero.
26///
27/// Pages are allocated by simply incrementing lp_alloc, so sizes in the starter page section must be pre-initialised to zero when it is extended or after a renumber operation.
28pub struct CompactFile {
29    /// Underlying storage.
30    pub stg: Box<dyn Storage>,
31
32    /// Size of starter page
33    pub(crate) sp_size: usize,
34
35    /// Size of extension page
36    pub(crate) ep_size: usize,
37
38    /// Number of extension pages reserved for starter pages.      
39    ep_resvd: u64,
40
41    /// Number of extension pages allocated.       
42    ep_count: u64,
43
44    /// Temporary set of free extension pages.         
45    ep_free: BTreeSet<u64>,
46
47    /// Allocator for logical pages.        
48    lp_alloc: u64,
49
50    /// Start of linked list of free logical pages.        
51    lp_first: u64,
52
53    /// Temporary set of free logical pages.
54    lp_free: BTreeSet<u64>,
55
56    /// Starter page with list of free logical pages.
57    fsp: FreeStarterPage,
58
59    /// File is newly created.         
60    is_new: bool,
61
62    /// Header fields (ep_count, lp_alloc, lp_first) modified.
63    header_dirty: bool,
64}
65
/// = 44. Size of file header.
///
/// Header layout: 8 byte magic value | 8 byte ep_count | 8 byte lp_alloc | 8 byte lp_first |
/// 8 byte ep_resvd | 2 byte sp_size | 2 byte ep_size.
const HSIZE: u64 = 8 + 8 + 8 + 8 + 8 + 2 + 2;
68
69impl PageStorage for CompactFile {
70    fn size(&self, lpnum: u64) -> usize {
71        let off = self.lp_off(lpnum);
72        if off != 0 { self.read_u16(off) } else { 0 }
73    }
74
75    fn info(&self) -> Box<dyn PageStorageInfo> {
76        Box::new(Info {
77            sp_size: self.sp_size,
78            ep_size: self.ep_size,
79        })
80    }
81
82    fn set_page(&mut self, lpnum: u64, data: Data) {
83        debug_assert!(!self.lp_free.contains(&lpnum));
84
85        self.extend_starter_pages(lpnum);
86        // Calculate number of extension pages needed.
87        let size = data.len();
88        let ext = self.ext(size);
89
90        // Read the current starter info.
91        let foff = HSIZE + (self.sp_size as u64) * lpnum;
92        let old_size = self.read_u16(foff);
93        let mut old_ext = self.ext(old_size);
94
95        let mut info = vec![0_u8; 2 + old_ext * 8];
96        self.stg.read(foff, &mut info);
97
98        util::set(&mut info, 0, size as u64, 2);
99
100        if ext != old_ext {
101            // Note freed pages.
102            while old_ext > ext {
103                old_ext -= 1;
104                let fp = util::getu64(&info, 2 + old_ext * 8);
105                info.resize(info.len() - 8, 0); // Important or info could over-write data later.
106                self.ep_free.insert(fp);
107            }
108            // Allocate new pages.
109            while old_ext < ext {
110                let np = self.ep_alloc();
111                info.resize(info.len() + 8, 0);
112                util::setu64(&mut info[2 + old_ext * 8..], np);
113                old_ext += 1;
114            }
115        }
116
117        // Write the extension pages.
118        let mut done = 0;
119        for i in 0..ext {
120            let amount = min(size - done, self.ep_size - 8);
121            let page = util::getu64(&info, 2 + i * 8);
122            let foff = page * (self.ep_size as u64);
123            self.stg.write_u64(foff, lpnum);
124            self.stg.write_data(foff + 8, data.clone(), done, amount);
125            done += amount;
126        }
127
128        info.resize(self.sp_size, 0);
129
130        // Save any remaining data using unused portion of starter page.
131        let amount = size - done;
132        if amount > 0 {
133            let off = 2 + ext * 8;
134            info[off..off + amount].copy_from_slice(&data[done..size]);
135        }
136
137        // Write the info.
138        self.stg.write_vec(foff, info);
139    }
140
141    fn get_page(&self, lpnum: u64) -> Data {
142        let foff = self.lp_off(lpnum);
143        if foff == 0 {
144            return nd();
145        }
146        let mut starter = vec![0_u8; self.sp_size];
147        self.stg.read(foff, &mut starter);
148        let size = util::get(&starter, 0, 2) as usize; // Number of bytes in logical page.
149        let mut data = vec![0u8; size];
150        let ext = self.ext(size); // Number of extension pages.
151
152        // Read the extension pages.
153        let mut done = 0;
154        for i in 0..ext {
155            let amount = min(size - done, self.ep_size - 8);
156            let page = util::getu64(&starter, 2 + i * 8);
157            let roff = page * (self.ep_size as u64);
158            debug_assert!(self.stg.read_u64(roff) == lpnum);
159            self.stg.read(roff + 8, &mut data[done..done + amount]);
160            done += amount;
161        }
162
163        let amount = size - done;
164        if amount > 0 {
165            let off = 2 + ext * 8;
166            data[done..size].copy_from_slice(&starter[off..off + amount]);
167        }
168
169        Arc::new(data)
170    }
171
172    fn new_page(&mut self) -> u64 {
173        if let Some(p) = self.lp_free.pop_first() {
174            p
175        } else {
176            let mut p = self.lp_first;
177            if p != u64::MAX {
178                self.load_fsp(p);
179                if self.fsp.count > 1 {
180                    p = self.fsp.pop();
181                } else {
182                    self.lp_first = self.fsp.pop();
183                    self.header_dirty = true;
184                    self.fsp.dirty = false;
185                }
186            } else {
187                p = self.lp_alloc;
188                self.lp_alloc += 1;
189                self.header_dirty = true;
190            }
191            p
192        }
193    }
194
195    fn drop_page(&mut self, pnum: u64) {
196        self.lp_free.insert(pnum);
197    }
198
199    fn is_new(&self) -> bool {
200        self.is_new
201    }
202
203    fn rollback(&mut self) {
204        self.lp_free.clear();
205        self.read_header();
206        self.fsp.clear(u64::MAX);
207    }
208
209    fn save(&mut self) {
210        // Free the temporary set of free logical pages.
211        let flist = std::mem::take(&mut self.lp_free);
212        for p in flist.iter().rev() {
213            let p = *p;
214            // Set the page size to zero, frees any associated extension pages.
215            self.set_page(p, nd());
216            self.perm_free(p);
217        }
218        // Relocate pages to fill any free extension pages.
219        while !self.ep_free.is_empty() {
220            self.ep_count -= 1;
221            self.header_dirty = true;
222            let from = self.ep_count;
223            // If the last page is not a free page, relocate it using a free page.
224            if !self.ep_free.remove(&from) {
225                let to = self.ep_alloc();
226                self.relocate(from, to);
227            }
228        }
229        self.save_fsp();
230        if self.header_dirty {
231            self.write_header();
232            self.header_dirty = false;
233        }
234        self.stg.commit(self.ep_count * self.ep_size as u64);
235    }
236
237    fn wait_complete(&self) {
238        self.stg.wait_complete();
239    }
240
241    #[cfg(feature = "verify")]
242    fn get_free(&mut self) -> (crate::HashSet<u64>, u64) {
243        let mut free = crate::HashSet::default();
244        let mut p = self.lp_first;
245        while p != u64::MAX {
246            assert!(free.insert(p));
247            self.load_fsp(p);
248            for i in 1..self.fsp.count {
249                let p = self.fsp.get(i);
250                assert!(free.insert(p));
251            }
252            p = self.fsp.get(0);
253        }
254        (free, self.lp_alloc)
255    }
256
257    #[cfg(feature = "renumber")]
258    fn load_free_pages(&mut self) -> Option<u64> {
259        assert!(self.ep_free.is_empty());
260        let mut p = self.lp_first;
261        if p == u64::MAX {
262            return None;
263        }
264        while p != u64::MAX {
265            self.load_fsp(p);
266            for i in 1..self.fsp.count {
267                let p = self.fsp.get(i);
268                self.drop_page(p);
269            }
270            self.drop_page(p);
271            p = self.fsp.get(0);
272        }
273        self.lp_first = u64::MAX;
274        self.header_dirty = true;
275        Some(self.lp_alloc - self.lp_free.len() as u64)
276    }
277
278    #[cfg(feature = "renumber")]
279    fn renumber(&mut self, lpnum: u64) -> u64 {
280        let lpnum2 = self.new_page();
281        let foff = self.lp_off(lpnum);
282        if foff != 0 {
283            let mut starter = vec![0_u8; self.sp_size];
284            self.stg.read(foff, &mut starter);
285            let size = util::get(&starter, 0, 2) as usize; // Number of bytes in logical page.
286            let ext = self.ext(size); // Number of extension pages.
287
288            // Modify the extension pages.
289            for i in 0..ext {
290                let page = util::getu64(&starter, 2 + i * 8);
291                let woff = page * (self.ep_size as u64);
292                debug_assert!(self.stg.read_u64(woff) == lpnum);
293                self.stg.write_u64(woff, lpnum2);
294            }
295
296            // Write the starter data.
297            let foff2 = HSIZE + (self.sp_size as u64) * lpnum2;
298            self.stg.write_vec(foff2, starter);
299        }
300        lpnum2
301    }
302
303    #[cfg(feature = "renumber")]
304    fn set_alloc_pn(&mut self, target: u64) {
305        assert!(self.lp_first == u64::MAX);
306        assert!(self.ep_free.is_empty());
307        self.reduce_starter_pages(target);
308        self.lp_alloc = target;
309        self.header_dirty = true;
310        self.lp_free.clear();
311        self.clear_lp();
312    }
313}
314
315// *******************************************************************************
316
317impl CompactFile {
318    // Magic value to ensure file is correct format.
319    const MAGIC_VALUE: [u8; 8] = *b"RDBF1001";
320
321    /// Construct a new CompactFile.
322    pub fn new(stg: Box<dyn Storage>, sp_size: usize, ep_size: usize) -> Self {
323        let fsize = stg.size();
324        let is_new = fsize == 0;
325        let mut x = Self {
326            sp_size,
327            ep_size,
328            stg,
329            ep_resvd: 10,
330            ep_count: 10,
331            ep_free: BTreeSet::new(),
332            lp_alloc: 0,
333            lp_first: u64::MAX,
334            lp_free: BTreeSet::new(),
335            is_new,
336            header_dirty: false,
337            fsp: FreeStarterPage::new(),
338        };
339        let magic: u64 = crate::util::getu64(&Self::MAGIC_VALUE, 0);
340        if is_new {
341            x.stg.write_u64(0, magic);
342            x.write_header();
343            x.write_ep_resvd();
344            x.write_u16(40, x.sp_size as u16);
345            x.write_u16(42, x.ep_size as u16);
346        } else {
347            assert!(
348                x.stg.read_u64(0) == magic,
349                "Database File Invalid (maybe wrong version)"
350            );
351            x.read_header();
352            x.ep_resvd = x.stg.read_u64(32);
353            x.sp_size = x.read_u16(40);
354            x.ep_size = x.read_u16(42);
355        }
356        if is_new {
357            x.save();
358        }
359        x
360    }
361
362    fn read_header(&mut self) {
363        self.ep_count = self.stg.read_u64(8);
364        self.lp_alloc = self.stg.read_u64(16);
365        self.lp_first = self.stg.read_u64(24);
366    }
367
368    fn write_header(&mut self) {
369        self.stg.write_u64(8, self.ep_count);
370        self.stg.write_u64(16, self.lp_alloc);
371        self.stg.write_u64(24, self.lp_first);
372    }
373
374    fn write_ep_resvd(&mut self) {
375        self.stg.write_u64(32, self.ep_resvd);
376    }
377
378    fn perm_free(&mut self, p: u64) {
379        if self.lp_first == u64::MAX {
380            self.fsp.clear(p);
381            self.fsp.push(u64::MAX);
382            self.lp_first = p;
383            self.header_dirty = true;
384        } else {
385            self.load_fsp(self.lp_first);
386            if !self.fsp.full() {
387                self.fsp.push(p);
388            } else {
389                self.save_fsp();
390                self.fsp.clear(p);
391                self.fsp.push(self.lp_first);
392                self.lp_first = p;
393                self.header_dirty = true;
394            }
395        }
396    }
397
398    /// Read a u16 from the underlying file.
399    fn read_u16(&self, offset: u64) -> usize {
400        let mut bytes = [0; 2];
401        self.stg.read(offset, &mut bytes);
402        u16::from_le_bytes(bytes) as usize
403    }
404
405    /// Write a u16 to the underlying file.
406    fn write_u16(&mut self, offset: u64, x: u16) {
407        self.stg.write(offset, &x.to_le_bytes());
408    }
409
410    /// Relocate extension page to a new location.
411    fn relocate(&mut self, from: u64, to: u64) {
412        if from == to {
413            return;
414        }
415        let mut buffer = vec![0; self.ep_size];
416        self.stg.read(from * self.ep_size as u64, &mut buffer);
417        self.stg.write(to * self.ep_size as u64, &buffer);
418        let lpnum = util::getu64(&buffer, 0);
419        assert!(lpnum < self.lp_alloc);
420        // Compute location and length of the array of extension page numbers.
421        let mut off = HSIZE + lpnum * self.sp_size as u64;
422        let size = self.read_u16(off);
423        let mut ext = self.ext(size);
424        off += 2;
425        // Update the matching extension page number.
426        loop {
427            if ext == 0 {
428                panic!("relocate failed to find matching extension page lpnum={lpnum} from={from}");
429            }
430            let x = self.stg.read_u64(off);
431            if x == from {
432                self.stg.write_u64(off, to);
433                break;
434            }
435            off += 8;
436            ext -= 1;
437        }
438    }
439
440    /// Clear extension page.
441    fn ep_clear(&mut self, epnum: u64) {
442        let buf = vec![0; self.ep_size];
443        self.stg.write(epnum * self.ep_size as u64, &buf);
444    }
445
446    /// Get offset of starter page ( returns zero if not in reserved region ).
447    fn lp_off(&self, lpnum: u64) -> u64 {
448        let sp_size = self.sp_size as u64;
449        let mut off = HSIZE + lpnum * sp_size;
450        if off + sp_size > self.ep_resvd * (self.ep_size as u64) {
451            off = 0;
452        }
453        off
454    }
455
456    /// Extend the starter page array so that lpnum is valid.
457    fn extend_starter_pages(&mut self, lpnum: u64) {
458        let mut save = false;
459        while self.lp_off(lpnum) == 0 {
460            if !self.ep_free.remove(&self.ep_resvd)
461            // Do not relocate a free extended page.
462            {
463                self.relocate(self.ep_resvd, self.ep_count);
464                self.ep_count += 1;
465                self.header_dirty = true;
466            }
467
468            self.ep_clear(self.ep_resvd);
469            self.ep_resvd += 1;
470
471            save = true;
472        }
473        if save {
474            self.write_ep_resvd();
475        }
476    }
477
478    /// Allocate an extension page.
479    fn ep_alloc(&mut self) -> u64 {
480        if let Some(pp) = self.ep_free.iter().next() {
481            let p = *pp;
482            self.ep_free.remove(&p);
483            p
484        } else {
485            let p = self.ep_count;
486            self.ep_count += 1;
487            self.header_dirty = true;
488            p
489        }
490    }
491
492    /// Calculate the number of extension pages needed to store a page of given size.
493    fn ext(&self, size: usize) -> usize {
494        Self::ext_pages(self.sp_size, self.ep_size, size)
495    }
496
497    /// Calculate the number of extension pages needed to store a page of given size.
498    fn ext_pages(sp_size: usize, ep_size: usize, size: usize) -> usize {
499        let mut n = 0;
500        if size > (sp_size - 2) {
501            n = (size - (sp_size - 2)).div_ceil(ep_size - 16);
502        }
503        debug_assert!(2 + 16 * n + size <= sp_size + n * ep_size);
504        assert!(2 + n * 8 <= sp_size);
505        n
506    }
507
508    fn load_fsp(&mut self, lpnum: u64) {
509        if lpnum != self.fsp.current {
510            self.save_fsp();
511            let off = HSIZE + 2 + lpnum * self.sp_size as u64;
512            self.stg.read(off, &mut self.fsp.data);
513            self.fsp.init();
514            self.fsp.current = lpnum;
515        }
516    }
517
518    fn save_fsp(&mut self) {
519        if self.fsp.dirty {
520            self.fsp.terminate();
521            let off = HSIZE + 2 + self.fsp.current * self.sp_size as u64;
522            self.stg.write(off, &self.fsp.data);
523            self.fsp.dirty = false;
524        }
525    }
526
527    #[cfg(feature = "renumber")]
528    /// Set size of renumbered pages >= lp_alloc to zero.
529    fn clear_lp(&mut self) {
530        let start = HSIZE + (self.sp_size as u64) * self.lp_alloc;
531        let end = self.ep_resvd * self.ep_size as u64;
532        if end > start {
533            let buf = vec![0; (end - start) as usize];
534            self.stg.write(start, &buf);
535        }
536    }
537
538    #[cfg(feature = "renumber")]
539    fn reduce_starter_pages(&mut self, target: u64) {
540        let resvd = HSIZE + target * self.sp_size as u64;
541        let resvd = resvd.div_ceil(self.ep_size as u64);
542        while self.ep_resvd > resvd {
543            self.ep_count -= 1;
544            self.header_dirty = true;
545            let from = self.ep_count;
546            self.ep_resvd -= 1;
547            self.relocate(from, self.ep_resvd);
548        }
549        self.write_ep_resvd();
550    }
551} // end impl CompactFile
552
553struct FreeStarterPage {
554    current: u64,
555    count: usize,
556    data: [u8; 64],
557    dirty: bool,
558}
559
560impl FreeStarterPage {
561    fn new() -> Self {
562        Self {
563            count: 0,
564            data: [0; 64],
565            dirty: false,
566            current: u64::MAX,
567        }
568    }
569
570    fn full(&self) -> bool {
571        self.count == 8
572    }
573
574    fn push(&mut self, lpnum: u64) {
575        assert!(self.count < 8);
576        self.set(self.count, lpnum);
577        self.count += 1;
578        self.dirty = true;
579    }
580
581    fn pop(&mut self) -> u64 {
582        assert!(self.count > 0);
583        self.count -= 1;
584        self.dirty = true;
585        self.get(self.count)
586    }
587
588    fn terminate(&mut self) {
589        if self.count < 8 {
590            self.set(self.count, u64::MAX - 2);
591        }
592    }
593
594    fn init(&mut self) {
595        self.count = 0;
596        while self.count < 8 && self.get(self.count) != u64::MAX - 2 {
597            self.count += 1;
598        }
599        self.dirty = false;
600    }
601
602    fn get(&self, ix: usize) -> u64 {
603        let off = ix * 8;
604        util::getu64(&self.data, off)
605    }
606
607    fn set(&mut self, ix: usize, lpnum: u64) {
608        let off = ix * 8;
609        util::setu64(&mut self.data[off..off + 8], lpnum);
610    }
611
612    fn clear(&mut self, current: u64) {
613        self.data.fill(0);
614        self.current = current;
615        self.count = 0;
616        self.dirty = false;
617    }
618}
619
620struct Info {
621    sp_size: usize,
622    ep_size: usize,
623}
624
625impl PageStorageInfo for Info {
626    /// The number of different page sizes.
627    fn sizes(&self) -> usize {
628        (self.sp_size - 2) / 8
629    }
630
631    /// Size index for given page size.
632    fn index(&self, size: usize) -> usize {
633        (size + 2) / (self.ep_size - 16)
634    }
635
636    /// Page size for given index.
637    fn size(&self, ix: usize) -> usize {
638        (self.ep_size - 16) * ix + (self.sp_size - 2)
639    }
640}
641
#[test]
fn test() {
    use crate::{AtomicFile, MemFile};
    use rand::Rng;
    // The idea of the test is to check that two CompactFiles with different page size
    // parameters behave the same under an identical random workload.

    let mut rng = rand::thread_rng();

    let stg0 = AtomicFile::new(MemFile::new(), MemFile::new());
    let stg1 = AtomicFile::new(MemFile::new(), MemFile::new());

    let mut cf0 = CompactFile::new(stg0, 200, 512);
    let mut cf1 = CompactFile::new(stg1, 136, 1024);

    // Allocate the same 100 logical pages in both files.
    for _ in 0..100 {
        cf0.new_page();
        cf1.new_page();
    }

    for _ in 0..10000 {
        // Random length, target page and fill byte.
        let len: usize = rng.r#gen::<usize>() % 5000;
        let target: u64 = rng.r#gen::<u64>() % 100;
        let fill: u8 = rng.r#gen::<u8>();

        let content = Arc::new(vec![fill; len]);
        cf0.set_page(target, content.clone());
        cf1.set_page(target, content.clone());

        // Read back a random page from both files and compare.
        let probe: u64 = rng.r#gen::<u64>() % 100;
        let got0 = cf0.get_page(probe);
        let got1 = cf1.get_page(probe);
        assert!(got0 == got1);

        cf0.save();
        cf1.save();
    }
}