1use either::Either;
2use std::cell::RefCell;
3use std::collections::{hash_map::Entry, HashMap};
4use std::fmt;
5use std::{isize, usize};
6
7use crate::consts::{Flags, PGID};
8use crate::db::DB;
9use crate::errors::Error;
10use crate::node::{Node, NodeBuilder, WeakNode};
11use crate::page::{BranchPageElement, LeafPageElement, OwnedPage, Page};
12use crate::tx::{Tx, WeakTx};
13
14use super::consts::*;
15use super::BucketStats;
16use super::Cursor;
17use super::IBucket;
18use super::PageNode;
19
/// A bucket: a named collection of key/value pairs (and nested buckets)
/// accessed through a transaction.
///
/// Besides the bucket header it holds per-handle caches of opened
/// sub-buckets and materialized nodes.
pub struct Bucket {
    /// Bucket header. The code in this file reads and writes its `root`
    /// page id and its `sequence` counter; a `root` of 0 is treated as an
    /// inline bucket.
    pub(crate) bucket: IBucket,

    /// Weak handle to the owning transaction; upgraded on demand by `tx()`.
    pub(crate) tx: WeakTx,

    /// Cache of sub-buckets that have already been opened, keyed by name.
    buckets: RefCell<HashMap<Vec<u8>, Bucket>>,

    /// Page holding the contents of an inline bucket (set by `open_bucket`
    /// when the header's `root` is 0).
    page: Option<OwnedPage>,

    /// Materialized root node of this bucket, if one has been created.
    root_node: Option<Node>,

    /// Cache of materialized nodes, keyed by page id.
    pub(crate) nodes: RefCell<HashMap<PGID, Node>>,

