rustdb/
compact.rs

1use crate::{nd, util, Arc, Data, PageStorage, PageStorageInfo, Storage};
2use std::cmp::min;
3use std::collections::BTreeSet;
4
5/// CompactFile stores logical pages in smaller regions of backing storage.
6///
7/// Each logical page has a fixed size "starter page".
8///
9/// A logical page that does not fit in the "starter page" has 1 or more "extension pages".
10///
11/// Each extension page starts with its logical page number, to allow extension pages to be relocated as required.
12///
13/// When a new extension page is needed, it is allocated from the end of the file.
14///
15/// When an extension page is freed, the last extension page in the file is relocated to fill it.
16///
17/// The starter page section is extended as required when a logical page is written by relocating the first extension page to the end of the file.
18///
19/// File layout: file header | starter pages | extension pages.
20///
21/// Layout of starter page: 2 byte logical page size | array of 8 byte page numbers | user data | unused data.
22///
23/// Layout of extension page: 8 byte logical page number | user data | unused data.
24///
25/// All pages ( whether allocated or not ) initially have size zero.
26///
27/// Pages are allocated by simply incrementing lp_alloc, so sizes in the starter page section must be pre-initialised to zero when it is extended or after a renumber operation.
28
pub struct CompactFile {
    /// Underlying storage.
    pub stg: Box<dyn Storage>,

    /// Size of a starter page in bytes.
    pub(crate) sp_size: usize,

    /// Size of an extension page in bytes.
    pub(crate) ep_size: usize,

    /// Number of extension pages reserved for starter pages.
    ep_resvd: u64,

    /// Number of extension pages allocated.
    ep_count: u64,

    /// Temporary set of free extension pages ( emptied by save ).
    ep_free: BTreeSet<u64>,

    /// Allocator for logical pages ( next never-used logical page number ).
    lp_alloc: u64,

    /// Start of linked list of free logical pages ( u64::MAX if list is empty ).
    lp_first: u64,

    /// Temporary set of free logical pages ( emptied by save ).
    lp_free: BTreeSet<u64>,

    /// Starter page with list of free logical pages.
    fsp: FreeStarterPage,

    /// File is newly created.
    is_new: bool,

    /// Header fields (ep_count, lp_alloc, lp_first) modified.
    header_dirty: bool,
}
66
/// = 44. Size of file header: 8 byte magic | ep_count | lp_alloc | lp_first | ep_resvd | 2 byte sp_size | 2 byte ep_size.
const HSIZE: u64 = 44;
69
impl PageStorage for CompactFile {
    /// Get the current size ( in bytes ) of the specified logical page.
    fn size(&self, lpnum: u64) -> usize {
        let off = self.lp_off(lpnum);
        if off != 0 {
            // First 2 bytes of the starter page hold the logical page size.
            self.read_u16(off)
        } else {
            // Starter page is outside the reserved region, so page has never been written.
            0
        }
    }

    /// Get page size information ( sizes, index, size ).
    fn info(&self) -> Box<dyn PageStorageInfo> {
        Box::new(Info {
            sp_size: self.sp_size,
            ep_size: self.ep_size,
        })
    }

