nut/bucket/
bucket.rs

1use either::Either;
2use std::cell::RefCell;
3use std::collections::{hash_map::Entry, HashMap};
4use std::fmt;
5
6use crate::consts::{Flags, PGID};
7use crate::db::DB;
8use crate::errors::Error;
9use crate::node::{Node, NodeBuilder, WeakNode};
10use crate::page::{BranchPageElement, LeafPageElement, OwnedPage, Page};
11use crate::tx::{Tx, WeakTx};
12
13use super::consts::*;
14use super::BucketStats;
15use super::Cursor;
16use super::IBucket;
17use super::PageNode;
18
19/// Bucket represents a collection of key/value pairs inside the database.
20pub struct Bucket {
21    /// ref to on-file representation of a bucket
22    pub(crate) bucket: IBucket,
23
24    /// the associated transaction
25    pub(crate) tx: WeakTx,
26
27    /// subbucket cache
28    buckets: RefCell<HashMap<Vec<u8>, Bucket>>,
29
30    /// inline page reference
31    page: Option<OwnedPage>,
32
33    /// materialized node for the root page
34    root_node: Option<Node>,
35
36    /// node cache
37    pub(crate) nodes: RefCell<HashMap<PGID, Node>>,
38
39    /// Sets the threshold for filling nodes when they split. By default,
40    /// the bucket will fill to 50% but it can be useful to increase this
41    /// amount if you know that your write workloads are mostly append-only.
42    ///
43    /// This is non-persisted across transactions so it must be set in every Tx.
44    pub(crate) fill_percent: f64,
45}
46
47impl fmt::Debug for Bucket {
48    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
49        let tx = self.tx().ok().map(|tx| &tx as *const Tx);
50
51        f.debug_struct("Bucket")
52            .field("bucket", &self.bucket)
53            .field("tx", &tx)
54            .field("buckets", &self.buckets)
55            .field("page", &self.page.as_ref())
56            .field("root_node", &self.root_node)
57            .field("nodes", &*self.nodes.borrow())
58            .field("fill_percent", &self.fill_percent)
59            .finish()
60    }
61}
62
63impl Bucket {
64    pub(crate) const MIN_FILL_PERCENT: f64 = 0.1;
65    pub(crate) const MAX_FILL_PERCENT: f64 = 1.0;
66
67    pub(crate) const DEFAULT_FILL_PERCENT: f64 = 0.5;
68
69    pub(crate) const FLAG: u32 = 0x01;
70
71    pub(crate) fn new(tx: WeakTx) -> Self {
72        Self {
73            bucket: IBucket::new(),
74            tx,
75            buckets: RefCell::new(HashMap::new()),
76            page: None,
77            root_node: None,
78            nodes: RefCell::new(HashMap::new()),
79            fill_percent: Self::DEFAULT_FILL_PERCENT,
80        }
81    }
82
83    // Returns transaction
84    pub fn tx(&self) -> Result<Tx, Error> {
85        self.tx.upgrade().ok_or(Error::TxGone)
86    }
87
88    // Returns database
89    pub fn db(&self) -> Result<DB, Error> {
90        self.tx()?.db()
91    }
92
93    // Returns the root of the bucket
94    pub fn root(&self) -> PGID {
95        self.bucket.root
96    }
97
98    fn root_node(&self) -> Option<Node> {
99        self.root_node.clone()
100    }
101
102    /// Creates a cursor associated with the bucket.
103    pub fn cursor(&self) -> Result<Cursor<&Bucket>, Error> {
104        self.tx()?.0.stats.lock().cursor_count += 1;
105
106        Ok(Cursor::new(self))
107    }
108
109    fn __bucket(&self, name: &[u8]) -> Option<*mut Bucket> {
110        if let Some(b) = self.buckets.borrow_mut().get_mut(name) {
111            return Some(b);
112        };
113        let (key, value) = {
114            let c = self.cursor().unwrap();
115            let (key, value, flags) = c.seek_to_item(name).unwrap().unwrap();
116
117            // Return None if the key doesn't exist or it is not a bucket.
118            if key != Some(name) || (flags & Self::FLAG) == 0 {
119                return None;
120            };
121
122            (key.map(|k| k.to_vec()), value.map(|v| v.to_vec()))
123        };
124
125        // Otherwise create a bucket and cache it.
126        let child = self.open_bucket(value.unwrap());
127
128        let mut buckets = self.buckets.borrow_mut();
129        let bucket = match buckets.entry(key.unwrap()) {
130            Entry::Vacant(e) => e.insert(child),
131            Entry::Occupied(e) => {
132                let c = e.into_mut();
133                *c = child;
134                c
135            }
136        };
137        Some(bucket)
138    }
139
140    /// Retrieves a nested bucket by name.
141    /// Returns None if the bucket does not exist or found item is not bucket.
142    pub fn bucket(&self, key: &[u8]) -> Option<&Bucket> {
143        self.__bucket(key).map(|b| unsafe { &*b })
144    }
145
146    /// Retrieves a nested mutable bucket by name.
147    /// Returns None if the bucket does not exist or found item is not bucket.
148    pub fn bucket_mut(&mut self, key: &[u8]) -> Option<&mut Bucket> {
149        if !self.tx().unwrap().writable() {
150            return None;
151        };
152        self.__bucket(key).map(|b| unsafe { &mut *b })
153    }
154
155    /// Helper method that re-interprets a sub-bucket value
156    /// from a parent into a Bucket
157    ///
158    /// value is bytes serialized bucket
159    pub(crate) fn open_bucket(&self, value: Vec<u8>) -> Bucket {
160        let mut child = Bucket::new(self.tx.clone());
161        // let value = unsafe { value.as_ref().unwrap() };
162
163        // TODO: !!!
164        // If unaligned load/stores are broken on this arch and value is
165        // unaligned simply clone to an aligned byte array.
166        // let unaligned = BROKEN_UNALIGNED && &value[0] uintptr(unsafe.Pointer(&value[0]))&3 != 0
167        // let unaligned = (|| -> bool { unimplemented!() })();
168        // if unaligned {
169        // 	// value = cloneBytes(value)
170        // 	mutvalue = (|| -> &mut [u8] { unimplemented!() })();
171        // }
172
173        // If this is a writable transaction then we need to copy the bucket entry.
174        // Read-only transactions can point directly at the mmap entry.
175        {
176            // TODO:
177            // if self.tx().unwrap().writable() {
178            let b = unsafe { (*(value.as_ptr() as *const IBucket)).clone() };
179            child.bucket = b;
180        }
181
182        // Save a reference to the inline page if the bucket is inline.
183        if child.bucket.root == 0 {
184            let data = unsafe {
185                let slice = &value[IBucket::SIZE..];
186                let mut vec = vec![0u8; slice.len()];
187                std::ptr::copy_nonoverlapping(slice.as_ptr(), vec.as_mut_ptr(), slice.len());
188                vec
189            };
190
191            let p = OwnedPage::from_vec(data);
192            child.page = Some(p);
193        }
194
195        child
196    }
197
198    pub(crate) fn clear(&mut self) {
199        self.buckets.borrow_mut().clear();
200        self.nodes.borrow_mut().clear();
201        self.page = None;
202        self.root_node = None;
203    }
204
205    /// Creates bucket
206    pub fn create_bucket(&mut self, key: &[u8]) -> Result<&mut Bucket, Error> {
207        {
208            let tx = self.tx()?;
209            if !tx.opened() {
210                return Err(Error::TxClosed);
211            }
212            if !tx.writable() {
213                return Err(Error::DatabaseReadonly);
214            }
215            if key.is_empty() {
216                return Err(Error::NameRequired);
217            }
218        }
219
220        let tx_clone = self.tx.clone();
221
222        {
223            let mut cursor = self.cursor()?;
224            let (ckey, flags) = {
225                let (key, _, flags) = cursor.seek_to_item(key)?.unwrap();
226                (key, flags)
227            };
228
229            if ckey == Some(key) {
230                if (flags & Self::FLAG) != 0 {
231                    return Err(Error::BucketExists);
232                };
233                return Err(Error::IncompatibleValue);
234            };
235
236            let mut bucket = Bucket::new(tx_clone);
237            bucket.root_node = Some(NodeBuilder::new(&bucket).is_leaf(true).build());
238            bucket.fill_percent = Self::DEFAULT_FILL_PERCENT;
239
240            let value = bucket.write();
241            cursor.node().unwrap().put(key, key, value, 0, Self::FLAG);
242            self.page = None;
243        }
244
245        self.bucket_mut(key)
246            .ok_or_else(|| Error::Unexpected("cannot find bucket".to_string()))
247    }
248
249    /// Creates bucket if it not exists
250    pub fn create_bucket_if_not_exists(&mut self, key: &[u8]) -> Result<&mut Bucket, Error> {
251        match unsafe { &mut *(self as *mut Self) }.create_bucket(key) {
252            Ok(b) => Ok(b),
253            Err(Error::BucketExists) => self
254                .bucket_mut(key)
255                .ok_or_else(|| Error::Unexpected("can't find bucket".to_string())),
256            v => v,
257        }
258    }
259
260    /// Removes bucket and its contents
261    ///
262    /// Returns error if bucket not found or value is not bucket
263    pub fn delete_bucket(&mut self, key: &[u8]) -> Result<(), Error> {
264        {
265            let tx = self.tx()?;
266            if !tx.opened() {
267                return Err(Error::DatabaseClosed);
268            }
269            if !tx.writable() {
270                return Err(Error::DatabaseReadonly);
271            }
272            if key.is_empty() {
273                return Err(Error::NameRequired);
274            }
275        };
276
277        let mut c = self.cursor()?;
278        {
279            let item = c.seek(key)?;
280            if item.key.unwrap() != key {
281                return Err(Error::BucketNotFound);
282            }
283            if !item.is_bucket() {
284                return Err(Error::IncompatibleValue);
285            }
286        }
287        let mut node = c.node()?;
288        {
289            let child = self.bucket_mut(key).ok_or("Can't get bucket")?;
290            let child_buckets = child.buckets();
291
292            for bucket in &child_buckets {
293                child.delete_bucket(bucket)?;
294            }
295
296            // Release all bucket pages to freelist.
297            child.nodes.borrow_mut().clear();
298            child.root_node = None;
299            child.free();
300        }
301
302        self.buckets.borrow_mut().remove(key);
303        node.del(key);
304
305        Ok(())
306    }
307
308    /// Returns list of subbuckets' keys
309    pub fn buckets(&self) -> Vec<Vec<u8>> {
310        let mut names = vec![];
311        self.for_each(Box::new(|k, v| -> Result<(), Error> {
312            if v.is_none() {
313                names.push(k.to_vec());
314            }
315            Ok(())
316        }))
317        .unwrap();
318        names
319    }
320
321    /// Retrieves the value for a key in the bucket.
322    /// Returns a None value if the key does not exist or if the key is a nested bucket.
323    ///
324    /// # Example
325    ///
326    /// ```no_run
327    /// use nut::DBBuilder;
328    ///
329    /// let db = DBBuilder::new("./test.db").build().unwrap();
330    /// let tx = db.begin_tx().unwrap();
331    /// let flowers = tx.bucket(b"flowers").unwrap();
332    ///
333    /// assert_eq!(flowers.get(b"irises").unwrap(), &b"Iris is a genus of species of flowering plants"[..]);
334    /// ```
335    pub fn get(&self, key: &[u8]) -> Option<&[u8]> {
336        let (ckey, value, flags) = self.cursor().unwrap().seek(key).unwrap().unwrap();
337        // let value = value.map(|v| v.to_vec());
338
339        // Return nil if this is a bucket.
340        if (flags & Self::FLAG) != 0 {
341            return None;
342        }
343
344        // If our target node isn't the same key as what's passed in then return nil.
345        if ckey != Some(key) {
346            return None;
347        }
348
349        value
350    }
351
352    /// Sets the value for a key in the bucket.
353    /// If the key exist then its previous value will be overwritten.
354    /// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
355    ///
356    /// # Example
357    ///
358    /// ```no_run
359    /// use nut::DBBuilder;
360    ///
361    /// let mut db = DBBuilder::new("./test.db").build().unwrap();
362    /// let mut tx = db.begin_rw_tx().unwrap();
363    /// let mut flowers = tx.create_bucket(b"flowers").unwrap();
364    ///
365    /// flowers.put(b"irises", b"Iris is a genus of species of flowering plants".to_vec()).unwrap()
366    /// ```
367    pub fn put(&mut self, key: &[u8], value: Vec<u8>) -> Result<(), Error> {
368        if !self.tx()?.opened() {
369            return Err(Error::TxClosed);
370        }
371        if !self.tx()?.writable() {
372            return Err(Error::TxReadonly);
373        }
374        if key.is_empty() {
375            return Err(Error::KeyRequired);
376        }
377        if key.len() > MAX_KEY_SIZE {
378            return Err(Error::KeyTooLarge);
379        }
380        if value.len() > MAX_VALUE_SIZE {
381            return Err(Error::ValueTooLarge);
382        }
383
384        // Move cursor to correct position.
385        let mut c = self.cursor()?;
386        let item = c.seek(key)?;
387
388        // Return an error if there is an existing key with a bucket value.
389        if (Some(key) == item.key) && item.is_bucket() {
390            return Err(Error::IncompatibleValue);
391        }
392
393        // Insert into node.
394        c.node().unwrap().put(key, key, value, 0, 0);
395
396        Ok(())
397    }
398
399    /// Removes a key from the bucket.
400    /// If the key does not exist then nothing is done.
401    /// Returns an error if the bucket was created from a read-only transaction.
402    ///
403    /// # Example
404    ///
405    /// ```no_run
406    /// use nut::DBBuilder;
407    ///
408    /// let mut db = DBBuilder::new("./test.db").build().unwrap();
409    /// let mut tx = db.begin_rw_tx().unwrap();
410    /// let mut flowers = tx.bucket_mut(b"flowers").unwrap();
411    ///
412    /// flowers.delete(b"irises").unwrap()
413    /// ```
414    pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
415        if self.tx()?.db().is_err() {
416            return Err(Error::TxClosed);
417        }
418        if !self.tx()?.writable() {
419            return Err(Error::TxReadonly);
420        }
421
422        // Move cursor to correct position.
423        let mut c = self.cursor()?;
424        let item = c.seek(key)?;
425
426        // Return an error if there is already existing bucket value.
427        if item.is_bucket() {
428            return Err(Error::IncompatibleValue);
429        };
430
431        // Delete the node if we have a matching key.
432        c.node().unwrap().del(key);
433
434        Ok(())
435    }
436
437    /// Returns the current integer for the bucket without incrementing it.
438    pub fn sequence(&self) -> u64 {
439        self.bucket.sequence
440    }
441
442    /// Returns an autoincrementing integer for the bucket.
443    pub fn next_sequence(&mut self) -> Result<u64, Error> {
444        if !self.tx()?.writable() {
445            return Err(Error::TxReadonly);
446        }
447
448        if self.root_node.is_none() {
449            self.node(self.root(), WeakNode::new());
450        }
451
452        self.bucket.sequence += 1;
453
454        Ok(self.bucket.sequence)
455    }
456
457    /// Updates the sequence number for the bucket.
458    pub fn set_sequence(&mut self, value: u64) -> Result<(), Error> {
459        if !self.tx()?.writable() {
460            return Err(Error::TxReadonly);
461        };
462
463        if self.root_node.is_none() {
464            let pgid = self.root();
465            self.node(pgid, WeakNode::new());
466        };
467
468        self.bucket.sequence = value;
469        Ok(())
470    }
471
472    #[allow(clippy::type_complexity)]
473    /// Executes a function for each key/value pair in a bucket.
474    /// If the provided function returns an error then the iteration is stopped and
475    /// the error is returned to the caller.
476    pub fn for_each<'a, E: Into<Error>>(
477        &self,
478        mut handler: Box<dyn FnMut(&[u8], Option<&[u8]>) -> Result<(), E> + 'a>,
479    ) -> Result<(), Error> {
480        if !self.tx()?.opened() {
481            return Err(Error::TxClosed);
482        }
483        let c = self.cursor()?;
484        let mut item = c.first()?;
485        loop {
486            if item.is_none() {
487                break;
488            };
489            handler(item.key.unwrap(), item.value).map_err(|e| e.into())?;
490            item = c.next()?;
491        }
492        Ok(())
493    }
494
495    /// Returns stats on a bucket.
496    pub fn stats(&self) -> BucketStats {
497        let mut stats = BucketStats::default();
498        let mut sub_stats = BucketStats::default();
499        let page_size = self.tx().unwrap().db().unwrap().page_size();
500        stats.bucket_n += 1;
501        if self.bucket.root == 0 {
502            stats.inline_bucket_n += 1;
503        };
504        self.for_each_page(Box::new(|p, depth| {
505            let page_count = p.count as usize;
506            if p.flags == Flags::LEAVES {
507                stats.key_n += page_count;
508                let mut used = Page::header_size();
509                if page_count != 0 {
510                    // If page has any elements, add all element headers.
511                    used += LeafPageElement::SIZE * (page_count - 1);
512                    // Add all element key, value sizes.
513                    // The computation takes advantage of the fact that the position
514                    // of the last element's key/value equals to the total of the sizes
515                    // of all previous elements' keys and values.
516                    // It also includes the last element's header.
517                    let last_element = p.leaf_page_element(page_count - 1);
518                    used += (last_element.pos + last_element.ksize + last_element.vsize) as usize;
519
520                    if self.bucket.root == 0 {
521                        // For inlined bucket just update the inline stats
522                        stats.inline_bucket_in_use += used;
523                    } else {
524                        // For non-inlined bucket update all the leaf stats
525                        stats.leaf_page_n += 1;
526                        stats.leaf_in_use += used;
527                        stats.leaf_overflow_n += p.overflow as usize;
528
529                        // Collect stats from sub-buckets.
530                        // Do that by iterating over all element headers
531                        // looking for the ones with the bucketLeafFlag.
532                        for i in 0..page_count {
533                            let e = p.leaf_page_element(i);
534                            if (e.flags & Self::FLAG) != 0 {
535                                // For any bucket element, open the element value
536                                // and recursively call Stats on the contained bucket.
537                                sub_stats += self.open_bucket(e.value().to_vec()).stats();
538                            };
539                        }
540                    }
541                } else if p.flags == Flags::BRANCHES {
542                    stats.branch_page_n += 1;
543                    let last_element = p.branch_page_element(page_count - 1);
544
545                    // used totals the used bytes for the page
546                    // Add header and all element headers.
547                    let mut used =
548                        Page::header_size() + (BranchPageElement::SIZE * (page_count - 1));
549
550                    // Add size of all keys and values.
551                    // Again, use the fact that last element's position equals to
552                    // the total of key, value sizes of all previous elements.
553                    used += (last_element.pos + last_element.ksize) as usize;
554                    stats.branch_in_use += used;
555                    stats.branch_overflow_n += p.overflow as usize;
556                };
557            };
558
559            // Keep track of maximum page depth.
560            if depth + 1 > stats.depth {
561                stats.depth = depth + 1;
562            }
563        }));
564
565        // Alloc stats can be computed from page counts and pageSize.
566        stats.branch_alloc = (stats.branch_page_n + stats.branch_overflow_n) * page_size;
567        stats.leaf_alloc = (stats.leaf_page_n + stats.leaf_overflow_n) * page_size;
568
569        // Add the max depth of sub-buckets to get total nested depth.
570        stats.depth += sub_stats.depth;
571
572        // Add the stats for all sub-buckets
573        stats += sub_stats;
574        stats
575    }
576
577    /// Iterates over every page in a bucket, including inline pages.
578    fn for_each_page<'a>(&self, mut handler: Box<dyn FnMut(&Page, usize) + 'a>) {
579        // If we have an inline page then just use that.
580        if let Some(ref page) = self.page {
581            handler(page, 0);
582            return;
583        }
584
585        // Otherwise traverse the page hierarchy.
586        self.tx()
587            .unwrap()
588            .for_each_page(self.bucket.root, 0, handler);
589    }
590
591    /// Iterates over every page (or node) in a bucket.
592    /// This also includes inline pages.
593    fn for_each_page_node<F>(&self, mut handler: F)
594    where
595        F: FnMut(Either<&Page, &Node>, isize),
596    {
597        // If we have an inline page then just use that.
598        if let Some(ref page) = self.page {
599            handler(PageNode::from(&**page as *const Page).upgrade(), 0);
600            return;
601        }
602        self.__for_each_page_node(self.bucket.root, 0, &mut handler)
603    }
604
605    fn __for_each_page_node<F>(&self, pgid: PGID, depth: isize, handler: &mut F)
606    where
607        F: FnMut(Either<&Page, &Node>, isize),
608    {
609        let item = self.page_node(pgid).unwrap();
610
611        // Execute function.
612        handler(item.upgrade(), depth);
613
614        // Recursively loop over children.
615        match item.upgrade() {
616            Either::Left(p) => {
617                let is_branch = matches!(p.flags, Flags::BRANCHES);
618                if is_branch {
619                    for i in 0..p.count as usize {
620                        let elem = p.branch_page_element(i);
621                        self.__for_each_page_node(elem.pgid, depth + 1, handler);
622                    }
623                }
624            }
625            Either::Right(n) => {
626                if !n.is_leaf() {
627                    for inode in &*n.0.inodes.borrow() {
628                        self.__for_each_page_node(inode.pgid, depth + 1, handler)
629                    }
630                }
631            }
632        }
633    }
634
635    /// Writes all the nodes for this bucket to dirty pages.
636    pub(crate) fn spill(&mut self) -> Result<(), Error> {
637        let mutself = unsafe { &mut *(self as *mut Self) };
638
639        // Spill all child buckets first.
640        for (name, child) in &mut *self.buckets.borrow_mut() {
641            // If the child bucket is small enough and it has no child buckets then
642            // write it inline into the parent bucket's page. Otherwise spill it
643            // like a normal bucket and make the parent value a pointer to the page.
644            let value = if child.inlineable() {
645                child.free();
646                child.write()
647            } else {
648                child.spill()?;
649
650                // Update the child bucket header in this bucket.
651                let mut vec = vec![0u8; IBucket::SIZE];
652                let bucket_ptr = vec.as_mut_ptr() as *mut IBucket;
653                unsafe { std::ptr::copy_nonoverlapping(&child.bucket, bucket_ptr, 1) };
654                vec
655            };
656
657            // Skip writing the bucket if there are no materialized nodes.
658            if child.root_node.is_none() {
659                continue;
660            };
661
662            // Update parent node.
663            let mut c = mutself.cursor()?;
664            let item = c.seek(name)?;
665            if item.key != Some(name.as_slice()) {
666                return Err(format!(
667                    "misplaced bucket header: {:?} -> {:?}",
668                    name,
669                    item.key.as_ref().unwrap()
670                )
671                .into());
672            }
673            if !item.is_bucket() {
674                return Err(format!("unexpected bucket header flag: {}", item.flags).into());
675            }
676            c.node()?.put(name, name, value, 0, Self::FLAG);
677        }
678
679        // Ignore if there's not a materialized root node.
680        if self.root_node.is_none() {
681            return Ok(());
682        }
683
684        {
685            // Spill nodes.
686            let mut root_node = self.root_node.clone().ok_or("root node empty")?.root();
687            root_node.spill()?;
688            self.root_node = Some(root_node);
689
690            let pgid = self.root_node.as_ref().unwrap().pgid();
691            let txpgid = self.tx()?.pgid();
692
693            // Update the root node for this bucket.
694            if pgid >= txpgid {
695                panic!("pgid ({}) above high water mark ({})", pgid, txpgid);
696            }
697
698            self.bucket.root = pgid;
699        }
700
701        Ok(())
702    }
703
704    /// Returns true if a bucket is small enough to be written inline
705    /// and if it contains no subbuckets. Otherwise returns false.
706    fn inlineable(&self) -> bool {
707        if self.root_node().is_none() || !self.root_node().unwrap().is_leaf() {
708            return false;
709        }
710
711        let mut size = Page::header_size();
712        let node = self.root_node.clone().unwrap();
713
714        for inode in &*node.0.inodes.borrow() {
715            if inode.flags & Self::FLAG != 0 {
716                return false;
717            }
718
719            size += LeafPageElement::SIZE + inode.key.len() + inode.value.len();
720            if size > self.max_inline_bucket_size() {
721                return false;
722            }
723        }
724
725        true
726    }
727
728    /// Returns the maximum total size of a bucket to make it a candidate for inlining.
729    fn max_inline_bucket_size(&self) -> usize {
730        self.tx().unwrap().db().unwrap().page_size() / 4
731    }
732
733    /// Allocates and writes a bucket to a byte slice.
734    fn write(&mut self) -> Vec<u8> {
735        // TODO: optimize this page alloc things
736        // Allocate the appropriate size.
737        let n = self.root_node.as_ref().unwrap();
738        let node_size = n.size();
739        let mut value = vec![0u8; IBucket::SIZE + node_size];
740
741        // Write a bucket header.
742        let bucket_ptr = value.as_mut_ptr() as *mut IBucket;
743        unsafe { std::ptr::copy_nonoverlapping(&self.bucket, bucket_ptr, 1) };
744
745        // Convert byte slice to a fake page and write the root node.
746        {
747            let page_buf = &mut value[IBucket::SIZE..];
748            let page = Page::from_buf_mut(page_buf);
749            n.write(page);
750        }
751
752        value
753    }
754
755    /// Attempts to balance all nodes.
756    pub(crate) fn rebalance(&mut self) {
757        for node in self.nodes.borrow_mut().values_mut() {
758            node.rebalance()
759        }
760        for child in self.buckets.borrow_mut().values_mut() {
761            child.rebalance()
762        }
763    }
764
765    /// Creates a node from a page and associates it with a given parent.
766    pub(crate) fn node(&mut self, pgid: PGID, parent: WeakNode) -> Node {
767        if !self.tx().unwrap().writable() {
768            panic!("tx is read-only");
769        }
770
771        if let Some(n) = self.nodes.borrow().get(&pgid) {
772            return n.clone();
773        };
774
775        let mut node = NodeBuilder::new(self).parent(parent.clone()).build();
776        match parent.upgrade() {
777            None => {
778                self.root_node.replace(node.clone());
779            }
780            Some(ref mut p) => {
781                p.0.children.borrow_mut().push(node.clone());
782            }
783        }
784
785        if let Some(ref page) = self.page {
786            node.read(page);
787        } else {
788            unsafe {
789                node.read(&*self.tx().unwrap().page(pgid).unwrap());
790            }
791        }
792
793        self.nodes.borrow_mut().insert(pgid, node.clone());
794
795        // Update statistics.
796        self.tx().unwrap().0.stats.lock().node_count += 1;
797
798        node
799    }
800
801    /// Recursively frees all pages in the bucket.
802    fn free(&mut self) {
803        if self.bucket.root == 0 {
804            return;
805        };
806
807        let tx = self.tx().unwrap();
808        let db = tx.db().unwrap();
809        self.for_each_page_node(|p, _| match p {
810            Either::Left(p) => {
811                let txid = tx.id();
812                db.0.freelist.write().free(txid, p).unwrap()
813            }
814            Either::Right(n) => n.clone().free(),
815        });
816        self.bucket.root = 0;
817    }
818
819    /// Returns the in-memory node, if it exists.
820    /// Otherwise returns the underlying page.
821    pub(crate) fn page_node(&self, id: PGID) -> Result<PageNode, Error> {
822        // Inline buckets have a fake page embedded in their value so treat them
823        // differently. We'll return the rootNode (if available) or the fake page.
824        if self.bucket.root == 0 {
825            if id != 0 {
826                return Err(format!("inline bucket non-zero page access: {id} != 0").into());
827            }
828            if let Some(ref node) = self.root_node {
829                return Ok(PageNode::from(node.clone()));
830            }
831            return Ok(PageNode::from(
832                &**self.page.as_ref().ok_or("page empty")? as *const Page
833            ));
834        }
835
836        // Check the node cache for non-inline buckets.
837        if let Some(node) = self.nodes.borrow().get(&id) {
838            return Ok(PageNode::from(node.clone()));
839        };
840
841        // Finally lookup the page from the transaction if no node is materialized.
842        Ok(PageNode::from(self.tx()?.page(id)?))
843    }
844}