1use either::Either;
2use std::cell::RefCell;
3use std::collections::{hash_map::Entry, HashMap};
4use std::fmt;
5
6use crate::consts::{Flags, PGID};
7use crate::db::DB;
8use crate::errors::Error;
9use crate::node::{Node, NodeBuilder, WeakNode};
10use crate::page::{BranchPageElement, LeafPageElement, OwnedPage, Page};
11use crate::tx::{Tx, WeakTx};
12
13use super::consts::*;
14use super::BucketStats;
15use super::Cursor;
16use super::IBucket;
17use super::PageNode;
18
/// A collection of key/value pairs (and nested buckets) accessed through a transaction.
pub struct Bucket {
    /// Bucket header as stored in the parent's leaf value; carries the `root`
    /// page id and the `sequence` counter read elsewhere in this file.
    pub(crate) bucket: IBucket,

    /// Weak handle to the owning transaction; upgraded on demand by `tx()`.
    pub(crate) tx: WeakTx,

    /// Cache of child buckets that have already been opened, keyed by name.
    buckets: RefCell<HashMap<Vec<u8>, Bucket>>,

    /// Inline page; `open_bucket` fills it when the header's root page id is 0.
    page: Option<OwnedPage>,

    /// Materialized root node; set by `node()` when it is called without a parent.
    root_node: Option<Node>,

    /// Cache of materialized nodes, keyed by page id.
    pub(crate) nodes: RefCell<HashMap<PGID, Node>>,