    /// Set the data for the specified logical page, allocating or freeing
    /// extension pages as the page size grows or shrinks.
    fn set_page(&mut self, lpnum: u64, data: Data) {
        // A page in the temporary free set must not be written.
        debug_assert!(!self.lp_free.contains(&lpnum));

        self.extend_starter_pages(lpnum);
        // Calculate number of extension pages needed.
        let size = data.len();
        let ext = self.ext(size);

        // Read the current starter info.
        let foff = HSIZE + (self.sp_size as u64) * lpnum;
        let old_size = self.read_u16(foff);
        let mut old_ext = self.ext(old_size);

        // info holds: 2 byte size | old_ext extension page numbers ( 8 bytes each ).
        let mut info = vec![0_u8; 2 + old_ext * 8];
        self.stg.read(foff, &mut info);

        util::set(&mut info, 0, size as u64, 2);

        if ext != old_ext {
            // Note freed pages.
            while old_ext > ext {
                old_ext -= 1;
                let fp = util::getu64(&info, 2 + old_ext * 8);
                info.resize(info.len() - 8, 0); // Important or info could over-write data later.
                self.ep_free.insert(fp);
            }
            // Allocate new pages.
            while old_ext < ext {
                let np = self.ep_alloc();
                info.resize(info.len() + 8, 0);
                util::setu64(&mut info[2 + old_ext * 8..], np);
                old_ext += 1;
            }
        }

        // Write the extension pages: each starts with the logical page number
        // ( so it can be relocated ) followed by up to ep_size - 8 bytes of data.
        let mut done = 0;
        for i in 0..ext {
            let amount = min(size - done, self.ep_size - 8);
            let page = util::getu64(&info, 2 + i * 8);
            let foff = page * (self.ep_size as u64);
            self.stg.write_u64(foff, lpnum);
            self.stg.write_data(foff + 8, data.clone(), done, amount);
            done += amount;
        }

        info.resize(self.sp_size, 0);

        // Save any remaining data using unused portion of starter page.
        let amount = size - done;
        if amount > 0 {
            let off = 2 + ext * 8;
            info[off..off + amount].copy_from_slice(&data[done..size]);
        }

        // Write the info.
        self.stg.write_vec(foff, info);
    }

    /// Get the data for the specified logical page ( empty if never written ).
    fn get_page(&self, lpnum: u64) -> Data {
        let foff = self.lp_off(lpnum);
        if foff == 0 {
            return nd();
        }
        let mut starter = vec![0_u8; self.sp_size];
        self.stg.read(foff, &mut starter);
        let size = util::get(&starter, 0, 2) as usize; // Number of bytes in logical page.
        let mut data = vec![0u8; size];
        let ext = self.ext(size); // Number of extension pages.

        // Read the extension pages.
        let mut done = 0;
        for i in 0..ext {
            let amount = min(size - done, self.ep_size - 8);
            let page = util::getu64(&starter, 2 + i * 8);
            let roff = page * (self.ep_size as u64);
            // Each extension page is tagged with its logical page number.
            debug_assert!(self.stg.read_u64(roff) == lpnum);
            self.stg.read(roff + 8, &mut data[done..done + amount]);
            done += amount;
        }

        // Remaining data is stored in the starter page after the extension page numbers.
        let amount = size - done;
        if amount > 0 {
            let off = 2 + ext * 8;
            data[done..size].copy_from_slice(&starter[off..off + amount]);
        }

        Arc::new(data)
    }

    /// Allocate a logical page number, taking from the temporary free set,
    /// then the free-page linked list, then lp_alloc as a last resort.
    fn new_page(&mut self) -> u64 {
        if let Some(p) = self.lp_free.pop_first() {
            p
        } else {
            let mut p = self.lp_first;
            if p != u64::MAX {
                self.load_fsp(p);
                if self.fsp.count > 1 {
                    // Take a free page stored in the list head.
                    p = self.fsp.pop();
                } else {
                    // List head is exhausted: allocate the head itself and
                    // advance lp_first to the next page in the chain.
                    self.lp_first = self.fsp.pop();
                    self.header_dirty = true;
                    self.fsp.dirty = false;
                }
            } else {
                // Free list is empty: allocate a fresh page number.
                p = self.lp_alloc;
                self.lp_alloc += 1;
                self.header_dirty = true;
            }
            p
        }
    }

    /// Free a logical page number ( made permanent by save ).
    fn drop_page(&mut self, pnum: u64) {
        self.lp_free.insert(pnum);
    }

    /// Is this a new file?
    fn is_new(&self) -> bool {
        self.is_new
    }

    /// Undo changes since last save ( but set_page and drop_page may have
    /// written data which will be ignored after rollback ).
    fn rollback(&mut self) {
        self.lp_free.clear();
        self.read_header();
        // Invalidate the cached free starter page.
        self.fsp.clear(u64::MAX);
    }

