1use std::collections::HashSet;
37use std::path::Path;
38use std::sync::{Arc, Mutex, MutexGuard};
39
40use crate::btree::codec::RawBytesCodec;
41use crate::btree::cursor::BTreeCursor;
42use crate::btree::index::{BTreeIndex, TreeMutation};
43use crate::btree::node::BTreeNode;
44use crate::btree::tree::{
45 collect_tree_page_ids, collect_tree_space_stats, reset_tree_pages, BTreeManager, TreeSpaceStats,
46};
47use crate::btree::value_store::{
48 free_stored_value_overflow, hydrate_stored_value, materialize_stored_value, StoredValueLayout,
49};
50use crate::btree::NodeType;
51use crate::error::{HematiteError, Result};
52use crate::storage::overflow::{collect_overflow_page_ids, validate_overflow_chain};
53use crate::storage::{
54 JournalMode, Page, PageId, Pager, PagerIntegrityReport, DB_HEADER_PAGE_ID, INVALID_PAGE_ID,
55 PAGE_SIZE, STORAGE_METADATA_PAGE_ID,
56};
57
/// Byte-oriented key/value store facade over a page-based storage engine.
///
/// Cheap to clone: clones share the same underlying pager via `Arc`, so all
/// handles observe the same pages, transactions, and journal state.
#[derive(Debug, Clone)]
pub struct ByteTreeStore {
    // Shared, mutex-guarded pager; also handed to `BTreeManager`/`ByteTree`
    // instances created from this store.
    storage: Arc<Mutex<Pager>>,
}
62
/// Opaque point-in-time snapshot of a `ByteTreeStore`, produced by
/// `ByteTreeStore::snapshot` and consumed by `ByteTreeStore::restore_snapshot`.
#[derive(Debug, Clone)]
pub(crate) struct ByteTreeStoreSnapshot {
    // Captured pager state; the only payload of a store snapshot.
    pager: crate::storage::pager::PagerSnapshot,
}
67
impl ByteTreeStore {
    /// Re-export of the pager's page size, in bytes.
    pub const PAGE_SIZE: usize = PAGE_SIZE;
    /// Sentinel page id meaning "no page".
    pub const INVALID_PAGE_ID: PageId = INVALID_PAGE_ID;
    /// Page id of the database header page.
    pub const DB_HEADER_PAGE_ID: PageId = DB_HEADER_PAGE_ID;
    /// Page id reserved for storage-level metadata blobs.
    pub const RESERVED_METADATA_PAGE_ID: PageId = STORAGE_METADATA_PAGE_ID;