    /// Fill percentage, initialized to `DEFAULT_FILL_PERCENT`.
    /// NOTE(review): presumably the node split threshold; it is not consumed
    /// anywhere in this file, so confirm against the node code.
    pub(crate) fill_percent: f64,
}
47
48impl fmt::Debug for Bucket {
49 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
50 let tx = self.tx().ok().map(|tx| &tx as *const Tx);
51
52 f.debug_struct("Bucket")
53 .field("bucket", &self.bucket)
54 .field("tx", &tx)
55 .field("buckets", &self.buckets)
56 .field("page", &self.page.as_ref())
57 .field("root_node", &self.root_node)
58 .field("nodes", &*self.nodes.borrow())
59 .field("fill_percent", &self.fill_percent)
60 .finish()
61 }
62}
63
64impl Bucket {
/// Smallest fill percentage.
/// NOTE(review): not enforced anywhere in this file; confirm where it is checked.
pub(crate) const MIN_FILL_PERCENT: f64 = 0.1;
/// Largest fill percentage.
/// NOTE(review): not enforced anywhere in this file; confirm where it is checked.
pub(crate) const MAX_FILL_PERCENT: f64 = 1.0;

/// Fill percentage assigned to every newly constructed bucket.
pub(crate) const DEFAULT_FILL_PERCENT: f64 = 0.5;

/// Flag bit that marks a leaf element as holding a nested bucket rather
/// than a plain value.
pub(crate) const FLAG: u32 = 0x01;
71
72 pub(crate) fn new(tx: WeakTx) -> Self {
73 Self {
74 bucket: IBucket::new(),
75 tx,
76 buckets: RefCell::new(HashMap::new()),
77 page: None,
78 root_node: None,
79 nodes: RefCell::new(HashMap::new()),
80 fill_percent: Self::DEFAULT_FILL_PERCENT,
81 }
82 }
83
84 pub fn tx(&self) -> Result<Tx, Error> {
86 self.tx.upgrade().ok_or(Error::TxGone)
87 }
88
89 pub fn db(&self) -> Result<DB, Error> {
91 self.tx()?.db()
92 }
93
94 pub fn root(&self) -> PGID {
96 self.bucket.root
97 }
98
99 fn root_node(&self) -> Option<Node> {
100 self.root_node.clone()
101 }
102
103 pub fn cursor(&self) -> Result<Cursor<&Bucket>, Error> {
105 self.tx()?.0.stats.lock().cursor_count += 1;
106
107 Ok(Cursor::new(self))
108 }
109
110 fn __bucket(&self, name: &[u8]) -> Option<*mut Bucket> {
111 if let Some(b) = self.buckets.borrow_mut().get_mut(name) {
112 return Some(b);
113 };
114 let (key, value) = {
115 let c = self.cursor().unwrap();
116 let (key, value, flags) = c.seek_to_item(name).unwrap().unwrap();
117
118 if key != Some(name) || (flags & Self::FLAG) == 0 {
120 return None;
121 };
122
123 (key.map(|k| k.to_vec()), value.map(|v| v.to_vec()))
124 };
125
126 let child = self.open_bucket(value.unwrap());
128
129 let mut buckets = self.buckets.borrow_mut();
130 let bucket = match buckets.entry(key.unwrap()) {
131 Entry::Vacant(e) => e.insert(child),
132 Entry::Occupied(e) => {
133 let c = e.into_mut();
134 *c = child;
135 c
136 }
137 };
138 Some(bucket)
139 }
140
141 pub fn bucket(&self, key: &[u8]) -> Option<&Bucket> {
144 self.__bucket(key).map(|b| unsafe { &*b })
145 }
146
147 pub fn bucket_mut(&mut self, key: &[u8]) -> Option<&mut Bucket> {
150 if !self.tx().unwrap().writable() {
151 return None;
152 };
153 self.__bucket(key).map(|b| unsafe { &mut *b })
154 }
155
156 pub(crate) fn open_bucket(&self, value: Vec<u8>) -> Bucket {
161 let mut child = Bucket::new(self.tx.clone());
162 {
177 let b = unsafe { (*(value.as_ptr() as *const IBucket)).clone() };
180 child.bucket = b;
181 }
182
183 if child.bucket.root == 0 {
185 let data = unsafe {
186 let slice = &value[IBucket::SIZE..];
187 let mut vec = vec![0u8; slice.len()];
188 std::ptr::copy_nonoverlapping(slice.as_ptr(), vec.as_mut_ptr(), slice.len());
189 vec
190 };
191
192 let p = OwnedPage::from_vec(data);
193 child.page = Some(p);
194 }
195
196 child
197 }
198
199 pub(crate) fn clear(&mut self) {
200 self.buckets.borrow_mut().clear();
201 self.nodes.borrow_mut().clear();
202 self.page = None;
203 self.root_node = None;
204 }
205
206 pub fn create_bucket(&mut self, key: &[u8]) -> Result<&mut Bucket, Error> {
208 {
209 let tx = self.tx()?;
210 if !tx.opened() {
211 return Err(Error::TxClosed);
212 }
213 if !tx.writable() {
214 return Err(Error::DatabaseReadonly);
215 }
216 if key.is_empty() {
217 return Err(Error::NameRequired);
218 }
219 }
220
221 let tx_clone = self.tx.clone();
222
223 {
224 let mut cursor = self.cursor()?;
225 let (ckey, flags) = {
226 let (key, _, flags) = cursor.seek_to_item(key)?.unwrap();
227 (key, flags)
228 };
229
230 if ckey == Some(key) {
231 if (flags & Self::FLAG) != 0 {
232 return Err(Error::BucketExists);
233 };
234 return Err(Error::IncompatibleValue);
235 };
236
237 let mut bucket = Bucket::new(tx_clone);
238 bucket.root_node = Some(NodeBuilder::new(&bucket).is_leaf(true).build());
239 bucket.fill_percent = Self::DEFAULT_FILL_PERCENT;
240
241 let value = bucket.write();
242 cursor.node().unwrap().put(key, key, value, 0, Self::FLAG);
243 self.page = None;
244 }
245
246 self.bucket_mut(key)
247 .ok_or_else(|| Error::Unexpected("cannot find bucket".to_string()))
248 }
249
250 pub fn create_bucket_if_not_exists(&mut self, key: &[u8]) -> Result<&mut Bucket, Error> {
252 match unsafe { &mut *(self as *mut Self) }.create_bucket(key) {
253 Ok(b) => Ok(b),
254 Err(Error::BucketExists) => self
255 .bucket_mut(key)
256 .ok_or_else(|| Error::Unexpected("can't find bucket".to_string())),
257 v => v,
258 }
259 }
260
261 pub fn delete_bucket(&mut self, key: &[u8]) -> Result<(), Error> {
265 {
266 let tx = self.tx()?;
267 if !tx.opened() {
268 return Err(Error::DatabaseClosed);
269 }
270 if !tx.writable() {
271 return Err(Error::DatabaseReadonly);
272 }
273 if key.is_empty() {
274 return Err(Error::NameRequired);
275 }
276 };
277
278 let mut c = self.cursor()?;
279 {
280 let item = c.seek(key)?;
281 if item.key.unwrap() != key {
282 return Err(Error::BucketNotFound);
283 }
284 if !item.is_bucket() {
285 return Err(Error::IncompatibleValue);
286 }
287 }
288 let mut node = c.node()?;
289 {
290 let child = self.bucket_mut(key).ok_or("Can't get bucket")?;
291 let child_buckets = child.buckets();
292
293 for bucket in &child_buckets {
294 child.delete_bucket(bucket)?;
295 }
296
297 child.nodes.borrow_mut().clear();
299 child.root_node = None;
300 child.free();
301 }
302
303 self.buckets.borrow_mut().remove(key);
304 node.del(key);
305
306 Ok(())
307 }
308
309 pub fn buckets(&self) -> Vec<Vec<u8>> {
311 let mut names = vec![];
312 self.for_each(Box::new(|k, v| -> Result<(), Error> {
313 if v.is_none() {
314 names.push(k.to_vec());
315 }
316 Ok(())
317 }))
318 .unwrap();
319 names
320 }
321
322 pub fn get(&self, key: &[u8]) -> Option<&[u8]> {
337 let (ckey, value, flags) = self.cursor().unwrap().seek(key).unwrap().unwrap();
338 if (flags & Self::FLAG) != 0 {
342 return None;
343 }
344
345 if ckey != Some(key) {
347 return None;
348 }
349
350 value
351 }
352
353 pub fn put(&mut self, key: &[u8], value: Vec<u8>) -> Result<(), Error> {
369 if !self.tx()?.opened() {
370 return Err(Error::TxClosed);
371 }
372 if !self.tx()?.writable() {
373 return Err(Error::TxReadonly);
374 }
375 if key.is_empty() {
376 return Err(Error::KeyRequired);
377 }
378 if key.len() > MAX_KEY_SIZE {
379 return Err(Error::KeyTooLarge);
380 }
381 if value.len() > MAX_VALUE_SIZE {
382 return Err(Error::ValueTooLarge);
383 }
384
385 let mut c = self.cursor()?;
387 let item = c.seek(key)?;
388
389 if (Some(key) == item.key) && item.is_bucket() {
391 return Err(Error::IncompatibleValue);
392 }
393
394 c.node().unwrap().put(key, key, value, 0, 0);
396
397 Ok(())
398 }
399
400 pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
416 if self.tx()?.db().is_err() {
417 return Err(Error::TxClosed);
418 }
419 if !self.tx()?.writable() {
420 return Err(Error::TxReadonly);
421 }
422
423 let mut c = self.cursor()?;
425 let item = c.seek(key)?;
426
427 if item.is_bucket() {
429 return Err(Error::IncompatibleValue);
430 };
431
432 c.node().unwrap().del(key);
434
435 Ok(())
436 }
437
438 pub fn sequence(&self) -> u64 {
440 self.bucket.sequence
441 }
442
443 pub fn next_sequence(&mut self) -> Result<u64, Error> {
445 if !self.tx()?.writable() {
446 return Err(Error::TxReadonly);
447 }
448
449 if self.root_node.is_none() {
450 self.node(self.root(), WeakNode::new());
451 }
452
453 self.bucket.sequence += 1;
454
455 Ok(self.bucket.sequence)
456 }
457
458 pub fn set_sequence(&mut self, value: u64) -> Result<(), Error> {
460 if !self.tx()?.writable() {
461 return Err(Error::TxReadonly);
462 };
463
464 if self.root_node.is_none() {
465 let pgid = self.root();
466 self.node(pgid, WeakNode::new());
467 };
468
469 self.bucket.sequence = value;
470 Ok(())
471 }
472
473 #[allow(clippy::type_complexity)]
474 pub fn for_each<'a, E: Into<Error>>(
478 &self,
479 mut handler: Box<dyn FnMut(&[u8], Option<&[u8]>) -> Result<(), E> + 'a>,
480 ) -> Result<(), Error> {
481 if !self.tx()?.opened() {
482 return Err(Error::TxClosed);
483 }
484 let c = self.cursor()?;
485 let mut item = c.first()?;
486 loop {
487 if item.is_none() {
488 break;
489 };
490 handler(item.key.unwrap(), item.value).map_err(|e| e.into())?;
491 item = c.next()?;
492 }
493 Ok(())
494 }
495
496 pub fn stats(&self) -> BucketStats {
498 let mut stats = BucketStats::default();
499 let mut sub_stats = BucketStats::default();
500 let page_size = self.tx().unwrap().db().unwrap().page_size();
501 stats.bucket_n += 1;
502 if self.bucket.root == 0 {
503 stats.inline_bucket_n += 1;
504 };
505 self.for_each_page(Box::new(|p, depth| {
506 let page_count = p.count as usize;
507 if p.flags == Flags::LEAVES {
508 stats.key_n += page_count;
509 let mut used = Page::header_size();
510 if page_count != 0 {
511 used += LeafPageElement::SIZE * (page_count - 1);
513 let last_element = p.leaf_page_element(page_count - 1);
519 used += (last_element.pos + last_element.ksize + last_element.vsize) as usize;
520
521 if self.bucket.root == 0 {
522 stats.inline_bucket_in_use += used;
524 } else {
525 stats.leaf_page_n += 1;
527 stats.leaf_in_use += used;
528 stats.leaf_overflow_n += p.overflow as usize;
529
530 for i in 0..page_count {
534 let e = p.leaf_page_element(i);
535 if (e.flags & Self::FLAG) != 0 {
536 sub_stats += self.open_bucket(e.value().to_vec()).stats();
539 };
540 }
541 }
542 } else if p.flags == Flags::BRANCHES {
543 stats.branch_page_n += 1;
544 let last_element = p.branch_page_element(page_count - 1);
545
546 let mut used =
549 Page::header_size() + (BranchPageElement::SIZE * (page_count - 1));
550
551 used += (last_element.pos + last_element.ksize) as usize;
555 stats.branch_in_use += used;
556 stats.branch_overflow_n += p.overflow as usize;
557 };
558 };
559
560 if depth + 1 > stats.depth {
562 stats.depth = depth + 1;
563 }
564 }));
565
566 stats.branch_alloc = (stats.branch_page_n + stats.branch_overflow_n) * page_size;
568 stats.leaf_alloc = (stats.leaf_page_n + stats.leaf_overflow_n) * page_size;
569
570 stats.depth += sub_stats.depth;
572
573 stats += sub_stats;
575 stats
576 }
577
578 fn for_each_page<'a>(&self, mut handler: Box<dyn FnMut(&Page, usize) + 'a>) {
580 if let Some(ref page) = self.page {
582 handler(page, 0);
583 return;
584 }
585
586 self.tx()
588 .unwrap()
589 .for_each_page(self.bucket.root, 0, handler);
590 }
591
592 fn for_each_page_node<F>(&self, mut handler: F)
595 where
596 F: FnMut(Either<&Page, &Node>, isize),
597 {
598 if let Some(ref page) = self.page {
600 handler(PageNode::from(&**page as *const Page).upgrade(), 0);
601 return;
602 }
603 self.__for_each_page_node(self.bucket.root, 0, &mut handler)
604 }
605
606 fn __for_each_page_node<F>(&self, pgid: PGID, depth: isize, handler: &mut F)
607 where
608 F: FnMut(Either<&Page, &Node>, isize),
609 {
610 let item = self.page_node(pgid).unwrap();
611
612 handler(item.upgrade(), depth);
614
615 match item.upgrade() {
617 Either::Left(p) => {
618 let is_branch = matches!(p.flags, Flags::BRANCHES);
619 if is_branch {
620 for i in 0..p.count as usize {
621 let elem = p.branch_page_element(i);
622 self.__for_each_page_node(elem.pgid, depth + 1, handler);
623 }
624 }
625 }
626 Either::Right(n) => {
627 if !n.is_leaf() {
628 for inode in &*n.0.inodes.borrow() {
629 self.__for_each_page_node(inode.pgid, depth + 1, handler)
630 }
631 }
632 }
633 }
634 }
635
636 pub(crate) fn spill(&mut self) -> Result<(), Error> {
638 let mutself = unsafe { &mut *(self as *mut Self) };
639
640 for (name, child) in &mut *self.buckets.borrow_mut() {
642 let value = if child.inlineable() {
646 child.free();
647 child.write()
648 } else {
649 child.spill()?;
650
651 let mut vec = vec![0u8; IBucket::SIZE];
653 let bucket_ptr = vec.as_mut_ptr() as *mut IBucket;
654 unsafe { std::ptr::copy_nonoverlapping(&child.bucket, bucket_ptr, 1) };
655 vec
656 };
657
658 if child.root_node.is_none() {
660 continue;
661 };
662
663 let mut c = mutself.cursor()?;
665 let item = c.seek(name)?;
666 if item.key != Some(name.as_slice()) {
667 return Err(format!(
668 "misplaced bucket header: {:?} -> {:?}",
669 name,
670 item.key.as_ref().unwrap()
671 )
672 .into());
673 }
674 if !item.is_bucket() {
675 return Err(format!("unexpected bucket header flag: {}", item.flags).into());
676 }
677 c.node()?.put(name, name, value, 0, Self::FLAG);
678 }
679
680 if self.root_node.is_none() {
682 return Ok(());
683 }
684
685 {
686 let mut root_node = self.root_node.clone().ok_or("root node empty")?.root();
688 root_node.spill()?;
689 self.root_node = Some(root_node);
690
691 let pgid = self.root_node.as_ref().unwrap().pgid();
692 let txpgid = self.tx()?.pgid();
693
694 if pgid >= txpgid {
696 panic!("pgid ({}) above high water mark ({})", pgid, txpgid);
697 }
698
699 self.bucket.root = pgid;
700 }
701
702 Ok(())
703 }
704
705 fn inlineable(&self) -> bool {
708 if self.root_node().is_none() || !self.root_node().unwrap().is_leaf() {
709 return false;
710 }
711
712 let mut size = Page::header_size();
713 let node = self.root_node.clone().unwrap();
714
715 for inode in &*node.0.inodes.borrow() {
716 if inode.flags & Self::FLAG != 0 {
717 return false;
718 }
719
720 size += LeafPageElement::SIZE + inode.key.len() + inode.value.len();
721 if size > self.max_inline_bucket_size() {
722 return false;
723 }
724 }
725
726 true
727 }
728
729 fn max_inline_bucket_size(&self) -> usize {
731 self.tx().unwrap().db().unwrap().page_size() / 4
732 }
733
734 fn write(&mut self) -> Vec<u8> {
736 let n = self.root_node.as_ref().unwrap();
739 let node_size = n.size();
740 let mut value = vec![0u8; IBucket::SIZE + node_size];
741
742 let bucket_ptr = value.as_mut_ptr() as *mut IBucket;
744 unsafe { std::ptr::copy_nonoverlapping(&self.bucket, bucket_ptr, 1) };
745
746 {
748 let page_buf = &mut value[IBucket::SIZE..];
749 let page = Page::from_buf_mut(page_buf);
750 n.write(page);
751 }
752
753 value
754 }
755
756 pub(crate) fn rebalance(&mut self) {
758 for node in self.nodes.borrow_mut().values_mut() {
759 node.rebalance()
760 }
761 for child in self.buckets.borrow_mut().values_mut() {
762 child.rebalance()
763 }
764 }
765
766 pub(crate) fn node(&mut self, pgid: PGID, parent: WeakNode) -> Node {
768 if !self.tx().unwrap().writable() {
769 panic!("tx is read-only");
770 }
771
772 if let Some(n) = self.nodes.borrow().get(&pgid) {
773 return n.clone();
774 };
775
776 let mut node = NodeBuilder::new(self).parent(parent.clone()).build();
777 match parent.upgrade() {
778 None => {
779 self.root_node.replace(node.clone());
780 }
781 Some(ref mut p) => {
782 p.0.children.borrow_mut().push(node.clone());
783 }
784 }
785
786 if let Some(ref page) = self.page {
787 node.read(page);
788 } else {
789 unsafe {
790 node.read(&*self.tx().unwrap().page(pgid).unwrap());
791 }
792 }
793
794 self.nodes.borrow_mut().insert(pgid, node.clone());
795
796 self.tx().unwrap().0.stats.lock().node_count += 1;
798
799 node
800 }
801
802 fn free(&mut self) {
804 if self.bucket.root == 0 {
805 return;
806 };
807
808 let tx = self.tx().unwrap();
809 let db = tx.db().unwrap();
810 self.for_each_page_node(|p, _| match p {
811 Either::Left(p) => {
812 let txid = tx.id();
813 db.0.freelist.write().free(txid, p).unwrap()
814 }
815 Either::Right(n) => n.clone().free(),
816 });
817 self.bucket.root = 0;
818 }
819
820 pub(crate) fn page_node(&self, id: PGID) -> Result<PageNode, Error> {
823 if self.bucket.root == 0 {
826 if id != 0 {
827 return Err(format!("inline bucket non-zero page access: {} != 0", id).into());
828 }
829 if let Some(ref node) = self.root_node {
830 return Ok(PageNode::from(node.clone()));
831 }
832 return Ok(PageNode::from(
833 &**self.page.as_ref().ok_or("page empty")? as *const Page
834 ));
835 }
836
837 if let Some(node) = self.nodes.borrow().get(&id) {
839 return Ok(PageNode::from(node.clone()));
840 };
841
842 Ok(PageNode::from(self.tx()?.page(id)?))
844 }
845}