    /// Save changes to underlying storage ( commit ).
    fn save(&mut self) {
        // Free the temporary set of free logical pages.
        let flist = std::mem::take(&mut self.lp_free);
        for p in flist.iter().rev() {
            let p = *p;
            // Set the page size to zero, frees any associated extension pages.
            self.set_page(p, nd());
            self.perm_free(p);
        }
        // Relocate pages to fill any free extension pages.
        while !self.ep_free.is_empty() {
            self.ep_count -= 1;
            self.header_dirty = true;
            let from = self.ep_count;
            // If the last page is not a free page, relocate it using a free page.
            if !self.ep_free.remove(&from) {
                let to = self.ep_alloc();
                self.relocate(from, to);
            }
        }
        self.save_fsp();
        if self.header_dirty {
            self.write_header();
            self.header_dirty = false;
        }
        // Commit, truncating the file to the exact number of extension pages.
        self.stg.commit(self.ep_count * self.ep_size as u64);
    }

    /// Wait until pending writes are complete.
    fn wait_complete(&self) {
        self.stg.wait_complete();
    }

    /// Get the set of free logical pages and the page allocation limit ( verify feature ).
    #[cfg(feature = "verify")]
    fn get_free(&mut self) -> (crate::HashSet<u64>, u64) {
        let mut free = crate::HashSet::default();
        let mut p = self.lp_first;
        while p != u64::MAX {
            assert!(free.insert(p));
            self.load_fsp(p);
            // Entry 0 is the link to the next list page, 1..count are free pages.
            for i in 1..self.fsp.count {
                let p = self.fsp.get(i);
                assert!(free.insert(p));
            }
            p = self.fsp.get(0);
        }
        (free, self.lp_alloc)
    }

    /// Load the free page list into lp_free, returning the number of used
    /// pages ( or None if the free list was already empty ).
    #[cfg(feature = "renumber")]
    fn load_free_pages(&mut self) -> Option<u64> {
        assert!(self.ep_free.is_empty());
        let mut p = self.lp_first;
        if p == u64::MAX {
            return None;
        }
        while p != u64::MAX {
            self.load_fsp(p);
            for i in 1..self.fsp.count {
                let p = self.fsp.get(i);
                self.drop_page(p);
            }
            // The list page itself is also free.
            self.drop_page(p);
            p = self.fsp.get(0);
        }
        self.lp_first = u64::MAX;
        self.header_dirty = true;
        Some(self.lp_alloc - self.lp_free.len() as u64)
    }

    /// Move the specified logical page to a new ( lower ) logical page number,
    /// returning the new page number.
    #[cfg(feature = "renumber")]
    fn renumber(&mut self, lpnum: u64) -> u64 {
        let lpnum2 = self.new_page();
        let foff = self.lp_off(lpnum);
        if foff != 0 {
            let mut starter = vec![0_u8; self.sp_size];
            self.stg.read(foff, &mut starter);
            let size = util::get(&starter, 0, 2) as usize; // Number of bytes in logical page.
            let ext = self.ext(size); // Number of extension pages.

            // Modify the extension pages ( each is tagged with its logical page number ).
            for i in 0..ext {
                let page = util::getu64(&starter, 2 + i * 8);
                let woff = page * (self.ep_size as u64);
                debug_assert!(self.stg.read_u64(woff) == lpnum);
                self.stg.write_u64(woff, lpnum2);
            }

            // Write the starter data.
            let foff2 = HSIZE + (self.sp_size as u64) * lpnum2;
            self.stg.write_vec(foff2, starter);
        }
        lpnum2
    }

