nut/bucket/
bucket.rs

1use either::Either;
2use std::cell::RefCell;
3use std::collections::{hash_map::Entry, HashMap};
4use std::fmt;
5use std::{isize, usize};
6
7use crate::consts::{Flags, PGID};
8use crate::db::DB;
9use crate::errors::Error;
10use crate::node::{Node, NodeBuilder, WeakNode};
11use crate::page::{BranchPageElement, LeafPageElement, OwnedPage, Page};
12use crate::tx::{Tx, WeakTx};
13
14use super::consts::*;
15use super::BucketStats;
16use super::Cursor;
17use super::IBucket;
18use super::PageNode;
19
/// Bucket represents a collection of key/value pairs inside the database.
///
/// A bucket is bound to a transaction through a weak handle and keeps
/// caches of the sub-buckets opened through it and of the nodes that
/// have been materialized for it.
pub struct Bucket {
    /// Bucket header (`root` page id and `sequence` counter). This is the
    /// part that is serialized at the front of a bucket's stored value.
    pub(crate) bucket: IBucket,

    /// Weak handle to the associated transaction; upgraded on demand by `tx()`.
    pub(crate) tx: WeakTx,

    /// Cache of sub-buckets opened through this bucket, keyed by bucket name.
    buckets: RefCell<HashMap<Vec<u8>, Bucket>>,

    /// Owned copy of the inline page; populated by `open_bucket` when the
    /// header's `root` is `0`.
    page: Option<OwnedPage>,

    /// Materialized node for the root page, if one has been created.
    root_node: Option<Node>,

    /// Cache of materialized nodes, keyed by page id.
    pub(crate) nodes: RefCell<HashMap<PGID, Node>>,

    /// Sets the threshold for filling nodes when they split. By default,
    /// the bucket will fill to 50% but it can be useful to increase this
    /// amount if you know that your write workloads are mostly append-only.
    ///
    /// This is non-persisted across transactions so it must be set in every Tx.
    pub(crate) fill_percent: f64,
}
47
/// Debug output listing every field of the bucket. The transaction is shown
/// as an optional raw pointer (`None` when the weak handle cannot be upgraded).
impl fmt::Debug for Bucket {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // NOTE(review): `&tx` here is the address of the closure's own `tx`
        // parameter, which is dropped when the closure returns. The pointer is
        // only printed, never dereferenced, but it does not identify the
        // transaction in any stable way.
        let tx = self.tx().ok().map(|tx| &tx as *const Tx);

