Skip to main content

hematite/btree/
bytes.rs

//! Generic byte-tree facade.
//!
//! This file is the main reusable interface of the B-tree layer. It lets callers treat a tree as
//! an ordered map from `&[u8]` to `&[u8]` while the implementation handles page layout, splitting,
//! merging, cursor navigation, and large-value overflow.
//!
//! Layer split:
//!
//! ```text
//! caller
//!   provides: ordered key bytes, opaque value bytes
//!   sees:     insert / delete / get / cursor / range helpers / stats
//!
//! byte tree
//!   owns:     root tracking, node mutation, structural validation, overflow-backed values
//!
//! pager
//!   owns:     page IO, free-page reuse, checksums, journaling, WAL, locking
//! ```
//!
//! Large values are represented with a B-tree-owned wrapper:
//!
//! ```text
//! logical value
//!      |
//!      v
//! StoredValueLayout
//!   local payload bytes
//!   total length
//!   first overflow page
//! ```
//!
//! That extra indirection is what keeps overflow handling generic instead of pushing it into the
//! catalog or table code.

36use std::collections::HashSet;
37use std::path::Path;
38use std::sync::{Arc, Mutex, MutexGuard};
39
40use crate::btree::codec::RawBytesCodec;
41use crate::btree::cursor::BTreeCursor;
42use crate::btree::index::{BTreeIndex, TreeMutation};
43use crate::btree::node::BTreeNode;
44use crate::btree::tree::{
45    collect_tree_page_ids, collect_tree_space_stats, reset_tree_pages, BTreeManager, TreeSpaceStats,
46};
47use crate::btree::value_store::{
48    free_stored_value_overflow, hydrate_stored_value, materialize_stored_value, StoredValueLayout,
49};
50use crate::btree::NodeType;
51use crate::error::{HematiteError, Result};
52use crate::storage::overflow::{collect_overflow_page_ids, validate_overflow_chain};
53use crate::storage::{
54    JournalMode, Page, PageId, Pager, PagerIntegrityReport, DB_HEADER_PAGE_ID, INVALID_PAGE_ID,
55    PAGE_SIZE, STORAGE_METADATA_PAGE_ID,
56};
57
/// Store-level facade over a shared [`Pager`]: opens files, drives
/// transactions/journaling, and creates, deletes, or validates whole B-trees.
///
/// Cloning is cheap: every clone shares the same `Arc<Mutex<Pager>>`, so all
/// handles operate on the same underlying file and page cache.
#[derive(Debug, Clone)]
pub struct ByteTreeStore {
    /// Shared pager guarded by a mutex; the single point of page IO.
    storage: Arc<Mutex<Pager>>,
}
62
/// Opaque, crate-internal snapshot of the whole store, captured and restored
/// through the pager's own snapshot mechanism (see `snapshot` /
/// `restore_snapshot` on [`ByteTreeStore`]).
#[derive(Debug, Clone)]
pub(crate) struct ByteTreeStoreSnapshot {
    /// Pager-level snapshot payload; contents are owned by the storage layer.
    pager: crate::storage::pager::PagerSnapshot,
}
67
impl ByteTreeStore {
    /// Page size, re-exported so callers need not import the storage layer.
    pub const PAGE_SIZE: usize = PAGE_SIZE;
    /// Sentinel page id meaning "no page".
    pub const INVALID_PAGE_ID: PageId = INVALID_PAGE_ID;
    /// Page id of the database header page.
    pub const DB_HEADER_PAGE_ID: PageId = DB_HEADER_PAGE_ID;
    /// Page id reserved for storage-level metadata blobs.
    pub const RESERVED_METADATA_PAGE_ID: PageId = STORAGE_METADATA_PAGE_ID;