    /// Set the logical page allocation limit after a renumber operation.
    #[cfg(feature = "renumber")]
    fn set_alloc_pn(&mut self, target: u64) {
        assert!(self.lp_first == u64::MAX);
        assert!(self.ep_free.is_empty());
        self.reduce_starter_pages(target);
        self.lp_alloc = target;
        self.header_dirty = true;
        self.lp_free.clear();
        // Starter page sizes beyond target must be re-zeroed.
        self.clear_lp();
    }
}
319
320// *******************************************************************************
321
322impl CompactFile {
323    // Magic value to ensure file is correct format.
324    const MAGIC_VALUE: [u8; 8] = *b"RDBF1001";
325
326    /// Construct a new CompactFile.
327    pub fn new(stg: Box<dyn Storage>, sp_size: usize, ep_size: usize) -> Self {
328        let fsize = stg.size();
329        let is_new = fsize == 0;
330        let mut x = Self {
331            sp_size,
332            ep_size,
333            stg,
334            ep_resvd: 10,
335            ep_count: 10,
336            ep_free: BTreeSet::new(),
337            lp_alloc: 0,
338            lp_first: u64::MAX,
339            lp_free: BTreeSet::new(),
340            is_new,
341            header_dirty: false,
342            fsp: FreeStarterPage::new(),
343        };
344        let magic: u64 = crate::util::getu64(&Self::MAGIC_VALUE, 0);
345        if is_new {
346            x.stg.write_u64(0, magic);
347            x.write_header();
348            x.write_ep_resvd();
349            x.write_u16(40, x.sp_size as u16);
350            x.write_u16(42, x.ep_size as u16);
351        } else {
352            assert!(
353                x.stg.read_u64(0) == magic,
354                "Database File Invalid (maybe wrong version)"
355            );
356            x.read_header();
357            x.ep_resvd = x.stg.read_u64(32);
358            x.sp_size = x.read_u16(40);
359            x.ep_size = x.read_u16(42);
360        }
361        if is_new {
362            x.save();
363        }
364        x
365    }
366
367    fn read_header(&mut self) {
368        self.ep_count = self.stg.read_u64(8);
369        self.lp_alloc = self.stg.read_u64(16);
370        self.lp_first = self.stg.read_u64(24);
371    }
372
373    fn write_header(&mut self) {
374        self.stg.write_u64(8, self.ep_count);
375        self.stg.write_u64(16, self.lp_alloc);
376        self.stg.write_u64(24, self.lp_first);
377    }
378
379    fn write_ep_resvd(&mut self) {
380        self.stg.write_u64(32, self.ep_resvd);
381    }
382
383    fn perm_free(&mut self, p: u64) {
384        if self.lp_first == u64::MAX {
385            self.fsp.clear(p);
386            self.fsp.push(u64::MAX);
387            self.lp_first = p;
388            self.header_dirty = true;
389        } else {
390            self.load_fsp(self.lp_first);
391            if !self.fsp.full() {
392                self.fsp.push(p);
393            } else {
394                self.save_fsp();
395                self.fsp.clear(p);
396                self.fsp.push(self.lp_first);
397                self.lp_first = p;
398                self.header_dirty = true;
399            }
400        }
401    }
402
403    /// Read a u16 from the underlying file.
404    fn read_u16(&self, offset: u64) -> usize {
405        let mut bytes = [0; 2];
406        self.stg.read(offset, &mut bytes);
407        u16::from_le_bytes(bytes) as usize
408    }
409
410    /// Write a u16 to the underlying file.
411    fn write_u16(&mut self, offset: u64, x: u16) {
412        self.stg.write(offset, &x.to_le_bytes());
413    }
414
415    /// Relocate extension page to a new location.
416    fn relocate(&mut self, from: u64, to: u64) {
417        if from == to {
418            return;
419        }
420        let mut buffer = vec![0; self.ep_size];
421        self.stg.read(from * self.ep_size as u64, &mut buffer);
422        self.stg.write(to * self.ep_size as u64, &buffer);
423        let lpnum = util::getu64(&buffer, 0);
424        assert!(lpnum < self.lp_alloc);
425        // Compute location and length of the array of extension page numbers.
426        let mut off = HSIZE + lpnum * self.sp_size as u64;
427        let size = self.read_u16(off);
428        let mut ext = self.ext(size);
429        off += 2;
430        // Update the matching extension page number.
431        loop {
432            if ext == 0 {
433                panic!("relocate failed to find matching extension page lpnum={lpnum} from={from}");
434            }
435            let x = self.stg.read_u64(off);
436            if x == from {
437                self.stg.write_u64(off, to);
438                break;
439            }
440            off += 8;
441            ext -= 1;
442        }
443    }
444
445    /// Clear extension page.
446    fn ep_clear(&mut self, epnum: u64) {
447        let buf = vec![0; self.ep_size];
448        self.stg.write(epnum * self.ep_size as u64, &buf);
449    }
450
451    /// Get offset of starter page ( returns zero if not in reserved region ).
452    fn lp_off(&self, lpnum: u64) -> u64 {
453        let sp_size = self.sp_size as u64;
454        let mut off = HSIZE + lpnum * sp_size;
455        if off + sp_size > self.ep_resvd * (self.ep_size as u64) {
456            off = 0;
457        }
458        off
459    }
460
461    /// Extend the starter page array so that lpnum is valid.
462    fn extend_starter_pages(&mut self, lpnum: u64) {
463        let mut save = false;
464        while self.lp_off(lpnum) == 0 {
465            if !self.ep_free.remove(&self.ep_resvd)
466            // Do not relocate a free extended page.
467            {
468                self.relocate(self.ep_resvd, self.ep_count);
469                self.ep_count += 1;
470                self.header_dirty = true;
471            }
472
473            self.ep_clear(self.ep_resvd);
474            self.ep_resvd += 1;
475
476            save = true;
477        }
478        if save {
479            self.write_ep_resvd();
480        }
481    }
482
483    /// Allocate an extension page.
484    fn ep_alloc(&mut self) -> u64 {
485        if let Some(pp) = self.ep_free.iter().next() {
486            let p = *pp;
487            self.ep_free.remove(&p);
488            p
489        } else {
490            let p = self.ep_count;
491            self.ep_count += 1;
492            self.header_dirty = true;
493            p
494        }
495    }
496
497    /// Calculate the number of extension pages needed to store a page of given size.
498    fn ext(&self, size: usize) -> usize {
499        Self::ext_pages(self.sp_size, self.ep_size, size)
500    }
501
502    /// Calculate the number of extension pages needed to store a page of given size.
503    fn ext_pages(sp_size: usize, ep_size: usize, size: usize) -> usize {
504        let mut n = 0;
505        if size > (sp_size - 2) {
506            n = ((size - (sp_size - 2)) + (ep_size - 16 - 1)) / (ep_size - 16);
507        }
508        debug_assert!(2 + 16 * n + size <= sp_size + n * ep_size);
509        assert!(2 + n * 8 <= sp_size);
510        n
511    }
512
513    fn load_fsp(&mut self, lpnum: u64) {
514        if lpnum != self.fsp.current {
515            self.save_fsp();
516            let off = HSIZE + 2 + lpnum * self.sp_size as u64;
517            self.stg.read(off, &mut self.fsp.data);
518            self.fsp.init();
519            self.fsp.current = lpnum;
520        }
521    }
522
523    fn save_fsp(&mut self) {
524        if self.fsp.dirty {
525            self.fsp.terminate();
526            let off = HSIZE + 2 + self.fsp.current * self.sp_size as u64;
527            self.stg.write(off, &self.fsp.data);
528            self.fsp.dirty = false;
529        }
530    }
531
532    #[cfg(feature = "renumber")]
533    /// Set size of renumbered pages >= lp_alloc to zero.
534    fn clear_lp(&mut self) {
535        let start = HSIZE + (self.sp_size as u64) * self.lp_alloc;
536        let end = self.ep_resvd * self.ep_size as u64;
537        if end > start {
538            let buf = vec![0; (end - start) as usize];
539            self.stg.write(start, &buf);
540        }
541    }
542
543    #[cfg(feature = "renumber")]
544    fn reduce_starter_pages(&mut self, target: u64) {
545        let resvd = HSIZE + target * self.sp_size as u64;
546        let resvd = (resvd + self.ep_size as u64 - 1) / self.ep_size as u64;
547        while self.ep_resvd > resvd {
548            self.ep_count -= 1;
549            self.header_dirty = true;
550            let from = self.ep_count;
551            self.ep_resvd -= 1;
552            self.relocate(from, self.ep_resvd);
553        }
554        self.write_ep_resvd();
555    }
556} // end impl CompactFile
557
/// Cached copy of the free-page-list data held in the starter page of a free logical page.
struct FreeStarterPage {
    /// Logical page number the cached data belongs to ( u64::MAX if none ).
    current: u64,
    /// Number of valid 8-byte entries in data ( at most 8 ).
    count: usize,
    /// Entry 0 links to the next free-list page; entries 1..count are free logical pages.
    data: [u8; 64],
    /// True if data has been modified and must be written back.
    dirty: bool,
}
564
565impl FreeStarterPage {
566    fn new() -> Self {
567        Self {
568            count: 0,
569            data: [0; 64],
570            dirty: false,
571            current: u64::MAX,
572        }
573    }
574
575    fn full(&self) -> bool {
576        self.count == 8
577    }
578
579    fn push(&mut self, lpnum: u64) {
580        assert!(self.count < 8);
581        self.set(self.count, lpnum);
582        self.count += 1;
583        self.dirty = true;
584    }
585
586    fn pop(&mut self) -> u64 {
587        assert!(self.count > 0);
588        self.count -= 1;
589        self.dirty = true;
590        self.get(self.count)
591    }
592
593    fn terminate(&mut self) {
594        if self.count < 8 {
595            self.set(self.count, u64::MAX - 2);
596        }
597    }
598
599    fn init(&mut self) {
600        self.count = 0;
601        while self.count < 8 && self.get(self.count) != u64::MAX - 2 {
602            self.count += 1;
603        }
604        self.dirty = false;
605    }
606
607    fn get(&self, ix: usize) -> u64 {
608        let off = ix * 8;
609        util::getu64(&self.data, off)
610    }
611
612    fn set(&mut self, ix: usize, lpnum: u64) {
613        let off = ix * 8;
614        util::setu64(&mut self.data[off..off + 8], lpnum);
615    }
616
617    fn clear(&mut self, current: u64) {
618        self.data.fill(0);
619        self.current = current;
620        self.count = 0;
621        self.dirty = false;
622    }
623}
624
/// Page size parameters of a CompactFile, used to implement PageStorageInfo.
struct Info {
    /// Size of a starter page in bytes.
    sp_size: usize,
    /// Size of an extension page in bytes.
    ep_size: usize,
}
629
630impl PageStorageInfo for Info {
631    /// The number of different page sizes.
632    fn sizes(&self) -> usize {
633        (self.sp_size - 2) / 8
634    }
635
636    /// Size index for given page size.
637    fn index(&self, size: usize) -> usize {
638        (size + 2) / (self.ep_size - 16)
639    }
640
641    /// Page size for given index.
642    fn size(&self, ix: usize) -> usize {
643        (self.ep_size - 16) * ix + (self.sp_size - 2)
644    }
645}
646
#[test]
pub fn test() {
    use crate::{AtomicFile, MemFile};
    use rand::Rng;
    // The idea of the test is to check that two CompactFiles with different
    // page-size parameters behave identically.

    let mut rng = rand::thread_rng();

    let stg0 = AtomicFile::new(MemFile::new(), MemFile::new());
    let stg1 = AtomicFile::new(MemFile::new(), MemFile::new());

    let mut small = CompactFile::new(stg0, 200, 512);
    let mut large = CompactFile::new(stg1, 136, 1024);
    for _ in 0..100 {
        small.new_page();
        large.new_page();
    }

    for _ in 0..10000 {
        // Write the same random data to the same random page of both files.
        let len: usize = rng.gen::<usize>() % 5000;
        let pnum: u64 = rng.gen::<u64>() % 100;
        let fill: u8 = rng.gen::<u8>();

        let data = Arc::new(vec![fill; len]);
        small.set_page(pnum, data.clone());
        large.set_page(pnum, data.clone());

        // Read back a random page from both files and compare.
        let pnum: u64 = rng.gen::<u64>() % 100;
        let got_small = small.get_page(pnum);
        let got_large = large.get_page(pnum);
        assert!(got_small == got_large);

        small.save();
        large.save();
    }
}