        f.debug_struct("Bucket")
            .field("bucket", &self.bucket)
            .field("tx", &tx)
            .field("buckets", &self.buckets)
            .field("page", &self.page.as_ref())
            .field("root_node", &self.root_node)
            // Borrow the cache so the map itself (not the RefCell) is printed.
            .field("nodes", &*self.nodes.borrow())
            .field("fill_percent", &self.fill_percent)
            .finish()
    }
}
63
64impl Bucket {
    /// Lower bound for `fill_percent`. It is not enforced anywhere in this
    /// impl block; presumably applied where nodes are split (TODO confirm).
    pub(crate) const MIN_FILL_PERCENT: f64 = 0.1;
    /// Upper bound for `fill_percent`. Same caveat as `MIN_FILL_PERCENT`.
    pub(crate) const MAX_FILL_PERCENT: f64 = 1.0;

    /// Fill percent assigned to every bucket built by `Bucket::new`.
    pub(crate) const DEFAULT_FILL_PERCENT: f64 = 0.5;

    /// Element flag bit marking a value as a nested bucket; tested throughout
    /// this impl with `flags & Self::FLAG`.
    pub(crate) const FLAG: u32 = 0x01;
71
72    pub(crate) fn new(tx: WeakTx) -> Self {
73        Self {
74            bucket: IBucket::new(),
75            tx,
76            buckets: RefCell::new(HashMap::new()),
77            page: None,
78            root_node: None,
79            nodes: RefCell::new(HashMap::new()),
80            fill_percent: Self::DEFAULT_FILL_PERCENT,
81        }
82    }
83
84    // Returns transaction
85    pub fn tx(&self) -> Result<Tx, Error> {
86        self.tx.upgrade().ok_or(Error::TxGone)
87    }
88
89    // Returns database
90    pub fn db(&self) -> Result<DB, Error> {
91        self.tx()?.db()
92    }
93
    /// Returns the root page id stored in the bucket header.
    ///
    /// A value of `0` is treated throughout this impl as "inline bucket".
    pub fn root(&self) -> PGID {
        self.bucket.root
    }
98
99    fn root_node(&self) -> Option<Node> {
100        self.root_node.clone()
101    }
102
103    /// Creates a cursor associated with the bucket.
104    pub fn cursor(&self) -> Result<Cursor<&Bucket>, Error> {
105        self.tx()?.0.stats.lock().cursor_count += 1;
106
107        Ok(Cursor::new(self))
108    }
109
110    fn __bucket(&self, name: &[u8]) -> Option<*mut Bucket> {
111        if let Some(b) = self.buckets.borrow_mut().get_mut(name) {
112            return Some(b);
113        };
114        let (key, value) = {
115            let c = self.cursor().unwrap();
116            let (key, value, flags) = c.seek_to_item(name).unwrap().unwrap();
117
118            // Return None if the key doesn't exist or it is not a bucket.
119            if key != Some(name) || (flags & Self::FLAG) == 0 {
120                return None;
121            };
122
123            (key.map(|k| k.to_vec()), value.map(|v| v.to_vec()))
124        };
125
126        // Otherwise create a bucket and cache it.
127        let child = self.open_bucket(value.unwrap());
128
129        let mut buckets = self.buckets.borrow_mut();
130        let bucket = match buckets.entry(key.unwrap()) {
131            Entry::Vacant(e) => e.insert(child),
132            Entry::Occupied(e) => {
133                let c = e.into_mut();
134                *c = child;
135                c
136            }
137        };
138        Some(bucket)
139    }
140
141    /// Retrieves a nested bucket by name.
142    /// Returns None if the bucket does not exist or found item is not bucket.
143    pub fn bucket(&self, key: &[u8]) -> Option<&Bucket> {
144        self.__bucket(key).map(|b| unsafe { &*b })
145    }
146
147    /// Retrieves a nested mutable bucket by name.
148    /// Returns None if the bucket does not exist or found item is not bucket.
149    pub fn bucket_mut(&mut self, key: &[u8]) -> Option<&mut Bucket> {
150        if !self.tx().unwrap().writable() {
151            return None;
152        };
153        self.__bucket(key).map(|b| unsafe { &mut *b })
154    }
155
156    /// Helper method that re-interprets a sub-bucket value
157    /// from a parent into a Bucket
158    ///
159    /// value is bytes serialized bucket
160    pub(crate) fn open_bucket(&self, value: Vec<u8>) -> Bucket {
161        let mut child = Bucket::new(self.tx.clone());
162        // let value = unsafe { value.as_ref().unwrap() };
163
164        // TODO: !!!
165        // If unaligned load/stores are broken on this arch and value is
166        // unaligned simply clone to an aligned byte array.
167        // let unaligned = BROKEN_UNALIGNED && &value[0] uintptr(unsafe.Pointer(&value[0]))&3 != 0
168        // let unaligned = (|| -> bool { unimplemented!() })();
169        // if unaligned {
170        // 	// value = cloneBytes(value)
171        // 	mutvalue = (|| -> &mut [u8] { unimplemented!() })();
172        // }
173
174        // If this is a writable transaction then we need to copy the bucket entry.
175        // Read-only transactions can point directly at the mmap entry.
176        {
177            // TODO:
178            // if self.tx().unwrap().writable() {
179            let b = unsafe { (*(value.as_ptr() as *const IBucket)).clone() };
180            child.bucket = b;
181        }
182
183        // Save a reference to the inline page if the bucket is inline.
184        if child.bucket.root == 0 {
185            let data = unsafe {
186                let slice = &value[IBucket::SIZE..];
187                let mut vec = vec![0u8; slice.len()];
188                std::ptr::copy_nonoverlapping(slice.as_ptr(), vec.as_mut_ptr(), slice.len());
189                vec
190            };
191
192            let p = OwnedPage::from_vec(data);
193            child.page = Some(p);
194        }
195
196        child
197    }
198
199    pub(crate) fn clear(&mut self) {
200        self.buckets.borrow_mut().clear();
201        self.nodes.borrow_mut().clear();
202        self.page = None;
203        self.root_node = None;
204    }
205
206    /// Creates bucket
207    pub fn create_bucket(&mut self, key: &[u8]) -> Result<&mut Bucket, Error> {
208        {
209            let tx = self.tx()?;
210            if !tx.opened() {
211                return Err(Error::TxClosed);
212            }
213            if !tx.writable() {
214                return Err(Error::DatabaseReadonly);
215            }
216            if key.is_empty() {
217                return Err(Error::NameRequired);
218            }
219        }
220
221        let tx_clone = self.tx.clone();
222
223        {
224            let mut cursor = self.cursor()?;
225            let (ckey, flags) = {
226                let (key, _, flags) = cursor.seek_to_item(key)?.unwrap();
227                (key, flags)
228            };
229
230            if ckey == Some(key) {
231                if (flags & Self::FLAG) != 0 {
232                    return Err(Error::BucketExists);
233                };
234                return Err(Error::IncompatibleValue);
235            };
236
237            let mut bucket = Bucket::new(tx_clone);
238            bucket.root_node = Some(NodeBuilder::new(&bucket).is_leaf(true).build());
239            bucket.fill_percent = Self::DEFAULT_FILL_PERCENT;
240
241            let value = bucket.write();
242            cursor.node().unwrap().put(key, key, value, 0, Self::FLAG);
243            self.page = None;
244        }
245
246        self.bucket_mut(key)
247            .ok_or_else(|| Error::Unexpected("cannot find bucket".to_string()))
248    }
249
250    /// Creates bucket if it not exists
251    pub fn create_bucket_if_not_exists(&mut self, key: &[u8]) -> Result<&mut Bucket, Error> {
252        match unsafe { &mut *(self as *mut Self) }.create_bucket(key) {
253            Ok(b) => Ok(b),
254            Err(Error::BucketExists) => self
255                .bucket_mut(key)
256                .ok_or_else(|| Error::Unexpected("can't find bucket".to_string())),
257            v => v,
258        }
259    }
260
261    /// Removes bucket and its contents
262    ///
263    /// Returns error if bucket not found or value is not bucket
264    pub fn delete_bucket(&mut self, key: &[u8]) -> Result<(), Error> {
265        {
266            let tx = self.tx()?;
267            if !tx.opened() {
268                return Err(Error::DatabaseClosed);
269            }
270            if !tx.writable() {
271                return Err(Error::DatabaseReadonly);
272            }
273            if key.is_empty() {
274                return Err(Error::NameRequired);
275            }
276        };
277
278        let mut c = self.cursor()?;
279        {
280            let item = c.seek(key)?;
281            if item.key.unwrap() != key {
282                return Err(Error::BucketNotFound);
283            }
284            if !item.is_bucket() {
285                return Err(Error::IncompatibleValue);
286            }
287        }
288        let mut node = c.node()?;
289        {
290            let child = self.bucket_mut(key).ok_or("Can't get bucket")?;
291            let child_buckets = child.buckets();
292
293            for bucket in &child_buckets {
294                child.delete_bucket(bucket)?;
295            }
296
297            // Release all bucket pages to freelist.
298            child.nodes.borrow_mut().clear();
299            child.root_node = None;
300            child.free();
301        }
302
303        self.buckets.borrow_mut().remove(key);
304        node.del(key);
305
306        Ok(())
307    }
308
309    /// Returns list of subbuckets' keys
310    pub fn buckets(&self) -> Vec<Vec<u8>> {
311        let mut names = vec![];
312        self.for_each(Box::new(|k, v| -> Result<(), Error> {
313            if v.is_none() {
314                names.push(k.to_vec());
315            }
316            Ok(())
317        }))
318        .unwrap();
319        names
320    }
321
322    /// Retrieves the value for a key in the bucket.
323    /// Returns a None value if the key does not exist or if the key is a nested bucket.
324    ///
325    /// # Example
326    ///
327    /// ```no_run
328    /// use nut::DBBuilder;
329    ///
330    /// let db = DBBuilder::new("./test.db").build().unwrap();
331    /// let tx = db.begin_tx().unwrap();
332    /// let flowers = tx.bucket(b"flowers").unwrap();
333    ///
334    /// assert_eq!(flowers.get(b"irises").unwrap(), &b"Iris is a genus of species of flowering plants"[..]);
335    /// ```
336    pub fn get(&self, key: &[u8]) -> Option<&[u8]> {
337        let (ckey, value, flags) = self.cursor().unwrap().seek(key).unwrap().unwrap();
338        // let value = value.map(|v| v.to_vec());
339
340        // Return nil if this is a bucket.
341        if (flags & Self::FLAG) != 0 {
342            return None;
343        }
344
345        // If our target node isn't the same key as what's passed in then return nil.
346        if ckey != Some(key) {
347            return None;
348        }
349
350        value
351    }
352
353    /// Sets the value for a key in the bucket.
354    /// If the key exist then its previous value will be overwritten.
355    /// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
356    ///
357    /// # Example
358    ///
359    /// ```no_run
360    /// use nut::DBBuilder;
361    ///
362    /// let mut db = DBBuilder::new("./test.db").build().unwrap();
363    /// let mut tx = db.begin_rw_tx().unwrap();
364    /// let mut flowers = tx.create_bucket(b"flowers").unwrap();
365    ///
366    /// flowers.put(b"irises", b"Iris is a genus of species of flowering plants".to_vec()).unwrap()
367    /// ```
368    pub fn put(&mut self, key: &[u8], value: Vec<u8>) -> Result<(), Error> {
369        if !self.tx()?.opened() {
370            return Err(Error::TxClosed);
371        }
372        if !self.tx()?.writable() {
373            return Err(Error::TxReadonly);
374        }
375        if key.is_empty() {
376            return Err(Error::KeyRequired);
377        }
378        if key.len() > MAX_KEY_SIZE {
379            return Err(Error::KeyTooLarge);
380        }
381        if value.len() > MAX_VALUE_SIZE {
382            return Err(Error::ValueTooLarge);
383        }
384
385        // Move cursor to correct position.
386        let mut c = self.cursor()?;
387        let item = c.seek(key)?;
388
389        // Return an error if there is an existing key with a bucket value.
390        if (Some(key) == item.key) && item.is_bucket() {
391            return Err(Error::IncompatibleValue);
392        }
393
394        // Insert into node.
395        c.node().unwrap().put(key, key, value, 0, 0);
396
397        Ok(())
398    }
399
400    /// Removes a key from the bucket.
401    /// If the key does not exist then nothing is done.
402    /// Returns an error if the bucket was created from a read-only transaction.
403    ///
404    /// # Example
405    ///
406    /// ```no_run
407    /// use nut::DBBuilder;
408    ///
409    /// let mut db = DBBuilder::new("./test.db").build().unwrap();
410    /// let mut tx = db.begin_rw_tx().unwrap();
411    /// let mut flowers = tx.bucket_mut(b"flowers").unwrap();
412    ///
413    /// flowers.delete(b"irises").unwrap()
414    /// ```
415    pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
416        if self.tx()?.db().is_err() {
417            return Err(Error::TxClosed);
418        }
419        if !self.tx()?.writable() {
420            return Err(Error::TxReadonly);
421        }
422
423        // Move cursor to correct position.
424        let mut c = self.cursor()?;
425        let item = c.seek(key)?;
426
427        // Return an error if there is already existing bucket value.
428        if item.is_bucket() {
429            return Err(Error::IncompatibleValue);
430        };
431
432        // Delete the node if we have a matching key.
433        c.node().unwrap().del(key);
434
435        Ok(())
436    }
437
    /// Returns the current integer for the bucket without incrementing it.
    ///
    /// The value is read from the bucket header's `sequence` field.
    pub fn sequence(&self) -> u64 {
        self.bucket.sequence
    }