    /// Acquires the shared pager lock, mapping mutex poisoning to an
    /// `InternalError` instead of panicking.
    fn lock_storage(&self) -> Result<MutexGuard<'_, Pager>> {
        self.storage.lock().map_err(|_| {
            HematiteError::InternalError("ByteTreeStore storage mutex is poisoned".to_string())
        })
    }

    /// Opens (or creates) a store backed by the file at `path`, with a page
    /// cache of `cache_capacity` entries.
    pub fn open_path<P: AsRef<Path>>(path: P, cache_capacity: usize) -> Result<Self> {
        Ok(Self::new(Pager::new(path, cache_capacity)?))
    }

    /// Creates a store with no backing file, for ephemeral/testing use.
    pub fn new_in_memory(cache_capacity: usize) -> Result<Self> {
        Ok(Self::new(Pager::new_in_memory(cache_capacity)?))
    }

    /// Wraps an already-constructed pager in a new shared store handle.
    pub fn new(storage: Pager) -> Self {
        Self {
            storage: Arc::new(Mutex::new(storage)),
        }
    }

    /// Builds a store handle around an existing shared pager (e.g. one also
    /// used by other components).
    pub fn from_shared_storage(storage: Arc<Mutex<Pager>>) -> Self {
        Self { storage }
    }

    /// Returns a clone of the shared pager handle.
    pub fn shared_storage(&self) -> Arc<Mutex<Pager>> {
        self.storage.clone()
    }

    /// Reads the raw page bytes at `page_id`, returning `None` if the read
    /// fails.
    ///
    /// NOTE(review): *any* read error — including I/O failure or corruption,
    /// not just "page absent" — is reported as `None` here; presumably
    /// intentional for optional reserved pages, but worth confirming.
    pub fn read_reserved_blob(&self, page_id: PageId) -> Result<Option<Vec<u8>>> {
        let mut pager = self.lock_storage()?;
        match pager.read_page(page_id) {
            Ok(page) => Ok(Some(page.data)),
            Err(_) => Ok(None),
        }
    }

    /// Writes `bytes` into the page at `page_id`, zero-padding the remainder
    /// of the page (a fresh `Page` starts zeroed).
    ///
    /// # Errors
    /// Returns `StorageError` if `bytes` exceeds one page.
    pub fn write_reserved_blob(&self, page_id: PageId, bytes: &[u8]) -> Result<()> {
        if bytes.len() > PAGE_SIZE {
            return Err(HematiteError::StorageError(format!(
                "Reserved page payload exceeds page size: {} > {}",
                bytes.len(),
                PAGE_SIZE
            )));
        }
        let mut page = Page::new(page_id);
        page.data[..bytes.len()].copy_from_slice(bytes);
        self.lock_storage()?.write_page(page)
    }

    /// Flushes pending pager state to durable storage.
    pub fn flush(&self) -> Result<()> {
        self.lock_storage()?.flush()
    }

    /// Begins a pager-level write transaction.
    pub fn begin_transaction(&self) -> Result<()> {
        self.lock_storage()?.begin_transaction()
    }

    /// Commits the active pager-level transaction.
    pub fn commit_transaction(&self) -> Result<()> {
        self.lock_storage()?.commit_transaction()
    }

    /// Rolls back the active pager-level transaction.
    pub fn rollback_transaction(&self) -> Result<()> {
        self.lock_storage()?.rollback_transaction()
    }

    /// Reports whether a pager-level transaction is currently open.
    pub fn transaction_active(&self) -> Result<bool> {
        Ok(self.lock_storage()?.transaction_active())
    }

    /// Captures a point-in-time snapshot of the pager state.
    pub(crate) fn snapshot(&self) -> Result<ByteTreeStoreSnapshot> {
        Ok(ByteTreeStoreSnapshot {
            pager: self.lock_storage()?.snapshot()?,
        })
    }

    /// Restores pager state captured by `snapshot`, consuming the snapshot.
    pub(crate) fn restore_snapshot(&self, snapshot: ByteTreeStoreSnapshot) -> Result<()> {
        self.lock_storage()?.restore_snapshot(snapshot.pager)
    }

    /// Marks the start of a read scope on the pager.
    pub fn begin_read(&self) -> Result<()> {
        self.lock_storage()?.begin_read()
    }

    /// Marks the end of a read scope on the pager.
    pub fn end_read(&self) -> Result<()> {
        self.lock_storage()?.end_read()
    }

    /// Returns the pager's current journaling mode.
    pub fn journal_mode(&self) -> Result<JournalMode> {
        Ok(self.lock_storage()?.journal_mode())
    }

    /// Switches the pager's journaling mode.
    pub fn set_journal_mode(&self, journal_mode: JournalMode) -> Result<()> {
        self.lock_storage()?.set_journal_mode(journal_mode)
    }

    /// Checkpoints the write-ahead log into the main database file.
    pub fn checkpoint_wal(&self) -> Result<()> {
        self.lock_storage()?.checkpoint_wal()
    }

    /// Length of the backing database file in bytes.
    pub fn file_len(&self) -> Result<u64> {
        self.lock_storage()?.file_len()
    }

    /// Total number of pages the pager has allocated.
    pub fn allocated_page_count(&self) -> Result<usize> {
        Ok(self.lock_storage()?.allocated_page_count())
    }

    /// Copy of the pager's current freelist.
    pub fn free_page_ids(&self) -> Result<Vec<PageId>> {
        Ok(self.lock_storage()?.free_pages().to_vec())
    }

    /// Number of free pages surrounded by live pages (not reclaimable by
    /// truncation).
    pub fn fragmented_free_page_count(&self) -> Result<usize> {
        Ok(self.lock_storage()?.fragmented_free_page_count())
    }

    /// Number of free pages at the end of the file.
    pub fn trailing_free_page_count(&self) -> Result<usize> {
        Ok(self.lock_storage()?.trailing_free_page_count())
    }

    /// Runs the pager's own integrity validation and returns its report.
    pub fn validate_storage(&self) -> Result<PagerIntegrityReport> {
        self.lock_storage()?.validate_integrity()
    }

    /// Allocates a new, empty B-tree and returns its root page id.
    pub fn create_tree(&self) -> Result<PageId> {
        let mut manager = BTreeManager::from_shared_storage(self.storage.clone());
        manager.create_tree()
    }

    /// Opens an existing B-tree rooted at `root_page_id` as a `ByteTree`
    /// sharing this store's pager.
    pub fn open_tree(&self, root_page_id: PageId) -> Result<ByteTree> {
        let mut manager = BTreeManager::from_shared_storage(self.storage.clone());
        let index = manager.open_tree(root_page_id)?;
        Ok(ByteTree {
            storage: self.storage.clone(),
            index,
        })
    }

    /// Deletes the tree rooted at `root_page_id`, first freeing all overflow
    /// chains owned by its values.
    ///
    /// The pager lock is released (inner scope) before delegating to the
    /// manager, which takes the lock itself via the shared handle.
    pub fn delete_tree(&self, root_page_id: PageId) -> Result<()> {
        {
            let mut pager = self.lock_storage()?;
            free_tree_overflow(&mut pager, root_page_id)?;
        }
        let mut manager = BTreeManager::from_shared_storage(self.storage.clone());
        manager.delete_tree(root_page_id)
    }

    /// Validates both the tree structure and its overflow-page ownership.
    ///
    /// NOTE(review): overflow-validation *errors* (including a poisoned lock)
    /// are collapsed into `Ok(false)` here rather than propagated — confirm
    /// callers are fine losing the failure detail.
    pub fn validate_tree(&self, root_page_id: PageId) -> Result<bool> {
        let mut manager = BTreeManager::from_shared_storage(self.storage.clone());
        if !manager.validate_tree(root_page_id)? {
            return Ok(false);
        }
        Ok(self.validate_tree_overflow(root_page_id).is_ok())
    }

    /// Checks every overflow chain referenced by the tree's values: chains
    /// must be well-formed and their pages must not overlap tree pages, the
    /// freelist, or each other.
    pub fn validate_tree_overflow(&self, root_page_id: PageId) -> Result<()> {
        let mut pager = self.lock_storage()?;
        let mut tree_page_ids = Vec::new();
        collect_tree_page_ids(&mut pager, root_page_id, &mut tree_page_ids)?;
        let tree_pages = tree_page_ids.into_iter().collect::<HashSet<_>>();
        let free_pages = pager.free_pages().iter().copied().collect::<HashSet<_>>();
        let mut owned_overflow_pages = HashSet::new();
        validate_tree_overflow_pages(
            &mut pager,
            root_page_id,
            &tree_pages,
            &free_pages,
            &mut owned_overflow_pages,
        )
    }

    /// Empties the tree rooted at `root_page_id`, freeing value overflow
    /// chains before resetting the tree's pages.
    pub fn reset_tree(&self, root_page_id: PageId) -> Result<()> {
        let mut pager = self.lock_storage()?;
        free_tree_overflow(&mut pager, root_page_id)?;
        reset_tree_pages(&mut pager, root_page_id)
    }

    /// Collects the ids of all pages belonging to the tree's node structure
    /// (overflow pages are not included by this traversal).
    pub fn collect_page_ids(&self, root_page_id: PageId) -> Result<Vec<PageId>> {
        let mut pager = self.lock_storage()?;
        let mut page_ids = Vec::new();
        collect_tree_page_ids(&mut pager, root_page_id, &mut page_ids)?;
        Ok(page_ids)
    }

    /// Gathers space-usage statistics for the tree rooted at `root_page_id`.
    pub fn collect_space_stats(&self, root_page_id: PageId) -> Result<TreeSpaceStats> {
        let mut pager = self.lock_storage()?;
        collect_tree_space_stats(&mut pager, root_page_id)
    }
}
262
/// Handle to a single B-tree within a `ByteTreeStore`: raw-bytes keys and
/// values, with large values transparently spilled to overflow pages.
pub struct ByteTree {
    // Pager shared with the owning store and any sibling trees/cursors.
    storage: Arc<Mutex<Pager>>,
    // Index over this tree's node pages; created by `ByteTreeStore::open_tree`.
    index: BTreeIndex,
}
267
268impl ByteTree {
269 fn lock_storage(&self) -> Result<MutexGuard<'_, Pager>> {
270 self.storage.lock().map_err(|_| {
271 HematiteError::InternalError("ByteTree storage mutex is poisoned".to_string())
272 })
273 }
274
275 pub fn root_page_id(&self) -> PageId {
276 self.index.root_page_id()
277 }
278
279 pub fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
280 match self.index.search_typed::<RawBytesCodec>(&key.to_vec())? {
281 Some(stored_value) => {
282 let mut storage = self.lock_storage()?;
283 Ok(Some(hydrate_stored_value(&mut storage, &stored_value)?))
284 }
285 None => Ok(None),
286 }
287 }
288
289 pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
290 self.insert_with_mutation(key, value).map(|_| ())
291 }
292
293 pub fn insert_with_mutation(&mut self, key: &[u8], value: &[u8]) -> Result<TreeMutation> {
294 let existing_stored_value = self.index.search_typed::<RawBytesCodec>(&key.to_vec())?;
295 let stored_value = {
296 let mut storage = self.lock_storage()?;
297 materialize_stored_value(&mut storage, value)?
298 };
299 let mutation = self
300 .index
301 .insert_typed_with_mutation::<RawBytesCodec>(&key.to_vec(), &stored_value)?;
302
303 if let Some(existing_stored_value) = existing_stored_value {
304 let mut storage = self.lock_storage()?;
305 free_stored_value_overflow(&mut storage, &existing_stored_value)?;
306 }
307
308 Ok(mutation)
309 }
310
311 pub fn delete(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
312 self.delete_with_mutation(key).map(|(value, _)| value)
313 }
314
315 pub fn delete_with_mutation(&mut self, key: &[u8]) -> Result<(Option<Vec<u8>>, TreeMutation)> {
316 let (stored_value, mutation) = self
317 .index
318 .delete_typed_with_mutation::<RawBytesCodec>(&key.to_vec())?;
319 let logical_value = match stored_value {
320 Some(stored_value) => {
321 let mut storage = self.lock_storage()?;
322 let logical_value = hydrate_stored_value(&mut storage, &stored_value)?;
323 free_stored_value_overflow(&mut storage, &stored_value)?;
324 Some(logical_value)
325 }
326 None => None,
327 };
328 Ok((logical_value, mutation))
329 }
330
331 pub fn entry(&mut self, key: &[u8]) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
332 Ok(self.get(key)?.map(|value| (key.to_vec(), value)))
333 }
334
335 pub fn entries(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
336 let mut cursor = self.cursor()?;
337 cursor.collect_all()
338 }
339
340 pub fn entries_from(&self, start_key: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
341 let mut cursor = self.cursor()?;
342 cursor.seek(start_key)?;
343 cursor.collect_remaining()
344 }
345
346 pub fn entries_with_prefix(&self, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
347 let mut cursor = self.cursor()?;
348 cursor.seek(prefix)?;
349 let mut entries = Vec::new();
350 while let Some((key, value)) = cursor.current()? {
351 if !key.starts_with(prefix) {
352 break;
353 }
354 entries.push((key, value));
355 if cursor.next().is_err() {
356 break;
357 }
358 }
359 Ok(entries)
360 }
361
362 pub fn cursor(&self) -> Result<ByteTreeCursor> {
363 Ok(ByteTreeCursor {
364 storage: self.storage.clone(),
365 inner: self.index.cursor()?,
366 })
367 }
368}
369
370fn free_tree_overflow(storage: &mut Pager, root_page_id: PageId) -> Result<()> {
371 let page = storage.read_page(root_page_id)?;
372 let node = BTreeNode::from_page(page)?;
373
374 match node.node_type {
375 NodeType::Leaf => {
376 for value in node.values {
377 free_stored_value_overflow(storage, value.as_bytes())?;
378 }
379 }
380 NodeType::Internal => {
381 for child_page_id in node.children {
382 free_tree_overflow(storage, child_page_id)?;
383 }
384 }
385 }
386
387 Ok(())
388}
389
390fn validate_tree_overflow_pages(
391 storage: &mut Pager,
392 root_page_id: PageId,
393 tree_pages: &HashSet<PageId>,
394 free_pages: &HashSet<PageId>,
395 owned_overflow_pages: &mut HashSet<PageId>,
396) -> Result<()> {
397 let page = storage.read_page(root_page_id)?;
398 let node = BTreeNode::from_page(page)?;
399
400 match node.node_type {
401 NodeType::Leaf => {
402 for value in node.values {
403 let layout = StoredValueLayout::decode(value.as_bytes())?;
404 if layout.overflow_first_page != crate::storage::INVALID_PAGE_ID {
405 let first_page = Some(layout.overflow_first_page);
406 validate_overflow_chain(storage, first_page, layout.overflow_len())?;
407 for overflow_page_id in collect_overflow_page_ids(storage, first_page)? {
408 if tree_pages.contains(&overflow_page_id) {
409 return Err(crate::error::HematiteError::CorruptedData(format!(
410 "Overflow page {} overlaps a B-tree page",
411 overflow_page_id
412 )));
413 }
414 if free_pages.contains(&overflow_page_id) {
415 return Err(crate::error::HematiteError::CorruptedData(format!(
416 "Overflow page {} is also on the freelist",
417 overflow_page_id
418 )));
419 }
420 if !owned_overflow_pages.insert(overflow_page_id) {
421 return Err(crate::error::HematiteError::CorruptedData(format!(
422 "Overflow page {} is shared by multiple values",
423 overflow_page_id
424 )));
425 }
426 }
427 }
428 }
429 }
430 NodeType::Internal => {
431 for child_page_id in node.children {
432 validate_tree_overflow_pages(
433 storage,
434 child_page_id,
435 tree_pages,
436 free_pages,
437 owned_overflow_pages,
438 )?;
439 }
440 }
441 }
442
443 Ok(())
444}
445
/// Forward cursor over a `ByteTree`, yielding owned `(key, value)` byte
/// pairs with overflow-backed values hydrated on access.
pub struct ByteTreeCursor {
    // Pager shared with the tree that created this cursor; needed to hydrate
    // overflow-backed values in `current`.
    storage: Arc<Mutex<Pager>>,
    // Underlying index cursor that tracks position within the tree.
    inner: BTreeCursor,
}
450
451impl ByteTreeCursor {
452 fn lock_storage(&self) -> Result<MutexGuard<'_, Pager>> {
453 self.storage.lock().map_err(|_| {
454 HematiteError::InternalError("ByteTreeCursor storage mutex is poisoned".to_string())
455 })
456 }
457
458 pub fn is_valid(&self) -> bool {
459 self.inner.is_valid()
460 }
461
462 pub fn first(&mut self) -> Result<()> {
463 self.inner.first()
464 }
465
466 pub fn next(&mut self) -> Result<()> {
467 self.inner.next()
468 }
469
470 pub fn seek(&mut self, key: &[u8]) -> Result<()> {
471 self.inner.seek(&crate::btree::BTreeKey::new(key.to_vec()))
472 }
473
474 pub fn key(&self) -> Option<&[u8]> {
475 self.inner.key().map(|key| key.as_bytes())
476 }
477
478 pub fn value(&self) -> Option<&[u8]> {
479 self.inner.value().map(|value| value.as_bytes())
480 }
481
482 pub fn current(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
483 match self.inner.current() {
484 Some((key, value)) => {
485 let mut storage = self.lock_storage()?;
486 Ok(Some((
487 key.as_bytes().to_vec(),
488 hydrate_stored_value(&mut storage, value.as_bytes())?,
489 )))
490 }
491 None => Ok(None),
492 }
493 }
494
495 pub fn collect_all(&mut self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
496 self.first()?;
497 self.collect_remaining()
498 }
499
500 pub fn collect_remaining(&mut self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
501 let mut entries = Vec::new();
502 while let Some(entry) = self.current()? {
503 entries.push(entry);
504 if self.next().is_err() {
505 break;
506 }
507 }
508 Ok(entries)
509 }
510}