    /// Fill threshold, initialised to `DEFAULT_FILL_PERCENT`.
    /// NOTE(review): presumably consulted when nodes are split — not visible in this file.
    pub(crate) fill_percent: f64,
}
46
47impl fmt::Debug for Bucket {
48 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
49 let tx = self.tx().ok().map(|tx| &tx as *const Tx);
50
51 f.debug_struct("Bucket")
52 .field("bucket", &self.bucket)
53 .field("tx", &tx)
54 .field("buckets", &self.buckets)
55 .field("page", &self.page.as_ref())
56 .field("root_node", &self.root_node)
57 .field("nodes", &*self.nodes.borrow())
58 .field("fill_percent", &self.fill_percent)
59 .finish()
60 }
61}
62
63impl Bucket {
    /// Lower bound for `fill_percent`.
    /// NOTE(review): where the bound is enforced is not visible in this file.
    pub(crate) const MIN_FILL_PERCENT: f64 = 0.1;
    /// Upper bound for `fill_percent`.
    /// NOTE(review): where the bound is enforced is not visible in this file.
    pub(crate) const MAX_FILL_PERCENT: f64 = 1.0;

    /// Value assigned to `fill_percent` by `new()`.
    pub(crate) const DEFAULT_FILL_PERCENT: f64 = 0.5;

    /// Flag bit on a leaf element marking its value as a nested bucket header.
    pub(crate) const FLAG: u32 = 0x01;
70
71 pub(crate) fn new(tx: WeakTx) -> Self {
72 Self {
73 bucket: IBucket::new(),
74 tx,
75 buckets: RefCell::new(HashMap::new()),
76 page: None,
77 root_node: None,
78 nodes: RefCell::new(HashMap::new()),
79 fill_percent: Self::DEFAULT_FILL_PERCENT,
80 }
81 }
82
83 pub fn tx(&self) -> Result<Tx, Error> {
85 self.tx.upgrade().ok_or(Error::TxGone)
86 }
87
88 pub fn db(&self) -> Result<DB, Error> {
90 self.tx()?.db()
91 }
92
    /// Returns the root page id stored in the bucket header.
    /// Elsewhere in this file a value of 0 is treated as "inline bucket".
    pub fn root(&self) -> PGID {
        self.bucket.root
    }
97
    /// Returns a clone of the materialized root node, if there is one.
    fn root_node(&self) -> Option<Node> {
        self.root_node.clone()
    }
101
102 pub fn cursor(&self) -> Result<Cursor<&Bucket>, Error> {
104 self.tx()?.0.stats.lock().cursor_count += 1;
105
106 Ok(Cursor::new(self))
107 }
108
109 fn __bucket(&self, name: &[u8]) -> Option<*mut Bucket> {
110 if let Some(b) = self.buckets.borrow_mut().get_mut(name) {
111 return Some(b);
112 };
113 let (key, value) = {
114 let c = self.cursor().unwrap();
115 let (key, value, flags) = c.seek_to_item(name).unwrap().unwrap();
116
117 if key != Some(name) || (flags & Self::FLAG) == 0 {
119 return None;
120 };
121
122 (key.map(|k| k.to_vec()), value.map(|v| v.to_vec()))
123 };
124
125 let child = self.open_bucket(value.unwrap());
127
128 let mut buckets = self.buckets.borrow_mut();
129 let bucket = match buckets.entry(key.unwrap()) {
130 Entry::Vacant(e) => e.insert(child),
131 Entry::Occupied(e) => {
132 let c = e.into_mut();
133 *c = child;
134 c
135 }
136 };
137 Some(bucket)
138 }
139
140 pub fn bucket(&self, key: &[u8]) -> Option<&Bucket> {
143 self.__bucket(key).map(|b| unsafe { &*b })
144 }
145
146 pub fn bucket_mut(&mut self, key: &[u8]) -> Option<&mut Bucket> {
149 if !self.tx().unwrap().writable() {
150 return None;
151 };
152 self.__bucket(key).map(|b| unsafe { &mut *b })
153 }
154
155 pub(crate) fn open_bucket(&self, value: Vec<u8>) -> Bucket {
160 let mut child = Bucket::new(self.tx.clone());
161 {
176 let b = unsafe { (*(value.as_ptr() as *const IBucket)).clone() };
179 child.bucket = b;
180 }
181
182 if child.bucket.root == 0 {
184 let data = unsafe {
185 let slice = &value[IBucket::SIZE..];
186 let mut vec = vec![0u8; slice.len()];
187 std::ptr::copy_nonoverlapping(slice.as_ptr(), vec.as_mut_ptr(), slice.len());
188 vec
189 };
190
191 let p = OwnedPage::from_vec(data);
192 child.page = Some(p);
193 }
194
195 child
196 }
197
198 pub(crate) fn clear(&mut self) {
199 self.buckets.borrow_mut().clear();
200 self.nodes.borrow_mut().clear();
201 self.page = None;
202 self.root_node = None;
203 }
204
205 pub fn create_bucket(&mut self, key: &[u8]) -> Result<&mut Bucket, Error> {
207 {
208 let tx = self.tx()?;
209 if !tx.opened() {
210 return Err(Error::TxClosed);
211 }
212 if !tx.writable() {
213 return Err(Error::DatabaseReadonly);
214 }
215 if key.is_empty() {
216 return Err(Error::NameRequired);
217 }
218 }
219
220 let tx_clone = self.tx.clone();
221
222 {
223 let mut cursor = self.cursor()?;
224 let (ckey, flags) = {
225 let (key, _, flags) = cursor.seek_to_item(key)?.unwrap();
226 (key, flags)
227 };
228
229 if ckey == Some(key) {
230 if (flags & Self::FLAG) != 0 {
231 return Err(Error::BucketExists);
232 };
233 return Err(Error::IncompatibleValue);
234 };
235
236 let mut bucket = Bucket::new(tx_clone);
237 bucket.root_node = Some(NodeBuilder::new(&bucket).is_leaf(true).build());
238 bucket.fill_percent = Self::DEFAULT_FILL_PERCENT;
239
240 let value = bucket.write();
241 cursor.node().unwrap().put(key, key, value, 0, Self::FLAG);
242 self.page = None;
243 }
244
245 self.bucket_mut(key)
246 .ok_or_else(|| Error::Unexpected("cannot find bucket".to_string()))
247 }
248
249 pub fn create_bucket_if_not_exists(&mut self, key: &[u8]) -> Result<&mut Bucket, Error> {
251 match unsafe { &mut *(self as *mut Self) }.create_bucket(key) {
252 Ok(b) => Ok(b),
253 Err(Error::BucketExists) => self
254 .bucket_mut(key)
255 .ok_or_else(|| Error::Unexpected("can't find bucket".to_string())),
256 v => v,
257 }
258 }
259
260 pub fn delete_bucket(&mut self, key: &[u8]) -> Result<(), Error> {
264 {
265 let tx = self.tx()?;
266 if !tx.opened() {
267 return Err(Error::DatabaseClosed);
268 }
269 if !tx.writable() {
270 return Err(Error::DatabaseReadonly);
271 }
272 if key.is_empty() {
273 return Err(Error::NameRequired);
274 }
275 };
276
277 let mut c = self.cursor()?;
278 {
279 let item = c.seek(key)?;
280 if item.key.unwrap() != key {
281 return Err(Error::BucketNotFound);
282 }
283 if !item.is_bucket() {
284 return Err(Error::IncompatibleValue);
285 }
286 }
287 let mut node = c.node()?;
288 {
289 let child = self.bucket_mut(key).ok_or("Can't get bucket")?;
290 let child_buckets = child.buckets();
291
292 for bucket in &child_buckets {
293 child.delete_bucket(bucket)?;
294 }
295
296 child.nodes.borrow_mut().clear();
298 child.root_node = None;
299 child.free();
300 }
301
302 self.buckets.borrow_mut().remove(key);
303 node.del(key);
304
305 Ok(())
306 }
307
308 pub fn buckets(&self) -> Vec<Vec<u8>> {
310 let mut names = vec![];
311 self.for_each(Box::new(|k, v| -> Result<(), Error> {
312 if v.is_none() {
313 names.push(k.to_vec());
314 }
315 Ok(())
316 }))
317 .unwrap();
318 names
319 }
320
321 pub fn get(&self, key: &[u8]) -> Option<&[u8]> {
336 let (ckey, value, flags) = self.cursor().unwrap().seek(key).unwrap().unwrap();
337 if (flags & Self::FLAG) != 0 {
341 return None;
342 }
343
344 if ckey != Some(key) {
346 return None;
347 }
348
349 value
350 }
351
352 pub fn put(&mut self, key: &[u8], value: Vec<u8>) -> Result<(), Error> {
368 if !self.tx()?.opened() {
369 return Err(Error::TxClosed);
370 }
371 if !self.tx()?.writable() {
372 return Err(Error::TxReadonly);
373 }
374 if key.is_empty() {
375 return Err(Error::KeyRequired);
376 }
377 if key.len() > MAX_KEY_SIZE {
378 return Err(Error::KeyTooLarge);
379 }
380 if value.len() > MAX_VALUE_SIZE {
381 return Err(Error::ValueTooLarge);
382 }
383
384 let mut c = self.cursor()?;
386 let item = c.seek(key)?;
387
388 if (Some(key) == item.key) && item.is_bucket() {
390 return Err(Error::IncompatibleValue);
391 }
392
393 c.node().unwrap().put(key, key, value, 0, 0);
395
396 Ok(())
397 }
398
399 pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
415 if self.tx()?.db().is_err() {
416 return Err(Error::TxClosed);
417 }
418 if !self.tx()?.writable() {
419 return Err(Error::TxReadonly);
420 }
421
422 let mut c = self.cursor()?;
424 let item = c.seek(key)?;
425
426 if item.is_bucket() {
428 return Err(Error::IncompatibleValue);
429 };
430
431 c.node().unwrap().del(key);
433
434 Ok(())
435 }
436
    /// Returns the current value of the sequence counter stored in the bucket header.
    pub fn sequence(&self) -> u64 {
        self.bucket.sequence
    }
