rustdb/
sortedfile.rs

1use crate::*;
2
3/// Sorted Record storage.
4///
5/// SortedFile is a tree of pages.
6///
7/// Each page is either a parent page with links to child pages, or a leaf page.
8pub struct SortedFile {
9    /// Map of pages that have not been saved..
10    pub dirty_pages: RefCell<HashMap<u64, PagePtr>>,
11    /// Size of a record.
12    pub rec_size: usize,
13    /// Size of a key.
14    pub key_size: usize,
15    /// The root page.
16    pub root_page: Cell<u64>,
17    /// Status
18    pub ok: Cell<bool>,
19}
20
21impl SortedFile {
22    /// Create File with specified record size, key size, root page.
23    pub fn new(rec_size: usize, key_size: usize, root_page: u64) -> Self {
24        SortedFile {
25            dirty_pages: newmap(),
26            rec_size,
27            key_size,
28            root_page: Cell::new(root_page),
29            ok: Cell::new(true),
30        }
31    }
32
33    /// Test whether file has unsaved changes.
34    pub fn changed(&self) -> bool {
35        let dp = &*self.dirty_pages.borrow();
36        !dp.is_empty()
37    }
38
39    /// Save changes to underlying storage.
40    pub fn save(&self, db: &DB, op: SaveOp) {
41        if op == SaveOp::RollBack {
42            self.rollback();
43            return;
44        }
45        let dp = &mut *self.dirty_pages.borrow_mut();
46        for (pnum, pp) in dp.drain() {
47            let p = &mut *pp.borrow_mut();
48            p.compress(db);
49            p.write_header();
50            debug_assert!(p.pnum == pnum);
51            db.apd.set_data(pnum, p.data.to_data());
52        }
53    }
54
55    /// Clear the cache, changes are discarded instead of being saved.
56    pub fn rollback(&self) {
57        self.dirty_pages.borrow_mut().clear();
58    }
59
60    /// Free the underlying storage. File is not useable after this.
61    pub fn free_pages(&self, db: &DB, r: &dyn Record) {
62        self.free_page(db, self.root_page.get(), r);
63        self.rollback();
64        self.ok.set(false);
65    }
66
67    /// Insert a Record. Panics if the key is a duplicate.
68    pub fn insert(&self, db: &DB, r: &dyn Record) {
69        while !self.insert_leaf(db, self.root_page.get(), r, None) {
70            // We get here if a child page needed to be split.
71        }
72    }
73
74    /// Remove a Record.
75    pub fn remove(&self, db: &DB, r: &dyn Record) {
76        let mut pp = self.load_page(db, self.root_page.get());
77        loop {
78            let cpnum = {
79                let p = &mut *pp.borrow_mut();
80                if p.level == 0 {
81                    self.set_dirty(p, &pp);
82                    p.remove(db, r);
83                    break;
84                }
85                p.find_child(db, r)
86            };
87            pp = self.load_page(db, cpnum);
88        }
89    }
90
91    /// Free a page and any child pages if this is a parent page.
92    fn free_page(&self, db: &DB, pnum: u64, r: &dyn Record) {
93        let pp = self.load_page(db, pnum);
94        let p = &pp.borrow();
95        if p.level != 0 {
96            if p.level > 1 {
97                self.free_page(db, p.first_page, r);
98            } else {
99                db.free_page(p.first_page);
100            }
101            self.free_parent_node(db, p, p.root, r);
102        }
103        db.free_page(pnum);
104    }
105
106    /// Free a parent node.
107    fn free_parent_node(&self, db: &DB, p: &Page, x: usize, r: &dyn Record) {
108        if x != 0 {
109            self.free_parent_node(db, p, p.left(x), r);
110            self.free_parent_node(db, p, p.right(x), r);
111            p.drop_key(db, x, r);
112            let cp = p.child_page(x);
113            if p.level > 1 {
114                self.free_page(db, cp, r);
115            } else {
116                db.free_page(cp);
117            }
118        }
119    }
120
121    /// Locate a record with matching key. Result is PagePtr and offset of data.
122    pub fn get(&self, db: &DB, r: &dyn Record) -> Option<(PagePtr, usize)> {
123        let mut pp = self.load_page(db, self.root_page.get());
124        let off;
125        loop {
126            let cpnum = {
127                let p = pp.borrow();
128                if p.level == 0 {
129                    let x = p.find_equal(db, r);
130                    if x == 0 {
131                        return None;
132                    }
133                    off = p.rec_offset(x);
134                    break;
135                }
136                p.find_child(db, r)
137            };
138            pp = self.load_page(db, cpnum);
139        }
140        Some((pp, off))
141    }
142
143    /// For iteration in ascending order from start.
144    pub fn asc(self: &Rc<Self>, db: &DB, start: Box<dyn Record>) -> Asc {
145        Asc::new(db, start, self)
146    }
147
148    /// For iteration in descending order from start.
149    pub fn dsc(self: &Rc<Self>, db: &DB, start: Box<dyn Record>) -> Dsc {
150        Dsc::new(db, start, self)
151    }
152
153    /// Insert a record into a leaf page.
154    fn insert_leaf(&self, db: &DB, pnum: u64, r: &dyn Record, pi: Option<&ParentInfo>) -> bool {
155        let pp = self.load_page(db, pnum);
156        let cpnum = {
157            // new block to ensure pp borrow is released before recursing.
158            let p = &mut *pp.borrow_mut();
159            if p.level != 0 {
160                p.find_child(db, r)
161            } else if !p.full(db.page_size_max) {
162                self.set_dirty(p, &pp);
163                p.insert(db, r);
164                return true;
165            } else {
166                // Page is full, divide it into left and right.
167                self.remove_page(p.pnum);
168                let sp = Split::new(p, db);
169                let sk = &*p.get_key(db, sp.split_node, r);
170                // Could insert r into left or right here.
171                // sp.right is allocated a new page number.
172                let pnum2 = self.alloc_page(db, sp.right);
173                match pi {
174                    None => {
175                        // No parent, new root page needed. New root uses pnum.
176                        let mut new_root = self.new_page(p.level + 1);
177                        // sp.left is allocated a new page number, which is first page of new root.
178                        new_root.first_page = self.alloc_page(db, sp.left);
179                        self.publish_page(pnum, new_root);
180                        self.append_page(db, pnum, sk, pnum2);
181                    }
182                    Some(pi) => {
183                        self.publish_page(pnum, sp.left);
184                        self.insert_page(db, pi, sk, pnum2);
185                    }
186                }
187                return false; // r has not yet been inserted.
188            }
189        };
190        self.insert_leaf(db, cpnum, r, Some(&ParentInfo { pnum, parent: pi }))
191    }
192
193    /// Insert child into a non-leaf page.
194    fn insert_page(&self, db: &DB, into: &ParentInfo, r: &dyn Record, cpnum: u64) {
195        let pnum = into.pnum;
196        let pp = self.load_page(db, pnum);
197        let p = &mut *pp.borrow_mut();
198        // Need to check if page is full.
199        if !p.full(db.page_size_max) {
200            self.set_dirty(p, &pp);
201            p.insert_page(db, r, cpnum);
202        } else {
203            // Split p.
204            self.remove_page(pnum);
205            let mut sp = Split::new(p, db);
206            let sk = &*p.get_key(db, sp.split_node, r);
207            // Insert into either left or right.
208            let c = p.compare(db, r, sp.split_node);
209            if c == Ordering::Greater {
210                sp.left.insert_page(db, r, cpnum);
211            } else {
212                sp.right.insert_page(db, r, cpnum);
213            }
214            let pnum2 = self.alloc_page(db, sp.right);
215            match into.parent {
216                None => {
217                    debug_assert!(pnum == self.root_page.get());
218                    let mut new_root = self.new_page(p.level + 1);
219                    new_root.first_page = self.alloc_page(db, sp.left);
220                    self.publish_page(pnum, new_root);
221                    self.append_page(db, pnum, sk, pnum2);
222                }
223                Some(pi) => {
224                    self.publish_page(pnum, sp.left);
225                    self.insert_page(db, pi, sk, pnum2);
226                }
227            }
228        }
229    }
230
231    /// Append child to a non-leaf page. Used when a new root page has just been created.
232    fn append_page(&self, db: &DB, into: u64, k: &dyn Record, cpnum: u64) {
233        let pp = self.load_page(db, into);
234        let p = &mut *pp.borrow_mut();
235        self.set_dirty(p, &pp);
236        p.append_page(k, cpnum);
237    }
238
239    /// Construct a new empty page.
240    fn new_page(&self, level: u8) -> Page {
241        Page::new(
242            if level != 0 {
243                self.key_size
244            } else {
245                self.rec_size
246            },
247            level,
248            nd(),
249            u64::MAX,
250        )
251    }
252
253    /// Allocate a page number, publish the page in the cache.
254    fn alloc_page(&self, db: &DB, p: Page) -> u64 {
255        let pnum = db.alloc_page();
256        self.publish_page(pnum, p);
257        pnum
258    }
259
260    /// Publish a page in the cache with specified page number.
261    fn publish_page(&self, pnum: u64, p: Page) {
262        let pp = util::new(p);
263        {
264            let p = &mut *pp.borrow_mut();
265            p.pnum = pnum;
266            self.set_dirty(p, &pp);
267        }
268        self.dirty_pages.borrow_mut().insert(pnum, pp);
269    }
270
271    fn remove_page(&self, pnum: u64) {
272        self.dirty_pages.borrow_mut().remove(&pnum);
273    }
274
275    /// Get a page from the cache, or if it is not in the cache, load it from external storage.
276    fn load_page(&self, db: &DB, pnum: u64) -> PagePtr {
277        if !self.ok.get() {
278            panic!()
279        }
280        if let Some(p) = self.dirty_pages.borrow().get(&pnum) {
281            return p.clone();
282        }
283        let data = db.apd.get_data(pnum);
284        let level = if data.len() == 0 { 0 } else { data[0] };
285        util::new(Page::new(
286            if level != 0 {
287                self.key_size
288            } else {
289                self.rec_size
290            },
291            level,
292            data,
293            pnum,
294        ))
295    }
296
297    /// Mark a page as changed.
298    pub fn set_dirty(&self, p: &mut Page, pp: &PagePtr) {
299        if !p.is_dirty {
300            p.is_dirty = true;
301            self.dirty_pages.borrow_mut().insert(p.pnum, pp.clone());
302        }
303    }
304
305    #[cfg(feature = "pack")]
306    /// Attempt to free up logical pages by re-packing child pages.
307    pub fn repack(&self, db: &DB, r: &dyn Record) -> i64 {
308        let mut freed = 0;
309        self.repack_page(db, self.root_page.get(), r, &mut freed);
310        freed
311    }
312
313    /* Notes on repacking.
314       When repacking a page of level 2 or more, no keys are dropped or created.
315       Instead keys move from the level 1 pages to the level 2 page or vice versa.
316       When repacking a level 1 page, parent keys are dropped, and new keys may be created.
317    */
318
319    #[cfg(feature = "pack")]
320    /// Repack a page. Result is number of pages freed.
321    fn repack_page(&self, db: &DB, pnum: u64, r: &dyn Record, freed: &mut i64) {
322        let pp = self.load_page(db, pnum);
323        let p = &mut pp.borrow_mut();
324
325        if p.level == 0 {
326            return;
327        }
328
329        if p.level > 1 {
330            self.repack_page(db, p.first_page, r, freed);
331            if *freed >= REPACK_LIMIT {
332                return;
333            }
334            self.repack_children(db, p, p.root, r, freed);
335            if *freed >= REPACK_LIMIT {
336                return;
337            }
338        }
339
340        let (x, y) = Self::page_total(db, p, p.root);
341        let n = 1 + x;
342        if n < 2 {
343            return;
344        }
345        let total = y + db.lp_size(p.first_page);
346        let full = (n * db.page_size_max) as u64;
347        let space = full - total;
348
349        let div = std::cmp::min(5, n as u64);
350        if space < full / div {
351            return;
352        }
353
354        // Iterate over the page child records, appending them into a PageList of new pages.
355        let mut plist = PageList::default();
356        plist.add(db, p.first_page, r, self, None, 0);
357        self.move_children(db, p, p.root, r, &mut plist);
358        let n1 = plist.list.len();
359
360        if TRACE_PACK {
361            println!(
362                "pnum={} level={} #children={} total={} full={} space={}",
363                pnum,
364                p.level,
365                n,
366                total,
367                full,
368                full - total
369            );
370            println!(
371                "new #children={} diff={} record count={}",
372                n1,
373                n - n1,
374                plist.packed_record_count
375            );
376        }
377        *freed += plist.store_to(db, p, self);
378    }
379
380    #[cfg(feature = "pack")]
381    /// Count number child pages and their total size, to decide whether to repack a parent page.
382    fn page_total(db: &DB, p: &Page, x: usize) -> (usize, u64) {
383        if x == 0 {
384            return (0, 0);
385        }
386        let cp = p.child_page(x);
387        let cp_size = db.lp_size(cp);
388        let (n1, t1) = Self::page_total(db, p, p.left(x));
389        let (n2, t2) = Self::page_total(db, p, p.right(x));
390        (1 + n1 + n2, cp_size + t1 + t2)
391    }
392
393    #[cfg(feature = "pack")]
394    /// Move child records into PageList.
395    fn move_children(&self, db: &DB, p: &Page, x: usize, r: &dyn Record, plist: &mut PageList) {
396        if x != 0 {
397            self.move_children(db, p, p.left(x), r, plist);
398            let cp = p.child_page(x);
399            plist.add(db, cp, r, self, Some(p), x);
400            if p.level == 1 {
401                p.drop_key(db, x, r);
402            }
403            self.move_children(db, p, p.right(x), r, plist);
404        }
405    }
406
407    #[cfg(feature = "pack")]
408    fn repack_children(&self, db: &DB, p: &Page, x: usize, r: &dyn Record, freed: &mut i64) {
409        if x != 0 {
410            self.repack_page(db, p.child_page(x), r, freed);
411            if *freed >= REPACK_LIMIT {
412                return;
413            }
414            self.repack_children(db, p, p.left(x), r, freed);
415            if *freed >= REPACK_LIMIT {
416                return;
417            }
418            self.repack_children(db, p, p.right(x), r, freed);
419        }
420    }
421
422    #[cfg(feature = "verify")]
423    /// Get the set of used logical pages.
424    pub fn get_used(&self, db: &DB, to: &mut HashSet<u64>) {
425        self.get_used_page(db, to, self.root_page.get(), 255);
426    }
427
428    #[cfg(feature = "verify")]
429    fn get_used_page(&self, db: &DB, to: &mut HashSet<u64>, pnum: u64, level: u8) {
430        assert!(to.insert(pnum));
431        if level == 0 {
432            return;
433        }
434        let pp = self.load_page(db, pnum);
435        let p = &pp.borrow();
436        if p.level != 0 {
437            self.get_used_page(db, to, p.first_page, p.level - 1);
438            self.get_used_pages(db, to, p, p.root);
439        }
440    }
441
442    #[cfg(feature = "verify")]
443    fn get_used_pages(&self, db: &DB, to: &mut HashSet<u64>, p: &Page, x: usize) {
444        if x != 0 {
445            self.get_used_pages(db, to, p, p.left(x));
446            self.get_used_pages(db, to, p, p.right(x));
447            if p.level > 0 {
448                self.get_used_page(db, to, p.child_page(x), p.level - 1);
449            }
450        }
451    }
452
453    #[cfg(feature = "renumber")]
454    /// Renumber, adjusting cache.
455    pub fn ren(&self, from: u64, db: &DB) -> u64 {
456        let to = db.apd.renumber_page(from);
457        let mut dp = self.dirty_pages.borrow_mut();
458        if let Some(p) = dp.remove(&from) {
459            {
460                let mut p = p.borrow_mut();
461                debug_assert!(p.pnum == from);
462                p.pnum = to;
463            }
464            dp.insert(to, p);
465        }
466        to
467    }
468
469    #[cfg(feature = "renumber")]
470    /// Renumber pages >= target.
471    pub fn renumber(&self, db: &DB, target: u64) {
472        self.renumber_page(db, target, self.root_page.get());
473    }
474
475    #[cfg(feature = "renumber")]
476    fn renumber_page(&self, db: &DB, target: u64, pnum: u64) {
477        let pp = self.load_page(db, pnum);
478        let p = &mut pp.borrow_mut();
479        if p.level != 0 {
480            if p.first_page >= target {
481                p.first_page = self.ren(p.first_page, db);
482                self.set_dirty(p, &pp);
483            }
484            if p.level > 1 {
485                self.renumber_page(db, target, p.first_page);
486            }
487            let root = p.root;
488            if self.renumber_node(db, target, p, root) {
489                self.set_dirty(p, &pp);
490            }
491        }
492    }
493
494    #[cfg(feature = "renumber")]
495    fn renumber_node(&self, db: &DB, target: u64, p: &mut Page, x: usize) -> bool {
496        if x == 0 {
497            return false;
498        }
499        let mut cp = p.child_page(x);
500        let cp_ren = cp >= target;
501        if cp_ren {
502            cp = self.ren(cp, db);
503            p.set_child_page(x, cp);
504        }
505        if p.level > 1 {
506            self.renumber_page(db, target, cp);
507        }
508        cp_ren
509            | self.renumber_node(db, target, p, p.left(x))
510            | self.renumber_node(db, target, p, p.right(x))
511    }
512} // end impl SortedFile
513
514/// Used to pass parent page number for insert operations.
515struct ParentInfo<'a> {
516    pnum: u64,
517    parent: Option<&'a ParentInfo<'a>>,
518}
519
520/// For dividing full pages into two.
521struct Split {
522    count: usize,
523    split_node: usize,
524    left: Page,
525    right: Page,
526    half_page_size: usize,
527    left_full: bool,
528    got_split: bool,
529}
530
531impl Split {
532    /// Split the records of p into two new pages.
533    fn new(p: &mut Page, db: &DB) -> Self {
534        let half_page_size = db.apd.spd.psi.half_size_page();
535        let mut result = Split {
536            count: 0,
537            split_node: 0,
538            left: p.new_page(half_page_size),
539            right: p.new_page(half_page_size),
540            half_page_size,
541            left_full: false,
542            got_split: false,
543        };
544        result.left.first_page = p.first_page;
545        result.split(p, p.root);
546        assert!(result.split_node != 0);
547        result
548    }
549
550    fn split(&mut self, p: &Page, x: usize) {
551        if x != 0 {
552            self.split(p, p.left(x));
553            if !self.left_full
554                && !self.left.full(self.half_page_size)
555                && self.left.count + 1 < p.count
556            {
557                self.left.append_from(p, x);
558            } else {
559                self.left_full = true;
560                if !self.got_split {
561                    self.split_node = x;
562                    self.got_split = true;
563                }
564                self.right.append_from(p, x);
565            }
566            self.count += 1;
567            self.split(p, p.right(x));
568        }
569    }
570} // end impl Split
571
572/// A record to be stored in a SortedFile.
573pub trait Record {
574    /// Compare record with stored bytes.
575    fn compare(&self, db: &DB, data: &[u8]) -> Ordering;
576    /// Save record as bytes.
577    fn save(&self, _data: &mut [u8]) {}
578    /// Load key from bytes ( to store in parent page ).
579    fn key(&self, _db: &DB, data: &[u8]) -> Box<dyn Record> {
580        Box::new(Id {
581            id: util::getu64(data, 0),
582        })
583    }
584    /// Drop parent key ( may need to delete codes ).
585    fn drop_key(&self, _db: &DB, _data: &[u8]) {}
586}
587
588/// Id record.
589pub struct Id {
590    /// Id.
591    pub id: u64,
592}
593
594impl Record for Id {
595    fn compare(&self, _db: &DB, data: &[u8]) -> Ordering {
596        let id = util::getu64(data, 0);
597        self.id.cmp(&id)
598    }
599
600    fn save(&self, data: &mut [u8]) {
601        util::setu64(data, self.id);
602    }
603}
604
605/// Fetch records from SortedFile in ascending order. The iterator result is a PagePtr and offset of the data.
606pub struct Asc {
607    stk: Stack,
608    file: Rc<SortedFile>,
609}
610
611impl Asc {
612    fn new(db: &DB, start: Box<dyn Record>, file: &Rc<SortedFile>) -> Self {
613        let root_page = file.root_page.get();
614        let mut result = Asc {
615            stk: Stack::new(db, start),
616            file: file.clone(),
617        };
618        let pp = file.load_page(db, root_page);
619        result.stk.push(&pp, 0);
620        result
621    }
622}
623
624impl Iterator for Asc {
625    type Item = (PagePtr, usize);
626
627    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
628        self.stk.next(&self.file)
629    }
630}
631
632/// Fetch records from SortedFile in descending order.
633pub struct Dsc {
634    stk: Stack,
635    file: Rc<SortedFile>,
636}
637
638impl Dsc {
639    fn new(db: &DB, start: Box<dyn Record>, file: &Rc<SortedFile>) -> Self {
640        let root_page = file.root_page.get();
641        let mut result = Dsc {
642            stk: Stack::new(db, start),
643            file: file.clone(),
644        };
645        result.stk.add_page_dsc(file, root_page);
646        result
647    }
648}
649
650impl Iterator for Dsc {
651    type Item = (PagePtr, usize);
652
653    fn next(&mut self) -> Option<<Self as Iterator>::Item> {
654        self.stk.prev(&self.file)
655    }
656}
657
658/// Stack for implementing iteration.
659struct Stack {
660    v: Vec<(PagePtr, usize)>,
661    start: Box<dyn Record>,
662    seeking: bool,
663    db: DB,
664}
665
666impl Stack {
667    /// Create a new Stack with specified start key.
668    fn new(db: &DB, start: Box<dyn Record>) -> Self {
669        Stack {
670            v: Vec::with_capacity(16),
671            start,
672            seeking: true,
673            db: db.clone(),
674        }
675    }
676
677    /// Push page ptr and offset onto stack.
678    fn push(&mut self, pp: &PagePtr, off: usize) {
679        self.v.push((pp.clone(), off));
680    }
681
682    /// Fetch the next record.
683    fn next(&mut self, file: &SortedFile) -> Option<(PagePtr, usize)> {
684        while let Some((pp, x)) = self.v.pop() {
685            if x == 0 {
686                self.add_page_asc(file, pp);
687            } else {
688                // Do it this way to avoid clone of pp.
689                let off = {
690                    let p = &pp.borrow();
691                    self.add_asc(p, &pp, p.left(x));
692                    if p.level != 0 {
693                        let cpnum = p.child_page(x);
694                        let cpp = file.load_page(&self.db, cpnum);
695                        self.add_page_asc(file, cpp);
696                        continue;
697                    } else {
698                        p.rec_offset(x)
699                    }
700                };
701                self.seeking = false;
702                return Some((pp, off));
703            }
704        }
705        None
706    }
707
708    /// Fetch the previous record.
709    fn prev(&mut self, file: &SortedFile) -> Option<(PagePtr, usize)> {
710        while let Some((pp, x)) = self.v.pop() {
711            let off = {
712                let p = &pp.borrow();
713                self.add_dsc(p, &pp, p.right(x));
714                if p.level != 0 {
715                    let cpnum = p.child_page(x);
716                    self.add_page_dsc(file, cpnum);
717                    continue;
718                } else {
719                    p.rec_offset(x)
720                }
721            };
722            self.seeking = false;
723            return Some((pp, off));
724        }
725        None
726    }
727
728    /// Seek ascending order. Note that smaller keys are in the right sub-tree.
729    fn seek_asc(&mut self, p: &Page, pp: &PagePtr, mut x: usize) {
730        while x != 0 {
731            match p.compare(&self.db, &*self.start, x) {
732                Ordering::Less => {
733                    // Start is less than node key. node needs to be visited, so push it onto stack.
734                    self.push(pp, x);
735                    x = p.right(x);
736                }
737                Ordering::Equal => {
738                    self.push(pp, x);
739                    break;
740                }
741                Ordering::Greater => x = p.left(x),
742            }
743        }
744    }
745
746    /// Returns true if a node is found which is <= start.
747    /// This is used to decide whether the the preceding child page is added.
748    fn seek_dsc(&mut self, p: &Page, pp: &PagePtr, mut x: usize) -> bool {
749        while x != 0 {
750            match p.compare(&self.db, &*self.start, x) {
751                Ordering::Less => {
752                    if !self.seek_dsc(p, pp, p.right(x)) && p.level != 0 {
753                        self.push(pp, x);
754                    }
755                    return true;
756                }
757                Ordering::Equal => {
758                    self.push(pp, x);
759                    return true;
760                }
761                Ordering::Greater => {
762                    self.push(pp, x);
763                    x = p.left(x);
764                }
765            }
766        }
767        false
768    }
769
770    fn add_asc(&mut self, p: &Page, pp: &PagePtr, mut x: usize) {
771        while x != 0 {
772            self.push(pp, x);
773            x = p.right(x);
774        }
775    }
776
777    fn add_dsc(&mut self, p: &Page, pp: &PagePtr, mut x: usize) {
778        while x != 0 {
779            self.push(pp, x);
780            x = p.left(x);
781        }
782    }
783
784    fn add_page_asc(&mut self, file: &SortedFile, pp: PagePtr) {
785        let p = &pp.borrow();
786        if p.level != 0 {
787            let fp = file.load_page(&self.db, p.first_page);
788            self.push(&fp, 0);
789        }
790        let root = p.root;
791        if self.seeking {
792            self.seek_asc(p, &pp, root);
793        } else {
794            self.add_asc(p, &pp, root);
795        }
796    }
797
798    fn add_page_dsc(&mut self, file: &SortedFile, mut pnum: u64) {
799        loop {
800            let pp = file.load_page(&self.db, pnum);
801            let p = &pp.borrow();
802            let root = p.root;
803            if self.seeking {
804                if self.seek_dsc(p, &pp, root) {
805                    return;
806                }
807            } else {
808                self.add_dsc(p, &pp, root);
809            }
810            if p.level == 0 {
811                return;
812            }
813            pnum = p.first_page;
814        }
815    }
816} // end impl Stack
817
818#[cfg(feature = "pack")]
819enum PKey {
820    None,
821    Dyn(Box<dyn Record>),
822    Copy(Vec<u8>),
823}
824
825#[cfg(feature = "pack")]
826/// PageList is used to implement repacking of child pages ( REPACKFILE builtin function ).
827/// First add is called repeatedly to add records of the original child pages.
828/// Then store_to is called to build the new parent page.
829
830#[derive(Default)]
831struct PageList {
832    /// List of child pages and their keys.
833    list: Vec<(Page, PKey)>,
834    /// Child page numbers.
835    pnums: Vec<u64>,
836    /// Number of child records (for tracing only).
837    packed_record_count: usize,
838}
839
840#[cfg(feature = "pack")]
841const TRACE_PACK: bool = false;
842
843#[cfg(feature = "pack")]
844/// Limit on how many pages to free in one transaction.
845const REPACK_LIMIT: i64 = 100;
846
847#[cfg(feature = "pack")]
848impl PageList {
849    /// Add a page to the PageList.
850    fn add(
851        &mut self,
852        db: &DB,
853        pnum: u64,
854        r: &dyn Record,
855        file: &SortedFile,
856        par: Option<&Page>,
857        px: usize,
858    ) {
859        let pp = file.load_page(db, pnum);
860        let p = &mut pp.borrow_mut();
861
862        if self.list.is_empty() {
863            self.list.push((p.new_page(8192), PKey::None));
864        }
865        self.pnums.push(pnum);
866        file.remove_page(pnum);
867        p.pnum = u64::MAX;
868
869        if p.level > 0
870        // Need to save p.first_page. Append the parent key, then fixup the page number.
871        {
872            if let Some(pp) = par {
873                self.append_one(db, pp, px, r);
874            }
875            let cur = self.list.len() - 1;
876            let ap = &mut self.list[cur].0;
877            let x = ap.count;
878            if x == 0 {
879                ap.first_page = p.first_page;
880            } else {
881                ap.set_child_page(x, p.first_page);
882            }
883        }
884        // Append the page records by recursing from p.root.
885        self.append(db, p, p.root, r);
886    }
887
888    /// Append page records.
889    fn append(&mut self, db: &DB, p: &Page, x: usize, r: &dyn Record) {
890        if x != 0 {
891            self.append(db, p, p.left(x), r);
892            self.append_one(db, p, x, r);
893            self.append(db, p, p.right(x), r);
894        }
895    }
896
897    /// Append a single page record.
898    fn append_one(&mut self, db: &DB, p: &Page, x: usize, r: &dyn Record) {
899        self.packed_record_count += 1;
900        let cur = self.list.len() - 1;
901        let mut ap = &mut self.list[cur].0;
902        if ap.full(db.page_size_max) {
903            // Start a new page.
904            let key = if ap.level == 0 {
905                PKey::Dyn(p.get_key(db, x, r))
906            } else {
907                PKey::Copy(p.copy(x))
908            };
909            self.list.push((p.new_page(8192), key));
910            ap = &mut self.list[cur + 1].0;
911        }
912        ap.append_from(p, x);
913    }
914
915    /// Build new parent page.
916    fn store_to(&mut self, db: &DB, p: &mut Page, file: &SortedFile) -> i64 {
917        let mut pnums = std::mem::take(&mut self.pnums);
918        let list = std::mem::take(&mut self.list);
919        let mut np = p.new_page(8192);
920
921        for (cp, key) in list {
922            let cpnum = pnums.pop().unwrap();
923            file.publish_page(cpnum, cp);
924            match key {
925                PKey::Copy(b) => np.append_page_copy(&b, cpnum),
926                PKey::Dyn(key) => np.append_page(&*key, cpnum),
927                PKey::None => np.first_page = cpnum,
928            }
929        }
930
931        let pnum = p.pnum;
932        p.pnum = u64::MAX;
933        file.publish_page(pnum, np);
934
935        if TRACE_PACK {
936            println!("Free pages from pack={:?}", pnums);
937        }
938        let result = pnums.len();
939        while let Some(pnum) = pnums.pop() {
940            db.free_page(pnum);
941        }
942        result as i64
943    }
944}