    /// Locks the shared pager, mapping a poisoned mutex to an internal error
    /// instead of propagating the panic.
    fn lock_storage(&self) -> Result<MutexGuard<'_, Pager>> {
        self.storage.lock().map_err(|_| {
            HematiteError::InternalError("ByteTreeStore storage mutex is poisoned".to_string())
        })
    }

    /// Opens (or creates) a store backed by the file at `path`, with a page
    /// cache holding up to `cache_capacity` pages.
    pub fn open_path<P: AsRef<Path>>(path: P, cache_capacity: usize) -> Result<Self> {
        Ok(Self::new(Pager::new(path, cache_capacity)?))
    }

    /// Creates a store backed by an in-memory pager (no file IO).
    pub fn new_in_memory(cache_capacity: usize) -> Result<Self> {
        Ok(Self::new(Pager::new_in_memory(cache_capacity)?))
    }

    /// Wraps an already-constructed pager in a new shared store.
    pub fn new(storage: Pager) -> Self {
        Self {
            storage: Arc::new(Mutex::new(storage)),
        }
    }

    /// Builds a store around a pager handle that is already shared elsewhere.
    pub fn from_shared_storage(storage: Arc<Mutex<Pager>>) -> Self {
        Self { storage }
    }

    /// Hands out another handle to the shared pager.
    pub fn shared_storage(&self) -> Arc<Mutex<Pager>> {
        self.storage.clone()
    }

    /// Reads the raw contents of a reserved page, returning `Ok(None)` when
    /// the page cannot be read.
    // NOTE(review): every read error — including corruption, not just a
    // missing page — is collapsed into `Ok(None)`; confirm this best-effort
    // behavior is intentional for reserved blobs.
    pub fn read_reserved_blob(&self, page_id: PageId) -> Result<Option<Vec<u8>>> {
        let mut pager = self.lock_storage()?;
        match pager.read_page(page_id) {
            Ok(page) => Ok(Some(page.data)),
            Err(_) => Ok(None),
        }
    }

    /// Writes `bytes` into reserved page `page_id`, zero-padding the rest of
    /// the page. Fails without writing if the payload exceeds one page.
    pub fn write_reserved_blob(&self, page_id: PageId, bytes: &[u8]) -> Result<()> {
        if bytes.len() > PAGE_SIZE {
            return Err(HematiteError::StorageError(format!(
                "Reserved page payload exceeds page size: {} > {}",
                bytes.len(),
                PAGE_SIZE
            )));
        }
        let mut page = Page::new(page_id);
        page.data[..bytes.len()].copy_from_slice(bytes);
        self.lock_storage()?.write_page(page)
    }

    /// Flushes pager state to durable storage.
    pub fn flush(&self) -> Result<()> {
        self.lock_storage()?.flush()
    }

    /// Begins a pager-level transaction.
    pub fn begin_transaction(&self) -> Result<()> {
        self.lock_storage()?.begin_transaction()
    }

    /// Commits the current pager-level transaction.
    pub fn commit_transaction(&self) -> Result<()> {
        self.lock_storage()?.commit_transaction()
    }

    /// Rolls back the current pager-level transaction.
    pub fn rollback_transaction(&self) -> Result<()> {
        self.lock_storage()?.rollback_transaction()
    }

    /// Reports whether a pager-level transaction is currently open.
    pub fn transaction_active(&self) -> Result<bool> {
        Ok(self.lock_storage()?.transaction_active())
    }

    /// Captures a crate-internal snapshot of the entire store.
    pub(crate) fn snapshot(&self) -> Result<ByteTreeStoreSnapshot> {
        Ok(ByteTreeStoreSnapshot {
            pager: self.lock_storage()?.snapshot()?,
        })
    }

    /// Restores the store to a previously captured snapshot.
    pub(crate) fn restore_snapshot(&self, snapshot: ByteTreeStoreSnapshot) -> Result<()> {
        self.lock_storage()?.restore_snapshot(snapshot.pager)
    }

    /// Enters a pager read scope (pairs with [`Self::end_read`]).
    pub fn begin_read(&self) -> Result<()> {
        self.lock_storage()?.begin_read()
    }

    /// Leaves a pager read scope started by [`Self::begin_read`].
    pub fn end_read(&self) -> Result<()> {
        self.lock_storage()?.end_read()
    }

    /// Returns the pager's current journal mode.
    pub fn journal_mode(&self) -> Result<JournalMode> {
        Ok(self.lock_storage()?.journal_mode())
    }

    /// Switches the pager's journal mode.
    pub fn set_journal_mode(&self, journal_mode: JournalMode) -> Result<()> {
        self.lock_storage()?.set_journal_mode(journal_mode)
    }

    /// Checkpoints the write-ahead log into the main database file.
    pub fn checkpoint_wal(&self) -> Result<()> {
        self.lock_storage()?.checkpoint_wal()
    }

    /// Current on-disk file length in bytes.
    pub fn file_len(&self) -> Result<u64> {
        self.lock_storage()?.file_len()
    }

    /// Total number of pages the pager has allocated.
    pub fn allocated_page_count(&self) -> Result<usize> {
        Ok(self.lock_storage()?.allocated_page_count())
    }

    /// Copy of the pager's current freelist.
    pub fn free_page_ids(&self) -> Result<Vec<PageId>> {
        Ok(self.lock_storage()?.free_pages().to_vec())
    }

    /// Number of free pages located in the interior of the file.
    pub fn fragmented_free_page_count(&self) -> Result<usize> {
        Ok(self.lock_storage()?.fragmented_free_page_count())
    }

    /// Number of free pages at the tail of the file (truncatable).
    pub fn trailing_free_page_count(&self) -> Result<usize> {
        Ok(self.lock_storage()?.trailing_free_page_count())
    }

    /// Runs the pager's own integrity checks (checksums, freelist, etc.).
    pub fn validate_storage(&self) -> Result<PagerIntegrityReport> {
        self.lock_storage()?.validate_integrity()
    }

    /// Allocates a fresh, empty tree and returns its root page id.
    pub fn create_tree(&self) -> Result<PageId> {
        let mut manager = BTreeManager::from_shared_storage(self.storage.clone());
        manager.create_tree()
    }

    /// Opens an existing tree rooted at `root_page_id` as a [`ByteTree`].
    pub fn open_tree(&self, root_page_id: PageId) -> Result<ByteTree> {
        let mut manager = BTreeManager::from_shared_storage(self.storage.clone());
        let index = manager.open_tree(root_page_id)?;
        Ok(ByteTree {
            storage: self.storage.clone(),
            index,
        })
    }

    /// Deletes an entire tree: first frees every overflow chain owned by its
    /// leaf values, then releases the tree's own pages through the manager.
    pub fn delete_tree(&self, root_page_id: PageId) -> Result<()> {
        {
            // Scoped so the pager guard is dropped before the manager —
            // built over the same shared pager — performs its own work.
            let mut pager = self.lock_storage()?;
            free_tree_overflow(&mut pager, root_page_id)?;
        }
        let mut manager = BTreeManager::from_shared_storage(self.storage.clone());
        manager.delete_tree(root_page_id)
    }

    /// Validates the tree structure and its overflow ownership.
    ///
    /// Returns `Ok(false)` on any validation failure; the detailed error from
    /// the overflow check is intentionally discarded here.
    pub fn validate_tree(&self, root_page_id: PageId) -> Result<bool> {
        let mut manager = BTreeManager::from_shared_storage(self.storage.clone());
        if !manager.validate_tree(root_page_id)? {
            return Ok(false);
        }
        Ok(self.validate_tree_overflow(root_page_id).is_ok())
    }

    /// Checks that every overflow page reachable from the tree's values is
    /// well-formed, not shared, not on the freelist, and does not overlap a
    /// B-tree page. Errors with `CorruptedData` details on failure.
    pub fn validate_tree_overflow(&self, root_page_id: PageId) -> Result<()> {
        let mut pager = self.lock_storage()?;
        let mut tree_page_ids = Vec::new();
        collect_tree_page_ids(&mut pager, root_page_id, &mut tree_page_ids)?;
        let tree_pages = tree_page_ids.into_iter().collect::<HashSet<_>>();
        let free_pages = pager.free_pages().iter().copied().collect::<HashSet<_>>();
        // Tracks overflow pages already claimed by some value, to detect sharing.
        let mut owned_overflow_pages = HashSet::new();
        validate_tree_overflow_pages(
            &mut pager,
            root_page_id,
            &tree_pages,
            &free_pages,
            &mut owned_overflow_pages,
        )
    }

    /// Empties a tree in place: frees all overflow chains, then resets the
    /// tree's pages while keeping the same root page id.
    pub fn reset_tree(&self, root_page_id: PageId) -> Result<()> {
        let mut pager = self.lock_storage()?;
        free_tree_overflow(&mut pager, root_page_id)?;
        reset_tree_pages(&mut pager, root_page_id)
    }

    /// Collects the ids of all B-tree pages (not overflow pages) in the tree.
    pub fn collect_page_ids(&self, root_page_id: PageId) -> Result<Vec<PageId>> {
        let mut pager = self.lock_storage()?;
        let mut page_ids = Vec::new();
        collect_tree_page_ids(&mut pager, root_page_id, &mut page_ids)?;
        Ok(page_ids)
    }

    /// Gathers space-usage statistics for the tree rooted at `root_page_id`.
    pub fn collect_space_stats(&self, root_page_id: PageId) -> Result<TreeSpaceStats> {
        let mut pager = self.lock_storage()?;
        collect_tree_space_stats(&mut pager, root_page_id)
    }
}
262
/// Handle to one open B-tree: an ordered map from byte keys to byte values,
/// rooted at a fixed page id. Obtained from [`ByteTreeStore::open_tree`].
pub struct ByteTree {
    /// Shared pager; used directly for value hydration and overflow cleanup.
    storage: Arc<Mutex<Pager>>,
    /// Index structure performing key lookups and structural mutations.
    index: BTreeIndex,
}
267
impl ByteTree {
    /// Locks the shared pager, mapping a poisoned mutex to an internal error
    /// instead of propagating the panic.
    fn lock_storage(&self) -> Result<MutexGuard<'_, Pager>> {
        self.storage.lock().map_err(|_| {
            HematiteError::InternalError("ByteTree storage mutex is poisoned".to_string())
        })
    }

    /// Root page id of this tree (stable identity for reopening it later).
    pub fn root_page_id(&self) -> PageId {
        self.index.root_page_id()
    }

    /// Looks up `key` and returns the full logical value, hydrating any
    /// overflow-backed payload through the pager. `None` if absent.
    pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        match self.index.search_typed::<RawBytesCodec>(&key.to_vec())? {
            Some(stored_value) => {
                let mut storage = self.lock_storage()?;
                Ok(Some(hydrate_stored_value(&mut storage, &stored_value)?))
            }
            None => Ok(None),
        }
    }

    /// Inserts or replaces `key` -> `value`, discarding mutation details.
    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        self.insert_with_mutation(key, value).map(|_| ())
    }

    /// Inserts or replaces `key` -> `value`, returning the structural
    /// [`TreeMutation`] the insert caused.
    ///
    /// Order matters here: the old stored value (if any) is looked up first,
    /// the new value is materialized (possibly allocating overflow pages),
    /// the insert runs, and only then is the old value's overflow freed — so
    /// a failed insert never frees live overflow pages.
    // NOTE(review): if `insert_typed_with_mutation` itself fails, the
    // overflow pages just materialized for the new value are not reclaimed
    // here — confirm whether an upper layer (e.g. transaction rollback)
    // recovers them.
    pub fn insert_with_mutation(&mut self, key: &[u8], value: &[u8]) -> Result<TreeMutation> {
        let existing_stored_value = self.index.search_typed::<RawBytesCodec>(&key.to_vec())?;
        let stored_value = {
            // Scoped guard: released before calling into the index, which
            // operates over the same shared pager.
            let mut storage = self.lock_storage()?;
            materialize_stored_value(&mut storage, value)?
        };
        let mutation = self
            .index
            .insert_typed_with_mutation::<RawBytesCodec>(&key.to_vec(), &stored_value)?;

        if let Some(existing_stored_value) = existing_stored_value {
            let mut storage = self.lock_storage()?;
            free_stored_value_overflow(&mut storage, &existing_stored_value)?;
        }

        Ok(mutation)
    }

    /// Deletes `key`, returning its prior logical value (if any).
    pub fn delete(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.delete_with_mutation(key).map(|(value, _)| value)
    }

    /// Deletes `key`, returning the prior logical value and the structural
    /// [`TreeMutation`]. The deleted value is hydrated *before* its overflow
    /// chain is freed, so the caller still receives the full payload.
    pub fn delete_with_mutation(&mut self, key: &[u8]) -> Result<(Option<Vec<u8>>, TreeMutation)> {
        let (stored_value, mutation) = self
            .index
            .delete_typed_with_mutation::<RawBytesCodec>(&key.to_vec())?;
        let logical_value = match stored_value {
            Some(stored_value) => {
                let mut storage = self.lock_storage()?;
                let logical_value = hydrate_stored_value(&mut storage, &stored_value)?;
                free_stored_value_overflow(&mut storage, &stored_value)?;
                Some(logical_value)
            }
            None => None,
        };
        Ok((logical_value, mutation))
    }

    /// Like [`Self::get`], but returns the `(key, value)` pair for symmetry
    /// with cursor-based iteration.
    pub fn entry(&mut self, key: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
        Ok(self.get(key)?.map(|value| (key.to_vec(), value)))
    }

    /// Collects every entry in key order.
    pub fn entries(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let mut cursor = self.cursor()?;
        cursor.collect_all()
    }

    /// Collects entries in key order starting at the first key >= `start_key`.
    pub fn entries_from(&self, start_key: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let mut cursor = self.cursor()?;
        cursor.seek(start_key)?;
        cursor.collect_remaining()
    }

    /// Collects every entry whose key starts with `prefix`, in key order.
    // Assumes `seek` lands on the first key not less than `prefix`, so all
    // prefixed keys form one contiguous run — TODO confirm cursor contract.
    pub fn entries_with_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        let mut cursor = self.cursor()?;
        cursor.seek(prefix)?;
        let mut entries = Vec::new();
        while let Some((key, value)) = cursor.current()? {
            if !key.starts_with(prefix) {
                break;
            }
            entries.push((key, value));
            // A failing advance ends iteration with what was gathered so far.
            if cursor.next().is_err() {
                break;
            }
        }
        Ok(entries)
    }

    /// Opens a fresh read cursor over this tree.
    pub fn cursor(&self) -> Result<ByteTreeCursor> {
        Ok(ByteTreeCursor {
            storage: self.storage.clone(),
            inner: self.index.cursor()?,
        })
    }
}
369
370fn free_tree_overflow(storage: &mut Pager, root_page_id: PageId) -> Result<()> {
371    let page = storage.read_page(root_page_id)?;
372    let node = BTreeNode::from_page(page)?;
373
374    match node.node_type {
375        NodeType::Leaf => {
376            for value in node.values {
377                free_stored_value_overflow(storage, value.as_bytes())?;
378            }
379        }
380        NodeType::Internal => {
381            for child_page_id in node.children {
382                free_tree_overflow(storage, child_page_id)?;
383            }
384        }
385    }
386
387    Ok(())
388}
389
390fn validate_tree_overflow_pages(
391    storage: &mut Pager,
392    root_page_id: PageId,
393    tree_pages: &HashSet<PageId>,
394    free_pages: &HashSet<PageId>,
395    owned_overflow_pages: &mut HashSet<PageId>,
396) -> Result<()> {
397    let page = storage.read_page(root_page_id)?;
398    let node = BTreeNode::from_page(page)?;
399
400    match node.node_type {
401        NodeType::Leaf => {
402            for value in node.values {
403                let layout = StoredValueLayout::decode(value.as_bytes())?;
404                if layout.overflow_first_page != crate::storage::INVALID_PAGE_ID {
405                    let first_page = Some(layout.overflow_first_page);
406                    validate_overflow_chain(storage, first_page, layout.overflow_len())?;
407                    for overflow_page_id in collect_overflow_page_ids(storage, first_page)? {
408                        if tree_pages.contains(&overflow_page_id) {
409                            return Err(crate::error::HematiteError::CorruptedData(format!(
410                                "Overflow page {} overlaps a B-tree page",
411                                overflow_page_id
412                            )));
413                        }
414                        if free_pages.contains(&overflow_page_id) {
415                            return Err(crate::error::HematiteError::CorruptedData(format!(
416                                "Overflow page {} is also on the freelist",
417                                overflow_page_id
418                            )));
419                        }
420                        if !owned_overflow_pages.insert(overflow_page_id) {
421                            return Err(crate::error::HematiteError::CorruptedData(format!(
422                                "Overflow page {} is shared by multiple values",
423                                overflow_page_id
424                            )));
425                        }
426                    }
427                }
428            }
429        }
430        NodeType::Internal => {
431            for child_page_id in node.children {
432                validate_tree_overflow_pages(
433                    storage,
434                    child_page_id,
435                    tree_pages,
436                    free_pages,
437                    owned_overflow_pages,
438                )?;
439            }
440        }
441    }
442
443    Ok(())
444}
445
/// Read cursor over a [`ByteTree`] that yields logical values, hydrating
/// overflow-backed payloads through the shared pager on access.
pub struct ByteTreeCursor {
    /// Same pager the owning tree uses; needed to resolve overflow chains.
    storage: Arc<Mutex<Pager>>,
    /// Underlying positional cursor over raw stored-value bytes.
    inner: BTreeCursor,
}
450
451impl ByteTreeCursor {
452    fn lock_storage(&self) -> Result<MutexGuard<'_, Pager>> {
453        self.storage.lock().map_err(|_| {
454            HematiteError::InternalError("ByteTreeCursor storage mutex is poisoned".to_string())
455        })
456    }
457
458    pub fn is_valid(&self) -> bool {
459        self.inner.is_valid()
460    }
461
462    pub fn first(&mut self) -> Result<()> {
463        self.inner.first()
464    }
465
466    pub fn next(&mut self) -> Result<()> {
467        self.inner.next()
468    }
469
470    pub fn seek(&mut self, key: &[u8]) -> Result<()> {
471        self.inner.seek(&crate::btree::BTreeKey::new(key.to_vec()))
472    }
473
474    pub fn key(&self) -> Option<&[u8]> {
475        self.inner.key().map(|key| key.as_bytes())
476    }
477
478    pub fn value(&self) -> Option<&[u8]> {
479        self.inner.value().map(|value| value.as_bytes())
480    }
481
482    pub fn current(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
483        match self.inner.current() {
484            Some((key, value)) => {
485                let mut storage = self.lock_storage()?;
486                Ok(Some((
487                    key.as_bytes().to_vec(),
488                    hydrate_stored_value(&mut storage, value.as_bytes())?,
489                )))
490            }
491            None => Ok(None),
492        }
493    }
494
495    pub fn collect_all(&mut self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
496        self.first()?;
497        self.collect_remaining()
498    }
499
500    pub fn collect_remaining(&mut self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
501        let mut entries = Vec::new();
502        while let Some(entry) = self.current()? {
503            entries.push(entry);
504            if self.next().is_err() {
505                break;
506            }
507        }
508        Ok(entries)
509    }
510}