441
442 pub fn next_sequence(&mut self) -> Result<u64, Error> {
444 if !self.tx()?.writable() {
445 return Err(Error::TxReadonly);
446 }
447
448 if self.root_node.is_none() {
449 self.node(self.root(), WeakNode::new());
450 }
451
452 self.bucket.sequence += 1;
453
454 Ok(self.bucket.sequence)
455 }
456
457 pub fn set_sequence(&mut self, value: u64) -> Result<(), Error> {
459 if !self.tx()?.writable() {
460 return Err(Error::TxReadonly);
461 };
462
463 if self.root_node.is_none() {
464 let pgid = self.root();
465 self.node(pgid, WeakNode::new());
466 };
467
468 self.bucket.sequence = value;
469 Ok(())
470 }
471
472 #[allow(clippy::type_complexity)]
473 pub fn for_each<'a, E: Into<Error>>(
477 &self,
478 mut handler: Box<dyn FnMut(&[u8], Option<&[u8]>) -> Result<(), E> + 'a>,
479 ) -> Result<(), Error> {
480 if !self.tx()?.opened() {
481 return Err(Error::TxClosed);
482 }
483 let c = self.cursor()?;
484 let mut item = c.first()?;
485 loop {
486 if item.is_none() {
487 break;
488 };
489 handler(item.key.unwrap(), item.value).map_err(|e| e.into())?;
490 item = c.next()?;
491 }
492 Ok(())
493 }
494
495 pub fn stats(&self) -> BucketStats {
497 let mut stats = BucketStats::default();
498 let mut sub_stats = BucketStats::default();
499 let page_size = self.tx().unwrap().db().unwrap().page_size();
500 stats.bucket_n += 1;
501 if self.bucket.root == 0 {
502 stats.inline_bucket_n += 1;
503 };
504 self.for_each_page(Box::new(|p, depth| {
505 let page_count = p.count as usize;
506 if p.flags == Flags::LEAVES {
507 stats.key_n += page_count;
508 let mut used = Page::header_size();
509 if page_count != 0 {
510 used += LeafPageElement::SIZE * (page_count - 1);
512 let last_element = p.leaf_page_element(page_count - 1);
518 used += (last_element.pos + last_element.ksize + last_element.vsize) as usize;
519
520 if self.bucket.root == 0 {
521 stats.inline_bucket_in_use += used;
523 } else {
524 stats.leaf_page_n += 1;
526 stats.leaf_in_use += used;
527 stats.leaf_overflow_n += p.overflow as usize;
528
529 for i in 0..page_count {
533 let e = p.leaf_page_element(i);
534 if (e.flags & Self::FLAG) != 0 {
535 sub_stats += self.open_bucket(e.value().to_vec()).stats();
538 };
539 }
540 }
541 } else if p.flags == Flags::BRANCHES {
542 stats.branch_page_n += 1;
543 let last_element = p.branch_page_element(page_count - 1);
544
545 let mut used =
548 Page::header_size() + (BranchPageElement::SIZE * (page_count - 1));
549
550 used += (last_element.pos + last_element.ksize) as usize;
554 stats.branch_in_use += used;
555 stats.branch_overflow_n += p.overflow as usize;
556 };
557 };
558
559 if depth + 1 > stats.depth {
561 stats.depth = depth + 1;
562 }
563 }));
564
565 stats.branch_alloc = (stats.branch_page_n + stats.branch_overflow_n) * page_size;
567 stats.leaf_alloc = (stats.leaf_page_n + stats.leaf_overflow_n) * page_size;
568
569 stats.depth += sub_stats.depth;
571
572 stats += sub_stats;
574 stats
575 }
576
577 fn for_each_page<'a>(&self, mut handler: Box<dyn FnMut(&Page, usize) + 'a>) {
579 if let Some(ref page) = self.page {
581 handler(page, 0);
582 return;
583 }
584
585 self.tx()
587 .unwrap()
588 .for_each_page(self.bucket.root, 0, handler);
589 }
590
591 fn for_each_page_node<F>(&self, mut handler: F)
594 where
595 F: FnMut(Either<&Page, &Node>, isize),
596 {
597 if let Some(ref page) = self.page {
599 handler(PageNode::from(&**page as *const Page).upgrade(), 0);
600 return;
601 }
602 self.__for_each_page_node(self.bucket.root, 0, &mut handler)
603 }
604
605 fn __for_each_page_node<F>(&self, pgid: PGID, depth: isize, handler: &mut F)
606 where
607 F: FnMut(Either<&Page, &Node>, isize),
608 {
609 let item = self.page_node(pgid).unwrap();
610
611 handler(item.upgrade(), depth);
613
614 match item.upgrade() {
616 Either::Left(p) => {
617 let is_branch = matches!(p.flags, Flags::BRANCHES);
618 if is_branch {
619 for i in 0..p.count as usize {
620 let elem = p.branch_page_element(i);
621 self.__for_each_page_node(elem.pgid, depth + 1, handler);
622 }
623 }
624 }
625 Either::Right(n) => {
626 if !n.is_leaf() {
627 for inode in &*n.0.inodes.borrow() {
628 self.__for_each_page_node(inode.pgid, depth + 1, handler)
629 }
630 }
631 }
632 }
633 }
634
635 pub(crate) fn spill(&mut self) -> Result<(), Error> {
637 let mutself = unsafe { &mut *(self as *mut Self) };
638
639 for (name, child) in &mut *self.buckets.borrow_mut() {
641 let value = if child.inlineable() {
645 child.free();
646 child.write()
647 } else {
648 child.spill()?;
649
650 let mut vec = vec![0u8; IBucket::SIZE];
652 let bucket_ptr = vec.as_mut_ptr() as *mut IBucket;
653 unsafe { std::ptr::copy_nonoverlapping(&child.bucket, bucket_ptr, 1) };
654 vec
655 };
656
657 if child.root_node.is_none() {
659 continue;
660 };
661
662 let mut c = mutself.cursor()?;
664 let item = c.seek(name)?;
665 if item.key != Some(name.as_slice()) {
666 return Err(format!(
667 "misplaced bucket header: {:?} -> {:?}",
668 name,
669 item.key.as_ref().unwrap()
670 )
671 .into());
672 }
673 if !item.is_bucket() {
674 return Err(format!("unexpected bucket header flag: {}", item.flags).into());
675 }
676 c.node()?.put(name, name, value, 0, Self::FLAG);
677 }
678
679 if self.root_node.is_none() {
681 return Ok(());
682 }
683
684 {
685 let mut root_node = self.root_node.clone().ok_or("root node empty")?.root();
687 root_node.spill()?;
688 self.root_node = Some(root_node);
689
690 let pgid = self.root_node.as_ref().unwrap().pgid();
691 let txpgid = self.tx()?.pgid();
692
693 if pgid >= txpgid {
695 panic!("pgid ({}) above high water mark ({})", pgid, txpgid);
696 }
697
698 self.bucket.root = pgid;
699 }
700
701 Ok(())
702 }
703
704 fn inlineable(&self) -> bool {
707 if self.root_node().is_none() || !self.root_node().unwrap().is_leaf() {
708 return false;
709 }
710
711 let mut size = Page::header_size();
712 let node = self.root_node.clone().unwrap();
713
714 for inode in &*node.0.inodes.borrow() {
715 if inode.flags & Self::FLAG != 0 {
716 return false;
717 }
718
719 size += LeafPageElement::SIZE + inode.key.len() + inode.value.len();
720 if size > self.max_inline_bucket_size() {
721 return false;
722 }
723 }
724
725 true
726 }
727
728 fn max_inline_bucket_size(&self) -> usize {
730 self.tx().unwrap().db().unwrap().page_size() / 4
731 }
732
733 fn write(&mut self) -> Vec<u8> {
735 let n = self.root_node.as_ref().unwrap();
738 let node_size = n.size();
739 let mut value = vec![0u8; IBucket::SIZE + node_size];
740
741 let bucket_ptr = value.as_mut_ptr() as *mut IBucket;
743 unsafe { std::ptr::copy_nonoverlapping(&self.bucket, bucket_ptr, 1) };
744
745 {
747 let page_buf = &mut value[IBucket::SIZE..];
748 let page = Page::from_buf_mut(page_buf);
749 n.write(page);
750 }
751
752 value
753 }
754
755 pub(crate) fn rebalance(&mut self) {
757 for node in self.nodes.borrow_mut().values_mut() {
758 node.rebalance()
759 }
760 for child in self.buckets.borrow_mut().values_mut() {
761 child.rebalance()
762 }
763 }
764
765 pub(crate) fn node(&mut self, pgid: PGID, parent: WeakNode) -> Node {
767 if !self.tx().unwrap().writable() {
768 panic!("tx is read-only");
769 }
770
771 if let Some(n) = self.nodes.borrow().get(&pgid) {
772 return n.clone();
773 };
774
775 let mut node = NodeBuilder::new(self).parent(parent.clone()).build();
776 match parent.upgrade() {
777 None => {
778 self.root_node.replace(node.clone());
779 }
780 Some(ref mut p) => {
781 p.0.children.borrow_mut().push(node.clone());
782 }
783 }
784
785 if let Some(ref page) = self.page {
786 node.read(page);
787 } else {
788 unsafe {
789 node.read(&*self.tx().unwrap().page(pgid).unwrap());
790 }
791 }
792
793 self.nodes.borrow_mut().insert(pgid, node.clone());
794
795 self.tx().unwrap().0.stats.lock().node_count += 1;
797
798 node
799 }
800
801 fn free(&mut self) {
803 if self.bucket.root == 0 {
804 return;
805 };
806
807 let tx = self.tx().unwrap();
808 let db = tx.db().unwrap();
809 self.for_each_page_node(|p, _| match p {
810 Either::Left(p) => {
811 let txid = tx.id();
812 db.0.freelist.write().free(txid, p).unwrap()
813 }
814 Either::Right(n) => n.clone().free(),
815 });
816 self.bucket.root = 0;
817 }
818
819 pub(crate) fn page_node(&self, id: PGID) -> Result<PageNode, Error> {
822 if self.bucket.root == 0 {
825 if id != 0 {
826 return Err(format!("inline bucket non-zero page access: {id} != 0").into());
827 }
828 if let Some(ref node) = self.root_node {
829 return Ok(PageNode::from(node.clone()));
830 }
831 return Ok(PageNode::from(
832 &**self.page.as_ref().ok_or("page empty")? as *const Page
833 ));
834 }
835
836 if let Some(node) = self.nodes.borrow().get(&id) {
838 return Ok(PageNode::from(node.clone()));
839 };
840
841 Ok(PageNode::from(self.tx()?.page(id)?))
843 }
844}