442
443    /// Returns an autoincrementing integer for the bucket.
444    pub fn next_sequence(&mut self) -> Result<u64, Error> {
445        if !self.tx()?.writable() {
446            return Err(Error::TxReadonly);
447        }
448
449        if self.root_node.is_none() {
450            self.node(self.root(), WeakNode::new());
451        }
452
453        self.bucket.sequence += 1;
454
455        Ok(self.bucket.sequence)
456    }
457
458    /// Updates the sequence number for the bucket.
459    pub fn set_sequence(&mut self, value: u64) -> Result<(), Error> {
460        if !self.tx()?.writable() {
461            return Err(Error::TxReadonly);
462        };
463
464        if self.root_node.is_none() {
465            let pgid = self.root();
466            self.node(pgid, WeakNode::new());
467        };
468
469        self.bucket.sequence = value;
470        Ok(())
471    }
472
473    #[allow(clippy::type_complexity)]
474    /// Executes a function for each key/value pair in a bucket.
475    /// If the provided function returns an error then the iteration is stopped and
476    /// the error is returned to the caller.
477    pub fn for_each<'a, E: Into<Error>>(
478        &self,
479        mut handler: Box<dyn FnMut(&[u8], Option<&[u8]>) -> Result<(), E> + 'a>,
480    ) -> Result<(), Error> {
481        if !self.tx()?.opened() {
482            return Err(Error::TxClosed);
483        }
484        let c = self.cursor()?;
485        let mut item = c.first()?;
486        loop {
487            if item.is_none() {
488                break;
489            };
490            handler(item.key.unwrap(), item.value).map_err(|e| e.into())?;
491            item = c.next()?;
492        }
493        Ok(())
494    }
495
496    /// Returns stats on a bucket.
497    pub fn stats(&self) -> BucketStats {
498        let mut stats = BucketStats::default();
499        let mut sub_stats = BucketStats::default();
500        let page_size = self.tx().unwrap().db().unwrap().page_size();
501        stats.bucket_n += 1;
502        if self.bucket.root == 0 {
503            stats.inline_bucket_n += 1;
504        };
505        self.for_each_page(Box::new(|p, depth| {
506            let page_count = p.count as usize;
507            if p.flags == Flags::LEAVES {
508                stats.key_n += page_count;
509                let mut used = Page::header_size();
510                if page_count != 0 {
511                    // If page has any elements, add all element headers.
512                    used += LeafPageElement::SIZE * (page_count - 1);
513                    // Add all element key, value sizes.
514                    // The computation takes advantage of the fact that the position
515                    // of the last element's key/value equals to the total of the sizes
516                    // of all previous elements' keys and values.
517                    // It also includes the last element's header.
518                    let last_element = p.leaf_page_element(page_count - 1);
519                    used += (last_element.pos + last_element.ksize + last_element.vsize) as usize;
520
521                    if self.bucket.root == 0 {
522                        // For inlined bucket just update the inline stats
523                        stats.inline_bucket_in_use += used;
524                    } else {
525                        // For non-inlined bucket update all the leaf stats
526                        stats.leaf_page_n += 1;
527                        stats.leaf_in_use += used;
528                        stats.leaf_overflow_n += p.overflow as usize;
529
530                        // Collect stats from sub-buckets.
531                        // Do that by iterating over all element headers
532                        // looking for the ones with the bucketLeafFlag.
533                        for i in 0..page_count {
534                            let e = p.leaf_page_element(i);
535                            if (e.flags & Self::FLAG) != 0 {
536                                // For any bucket element, open the element value
537                                // and recursively call Stats on the contained bucket.
538                                sub_stats += self.open_bucket(e.value().to_vec()).stats();
539                            };
540                        }
541                    }
542                } else if p.flags == Flags::BRANCHES {
543                    stats.branch_page_n += 1;
544                    let last_element = p.branch_page_element(page_count - 1);
545
546                    // used totals the used bytes for the page
547                    // Add header and all element headers.
548                    let mut used =
549                        Page::header_size() + (BranchPageElement::SIZE * (page_count - 1));
550
551                    // Add size of all keys and values.
552                    // Again, use the fact that last element's position equals to
553                    // the total of key, value sizes of all previous elements.
554                    used += (last_element.pos + last_element.ksize) as usize;
555                    stats.branch_in_use += used;
556                    stats.branch_overflow_n += p.overflow as usize;
557                };
558            };
559
560            // Keep track of maximum page depth.
561            if depth + 1 > stats.depth {
562                stats.depth = depth + 1;
563            }
564        }));
565
566        // Alloc stats can be computed from page counts and pageSize.
567        stats.branch_alloc = (stats.branch_page_n + stats.branch_overflow_n) * page_size;
568        stats.leaf_alloc = (stats.leaf_page_n + stats.leaf_overflow_n) * page_size;
569
570        // Add the max depth of sub-buckets to get total nested depth.
571        stats.depth += sub_stats.depth;
572
573        // Add the stats for all sub-buckets
574        stats += sub_stats;
575        stats
576    }
577
578    /// Iterates over every page in a bucket, including inline pages.
579    fn for_each_page<'a>(&self, mut handler: Box<dyn FnMut(&Page, usize) + 'a>) {
580        // If we have an inline page then just use that.
581        if let Some(ref page) = self.page {
582            handler(page, 0);
583            return;
584        }
585
586        // Otherwise traverse the page hierarchy.
587        self.tx()
588            .unwrap()
589            .for_each_page(self.bucket.root, 0, handler);
590    }
591
592    /// Iterates over every page (or node) in a bucket.
593    /// This also includes inline pages.
594    fn for_each_page_node<F>(&self, mut handler: F)
595    where
596        F: FnMut(Either<&Page, &Node>, isize),
597    {
598        // If we have an inline page then just use that.
599        if let Some(ref page) = self.page {
600            handler(PageNode::from(&**page as *const Page).upgrade(), 0);
601            return;
602        }
603        self.__for_each_page_node(self.bucket.root, 0, &mut handler)
604    }
605
606    fn __for_each_page_node<F>(&self, pgid: PGID, depth: isize, handler: &mut F)
607    where
608        F: FnMut(Either<&Page, &Node>, isize),
609    {
610        let item = self.page_node(pgid).unwrap();
611
612        // Execute function.
613        handler(item.upgrade(), depth);
614
615        // Recursively loop over children.
616        match item.upgrade() {
617            Either::Left(p) => {
618                let is_branch = matches!(p.flags, Flags::BRANCHES);
619                if is_branch {
620                    for i in 0..p.count as usize {
621                        let elem = p.branch_page_element(i);
622                        self.__for_each_page_node(elem.pgid, depth + 1, handler);
623                    }
624                }
625            }
626            Either::Right(n) => {
627                if !n.is_leaf() {
628                    for inode in &*n.0.inodes.borrow() {
629                        self.__for_each_page_node(inode.pgid, depth + 1, handler)
630                    }
631                }
632            }
633        }
634    }
635
636    /// Writes all the nodes for this bucket to dirty pages.
637    pub(crate) fn spill(&mut self) -> Result<(), Error> {
638        let mutself = unsafe { &mut *(self as *mut Self) };
639
640        // Spill all child buckets first.
641        for (name, child) in &mut *self.buckets.borrow_mut() {
642            // If the child bucket is small enough and it has no child buckets then
643            // write it inline into the parent bucket's page. Otherwise spill it
644            // like a normal bucket and make the parent value a pointer to the page.
645            let value = if child.inlineable() {
646                child.free();
647                child.write()
648            } else {
649                child.spill()?;
650
651                // Update the child bucket header in this bucket.
652                let mut vec = vec![0u8; IBucket::SIZE];
653                let bucket_ptr = vec.as_mut_ptr() as *mut IBucket;
654                unsafe { std::ptr::copy_nonoverlapping(&child.bucket, bucket_ptr, 1) };
655                vec
656            };
657
658            // Skip writing the bucket if there are no materialized nodes.
659            if child.root_node.is_none() {
660                continue;
661            };
662
663            // Update parent node.
664            let mut c = mutself.cursor()?;
665            let item = c.seek(name)?;
666            if item.key != Some(name.as_slice()) {
667                return Err(format!(
668                    "misplaced bucket header: {:?} -> {:?}",
669                    name,
670                    item.key.as_ref().unwrap()
671                )
672                .into());
673            }
674            if !item.is_bucket() {
675                return Err(format!("unexpected bucket header flag: {}", item.flags).into());
676            }
677            c.node()?.put(name, name, value, 0, Self::FLAG);
678        }
679
680        // Ignore if there's not a materialized root node.
681        if self.root_node.is_none() {
682            return Ok(());
683        }
684
685        {
686            // Spill nodes.
687            let mut root_node = self.root_node.clone().ok_or("root node empty")?.root();
688            root_node.spill()?;
689            self.root_node = Some(root_node);
690
691            let pgid = self.root_node.as_ref().unwrap().pgid();
692            let txpgid = self.tx()?.pgid();
693
694            // Update the root node for this bucket.
695            if pgid >= txpgid {
696                panic!("pgid ({}) above high water mark ({})", pgid, txpgid);
697            }
698
699            self.bucket.root = pgid;
700        }
701
702        Ok(())
703    }
704
705    /// Returns true if a bucket is small enough to be written inline
706    /// and if it contains no subbuckets. Otherwise returns false.
707    fn inlineable(&self) -> bool {
708        if self.root_node().is_none() || !self.root_node().unwrap().is_leaf() {
709            return false;
710        }
711
712        let mut size = Page::header_size();
713        let node = self.root_node.clone().unwrap();
714
715        for inode in &*node.0.inodes.borrow() {
716            if inode.flags & Self::FLAG != 0 {
717                return false;
718            }
719
720            size += LeafPageElement::SIZE + inode.key.len() + inode.value.len();
721            if size > self.max_inline_bucket_size() {
722                return false;
723            }
724        }
725
726        true
727    }
728
729    /// Returns the maximum total size of a bucket to make it a candidate for inlining.
730    fn max_inline_bucket_size(&self) -> usize {
731        self.tx().unwrap().db().unwrap().page_size() / 4
732    }
733
734    /// Allocates and writes a bucket to a byte slice.
735    fn write(&mut self) -> Vec<u8> {
736        // TODO: optimize this page alloc things
737        // Allocate the appropriate size.
738        let n = self.root_node.as_ref().unwrap();
739        let node_size = n.size();
740        let mut value = vec![0u8; IBucket::SIZE + node_size];
741
742        // Write a bucket header.
743        let bucket_ptr = value.as_mut_ptr() as *mut IBucket;
744        unsafe { std::ptr::copy_nonoverlapping(&self.bucket, bucket_ptr, 1) };
745
746        // Convert byte slice to a fake page and write the root node.
747        {
748            let page_buf = &mut value[IBucket::SIZE..];
749            let page = Page::from_buf_mut(page_buf);
750            n.write(page);
751        }
752
753        value
754    }
755
756    /// Attempts to balance all nodes.
757    pub(crate) fn rebalance(&mut self) {
758        for node in self.nodes.borrow_mut().values_mut() {
759            node.rebalance()
760        }
761        for child in self.buckets.borrow_mut().values_mut() {
762            child.rebalance()
763        }
764    }
765
766    /// Creates a node from a page and associates it with a given parent.
767    pub(crate) fn node(&mut self, pgid: PGID, parent: WeakNode) -> Node {
768        if !self.tx().unwrap().writable() {
769            panic!("tx is read-only");
770        }
771
772        if let Some(n) = self.nodes.borrow().get(&pgid) {
773            return n.clone();
774        };
775
776        let mut node = NodeBuilder::new(self).parent(parent.clone()).build();
777        match parent.upgrade() {
778            None => {
779                self.root_node.replace(node.clone());
780            }
781            Some(ref mut p) => {
782                p.0.children.borrow_mut().push(node.clone());
783            }
784        }
785
786        if let Some(ref page) = self.page {
787            node.read(page);
788        } else {
789            unsafe {
790                node.read(&*self.tx().unwrap().page(pgid).unwrap());
791            }
792        }
793
794        self.nodes.borrow_mut().insert(pgid, node.clone());
795
796        // Update statistics.
797        self.tx().unwrap().0.stats.lock().node_count += 1;
798
799        node
800    }
801
802    /// Recursively frees all pages in the bucket.
803    fn free(&mut self) {
804        if self.bucket.root == 0 {
805            return;
806        };
807
808        let tx = self.tx().unwrap();
809        let db = tx.db().unwrap();
810        self.for_each_page_node(|p, _| match p {
811            Either::Left(p) => {
812                let txid = tx.id();
813                db.0.freelist.write().free(txid, p).unwrap()
814            }
815            Either::Right(n) => n.clone().free(),
816        });
817        self.bucket.root = 0;
818    }
819
820    /// Returns the in-memory node, if it exists.
821    /// Otherwise returns the underlying page.
822    pub(crate) fn page_node(&self, id: PGID) -> Result<PageNode, Error> {
823        // Inline buckets have a fake page embedded in their value so treat them
824        // differently. We'll return the rootNode (if available) or the fake page.
825        if self.bucket.root == 0 {
826            if id != 0 {
827                return Err(format!("inline bucket non-zero page access: {} != 0", id).into());
828            }
829            if let Some(ref node) = self.root_node {
830                return Ok(PageNode::from(node.clone()));
831            }
832            return Ok(PageNode::from(
833                &**self.page.as_ref().ok_or("page empty")? as *const Page
834            ));
835        }
836
837        // Check the node cache for non-inline buckets.
838        if let Some(node) = self.nodes.borrow().get(&id) {
839            return Ok(PageNode::from(node.clone()));
840        };
841
842        // Finally lookup the page from the transaction if no node is materialized.
843        Ok(PageNode::from(self.tx()?.page(id)?))
844    }
845}