1use crate::config::{SQLITE_PAGE_SIZE, STABLE_PAGE_SIZE, SUPERBLOCK_SIZE};
7use crate::sqlite_vfs::overlay::{self, Overlay};
8use crate::stable::memory::{self, ContextId, StableMemoryError};
9use crate::stable::meta::{
10 fnv1a64, Superblock, FLAG_CHECKSUM_REFRESHING, FLAG_CHECKSUM_STALE, FLAG_IMPORTING,
11 PAGE_MAP_LAYOUT_VERSION,
12};
13use std::cell::RefCell;
14use std::collections::{BTreeMap, VecDeque};
15
/// Bytes hashed per iteration when computing or refreshing a checksum.
const CHECKSUM_CHUNK_LEN: u64 = 16 * 1024;
/// Serialized size of one page-table entry (a little-endian u64 offset).
const PAGE_TABLE_ENTRY_LEN: u64 = 8;
/// Number of page entries held by one page-table segment.
const SEGMENT_PAGE_COUNT: u64 = 256;
/// Maximum number of segment tables kept in a per-context read cache.
const READ_SEGMENT_CACHE_CAPACITY: usize = 8;
/// Maximum number of (page_no, physical offset) pairs in a `PageOffsetCache`.
const FILE_PAGE_OFFSET_CACHE_CAPACITY: usize = 64;
/// Minimum estimated orphan bytes before compaction is recommended.
const COMPACT_MIN_ORPHAN_BYTES: u64 = 16 * 1024 * 1024;
22
/// Progress report returned by [`refresh_checksum_chunk`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChecksumRefresh {
    /// True once the whole logical file has been scanned.
    pub complete: bool,
    /// Checksum accumulated so far (the final value when `complete`).
    pub checksum: u64,
    /// Number of logical bytes hashed so far.
    pub scanned_bytes: u64,
    /// Logical database size observed by this scan.
    pub db_size: u64,
}

/// Storage accounting snapshot derived from the superblock and page table.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StorageStats {
    pub layout_version: u64,
    pub page_count: u64,
    /// Bytes used by the root table plus all segment tables.
    pub page_table_bytes: u64,
    /// Bytes reachable via the current page map (superblock + live pages + tables).
    pub active_bytes: u64,
    /// Total stable memory allocated, in bytes.
    pub allocated_bytes: u64,
    /// Allocated minus active bytes — garbage left by copy-on-write commits.
    pub orphan_bytes_estimate: u64,
    /// Orphan-to-active ratio, in basis points (1/100 of a percent).
    pub orphan_ratio_basis_points: u64,
    pub compact_recommended: bool,
}

/// Failure-injection points used by tests to simulate traps mid-operation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum StableBlobFailpoint {
    OverlayWrite,
    OverlayTruncate,
    CommitCapacity,
    CommitChunkWrite,
    CommitPageTableWrite,
    CommitSuperblockStore,
}
52
thread_local! {
    // Per-context failpoints armed by tests via `set_failpoint`.
    static FAILPOINTS: RefCell<BTreeMap<ContextId, StableBlobFailpoint>> = const { RefCell::new(BTreeMap::new()) };
    // Per-context caches of the page-table root and segment tables for reads.
    static READ_TABLE_CACHE: RefCell<BTreeMap<ContextId, ReadTableCache>> = const { RefCell::new(BTreeMap::new()) };
}

/// Identity of a page-map snapshot; any field changing invalidates the cache.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ReadCacheKey {
    page_table_offset: u64,
    page_count: u64,
    db_size: u64,
    last_tx_id: u64,
}

/// Cache of the page-table root plus up to `READ_SEGMENT_CACHE_CAPACITY`
/// segment tables, valid for a single superblock state (`key`).
#[derive(Debug)]
struct ReadTableCache {
    key: Option<ReadCacheKey>,
    root: Vec<u64>,
    segments: BTreeMap<u64, Vec<u64>>,
    // Segment numbers ordered least- to most-recently used.
    segment_lru: VecDeque<u64>,
}
73
74impl ReadTableCache {
75 fn new() -> Self {
76 Self {
77 key: None,
78 root: Vec::new(),
79 segments: BTreeMap::new(),
80 segment_lru: VecDeque::new(),
81 }
82 }
83
84 fn clear(&mut self) {
85 self.key = None;
86 self.root.clear();
87 self.segments.clear();
88 self.segment_lru.clear();
89 }
90
91 fn ensure_key(&mut self, key: ReadCacheKey) {
92 if self.key == Some(key) {
93 return;
94 }
95 self.clear();
96 self.key = Some(key);
97 }
98
99 fn touch_segment(&mut self, segment_no: u64) {
100 self.segment_lru.retain(|cached| *cached != segment_no);
101 self.segment_lru.push_back(segment_no);
102 }
103
104 fn insert_segment(&mut self, segment_no: u64, table: Vec<u64>) {
105 self.segments.insert(segment_no, table);
106 self.touch_segment(segment_no);
107 while self.segments.len() > READ_SEGMENT_CACHE_CAPACITY {
108 let Some(evicted) = self.segment_lru.pop_front() else {
109 return;
110 };
111 self.segments.remove(&evicted);
112 }
113 }
114}
115
/// Small FIFO cache mapping logical page numbers to physical offsets, used to
/// skip repeated page-table lookups during clustered reads.
#[derive(Debug)]
pub(crate) struct PageOffsetCache {
    // (page_no, physical_offset) pairs in insertion order.
    entries: Vec<(u64, u64)>,
}
120
121impl PageOffsetCache {
122 pub(crate) fn new() -> Self {
123 Self {
124 entries: Vec::with_capacity(FILE_PAGE_OFFSET_CACHE_CAPACITY),
125 }
126 }
127
128 fn get(&self, page_no: u64) -> Option<u64> {
129 self.entries
130 .iter()
131 .find_map(|(cached_page, physical)| (*cached_page == page_no).then_some(*physical))
132 }
133
134 fn insert(&mut self, page_no: u64, physical: u64) {
135 if self
136 .entries
137 .iter()
138 .any(|(cached_page, _)| *cached_page == page_no)
139 {
140 return;
141 }
142 if self.entries.len() == FILE_PAGE_OFFSET_CACHE_CAPACITY {
143 self.entries.remove(0);
144 }
145 self.entries.push((page_no, physical));
146 }
147}
148
/// Test hook: arms `failpoint` for the active context, if one exists.
#[cfg(test)]
pub(crate) fn set_failpoint(failpoint: StableBlobFailpoint) {
    let Ok(context) = memory::active_context_id() else {
        return;
    };
    FAILPOINTS.with(|map| {
        map.borrow_mut().insert(context, failpoint);
    });
}
157
/// Test hook: disarms all failpoints across every context.
#[cfg(test)]
pub(crate) fn clear_failpoint() {
    FAILPOINTS.with(|map| {
        map.borrow_mut().clear();
    });
}
162
163pub(crate) fn ensure_page_map_layout() -> Result<(), StableMemoryError> {
164 let block = Superblock::load()?;
165 if block.layout_version >= PAGE_MAP_LAYOUT_VERSION {
166 return Ok(());
167 }
168 Err(StableMemoryError::UnsupportedLayoutVersion(
169 block.layout_version,
170 ))
171}
172
173pub(crate) fn begin_update() -> Result<(), StableMemoryError> {
174 ensure_page_map_layout()?;
175 overlay::begin(Superblock::load()?.db_size)
176}
177
/// Discards the active overlay, abandoning any uncommitted writes.
pub(crate) fn rollback_update() {
    overlay::rollback();
}
181
182#[doc(hidden)]
183pub fn invalidate_read_cache() {
184 READ_TABLE_CACHE.with(|cache| cache.borrow_mut().clear());
185}
186
187pub(crate) fn commit_update() -> Result<(), StableMemoryError> {
188 let Some(overlay) = overlay::take() else {
189 return Ok(());
190 };
191 if overlay.is_empty() {
192 return Ok(());
193 }
194 commit_overlay(overlay, true)
195}
196
197pub(crate) fn read_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
198 if let Some(result) = overlay::read_at(offset, dst) {
199 return result;
200 }
201 read_base_at(offset, dst)
202}
203
204pub(crate) fn read_base_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
205 if dst.is_empty() {
206 return Ok(true);
207 }
208 let block = Superblock::load()?;
209 read_base_at_with_block(&block, offset, dst)
210}
211
212pub(crate) fn read_base_at_with_block(
213 block: &Superblock,
214 offset: u64,
215 dst: &mut [u8],
216) -> Result<bool, StableMemoryError> {
217 if dst.is_empty() {
218 return Ok(true);
219 }
220 if offset >= block.db_size {
221 dst.fill(0);
222 return Ok(false);
223 }
224 let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
225 let copied = requested.min(block.db_size - offset);
226 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
227 read_logical_range(block, offset, &mut dst[..copied_len])?;
228 dst[copied_len..].fill(0);
229 Ok(copied == requested)
230}
231
232pub(crate) fn read_base_at_with_page_cache(
233 block: &Superblock,
234 offset: u64,
235 dst: &mut [u8],
236 page_offsets: &mut PageOffsetCache,
237) -> Result<bool, StableMemoryError> {
238 if dst.is_empty() {
239 return Ok(true);
240 }
241 if offset >= block.db_size {
242 dst.fill(0);
243 return Ok(false);
244 }
245 let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
246 let copied = requested.min(block.db_size - offset);
247 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
248 read_logical_range_with_page_cache(block, offset, &mut dst[..copied_len], page_offsets)?;
249 dst[copied_len..].fill(0);
250 Ok(copied == requested)
251}
252
253pub(crate) fn read_base_page(page_no: u64) -> Result<Vec<u8>, StableMemoryError> {
254 let block = Superblock::load()?;
255 let mut page = zero_page();
256 if page_no >= active_page_count(&block)? {
257 return Ok(page);
258 }
259 let physical = page_offset_for(&block, page_no)?;
260 if physical != 0 {
261 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
262 crate::read_metrics::record_stable_data_read(page.len());
263 memory::read(physical, &mut page)?;
264 }
265 Ok(page)
266}
267
268pub(crate) fn write_at(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
269 if let Some(result) = overlay::write_at(offset, bytes) {
270 hit_failpoint(StableBlobFailpoint::OverlayWrite)?;
271 return result;
272 }
273 if bytes.is_empty() {
274 return Ok(());
275 }
276 ensure_page_map_layout()?;
277 let mut direct = Overlay::new(Superblock::load()?.db_size);
278 direct.write_at(offset, bytes)?;
279 commit_overlay(direct, false)
280}
281
282pub(crate) fn truncate(size: u64) -> Result<(), StableMemoryError> {
283 if let Some(result) = overlay::truncate(size) {
284 hit_failpoint(StableBlobFailpoint::OverlayTruncate)?;
285 return result;
286 }
287 ensure_page_map_layout()?;
288 let mut direct = Overlay::new(Superblock::load()?.db_size);
289 direct.truncate(size)?;
290 if direct.is_empty() {
291 return Ok(());
292 }
293 commit_overlay(direct, false)
294}
295
296pub(crate) fn file_size() -> Result<u64, StableMemoryError> {
297 if let Some(size) = overlay::file_size() {
298 return Ok(size);
299 }
300 Ok(Superblock::load()?.db_size)
301}
302
303pub fn export_chunk(offset: u64, len: u64) -> Result<Vec<u8>, StableMemoryError> {
304 reject_during_update()?;
305 let block = Superblock::load()?;
306 if offset >= block.db_size {
307 return Ok(Vec::new());
308 }
309 let copied = len.min(block.db_size - offset);
310 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
311 let mut out = vec![0_u8; copied_len];
312 read_logical_range(&block, offset, &mut out)?;
313 Ok(out)
314}
315
316pub fn import_chunk(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
317 reject_during_update()?;
318 let mut block = Superblock::load()?;
319 if !block.is_importing() {
320 return Err(StableMemoryError::ImportNotStarted);
321 }
322 let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
323 if offset != block.import_written_until {
324 return Err(StableMemoryError::ImportOutOfOrder {
325 offset,
326 expected: block.import_written_until,
327 });
328 }
329 let end = checked_add(offset, len)?;
330 if end > block.import_total_size {
331 return Err(StableMemoryError::ImportOutOfBounds {
332 offset,
333 len,
334 db_size: block.import_total_size,
335 });
336 }
337 memory::write(import_offset(&block, offset)?, bytes)?;
338 block.import_written_until = end;
339 block.store()?;
340 invalidate_read_cache();
341 Ok(())
342}
343
344pub fn begin_import(total_size: u64, expected_checksum: u64) -> Result<(), StableMemoryError> {
345 reject_during_update()?;
346 let mut block = Superblock::load()?;
347 if block.is_importing() {
348 return Err(StableMemoryError::ImportAlreadyStarted);
349 }
350 let import_base_offset = append_base()?;
351 checked_add(import_base_offset, total_size)?;
352 block.flags |= FLAG_IMPORTING;
353 block.clear_checksum_refresh();
354 block.import_expected_checksum = expected_checksum;
355 block.import_written_until = 0;
356 block.import_total_size = total_size;
357 block.import_base_offset = import_base_offset;
358 block.store()?;
359 invalidate_read_cache();
360 Ok(())
361}
362
/// Completes a staged import: verifies the staged bytes hash to the declared
/// checksum, then switches the page map over to the imported region.
///
/// On checksum mismatch the staging state is cleared and `ChecksumMismatch`
/// is returned; the previous database remains in place.
pub fn finish_import() -> Result<(), StableMemoryError> {
    reject_during_update()?;
    let mut block = Superblock::load()?;
    if !block.is_importing() {
        return Err(StableMemoryError::ImportNotStarted);
    }
    if block.import_written_until != block.import_total_size {
        return Err(StableMemoryError::ImportIncomplete {
            written_until: block.import_written_until,
            db_size: block.import_total_size,
        });
    }
    // Hash the staged physical region and compare to the declared value.
    let checksum = checksum_physical_range(block.import_base_offset, block.import_total_size)?;
    if checksum != block.import_expected_checksum {
        let expected = block.import_expected_checksum;
        clear_import(&mut block)?;
        return Err(StableMemoryError::ChecksumMismatch {
            expected,
            actual: checksum,
        });
    }
    // Build a page table pointing straight into the staged region and persist
    // it as the new segmented page map.
    let entries = imported_page_table(&block)?;
    let (root_offset, root_len) = write_segmented_tables(&entries)?;
    block.db_size = block.import_total_size;
    block.db_base_offset = block.import_base_offset;
    block.page_table_offset = root_offset;
    // `page_count` stores the root-table entry count (segments), not pages.
    block.page_count = root_len;
    block.layout_version = PAGE_MAP_LAYOUT_VERSION;
    block.flags &= !FLAG_IMPORTING;
    block.flags &= !FLAG_CHECKSUM_STALE;
    block.clear_checksum_refresh();
    block.checksum = checksum;
    // Reset the staging bookkeeping now that the import is live.
    block.import_expected_checksum = 0;
    block.import_written_until = 0;
    block.import_total_size = 0;
    block.import_base_offset = 0;
    block.store()?;
    invalidate_read_cache();
    Ok(())
}
403
404pub fn cancel_import() -> Result<(), StableMemoryError> {
405 reject_during_update()?;
406 let mut block = Superblock::load()?;
407 if !block.is_importing() {
408 return Err(StableMemoryError::ImportNotStarted);
409 }
410 clear_import(&mut block)
411}
412
413pub fn refresh_checksum() -> Result<u64, StableMemoryError> {
414 reject_during_update()?;
415 let checksum = checksum()?;
416 let mut block = Superblock::load()?;
417 block.checksum = checksum;
418 block.flags &= !FLAG_CHECKSUM_STALE;
419 block.clear_checksum_refresh();
420 block.store()?;
421 invalidate_read_cache();
422 Ok(checksum)
423}
424
/// Advances an incremental checksum refresh by up to `max_bytes` logical bytes.
///
/// Scan progress (offset, running hash, tx id) is persisted in the superblock
/// so the refresh can span many calls. If a transaction committed since the
/// scan started (`last_tx_id` changed), the partial scan is discarded and
/// restarted. When the scan reaches `db_size`, the superblock checksum is
/// updated and the stale flag cleared.
///
/// # Errors
/// `ChecksumRefreshChunkEmpty` when `max_bytes == 0`, `ImportAlreadyStarted`
/// when an import is staged, `UpdateInProgress` inside an overlay session.
pub fn refresh_checksum_chunk(max_bytes: u64) -> Result<ChecksumRefresh, StableMemoryError> {
    reject_during_update()?;
    if max_bytes == 0 {
        return Err(StableMemoryError::ChecksumRefreshChunkEmpty);
    }

    let mut block = Superblock::load()?;
    if block.is_importing() {
        return Err(StableMemoryError::ImportAlreadyStarted);
    }
    if !block.is_checksum_refreshing() {
        // First chunk: initialize the persistent scan state.
        block.flags |= FLAG_CHECKSUM_REFRESHING;
        block.checksum_refresh_offset = 0;
        block.checksum_refresh_hash = fnv1a64(&[]);
        block.checksum_refresh_tx_id = block.last_tx_id;
    }
    if block.checksum_refresh_tx_id != block.last_tx_id {
        // Data changed mid-scan; reset and restart from offset zero.
        block.clear_checksum_refresh();
        block.store()?;
        invalidate_read_cache();
        return refresh_checksum_chunk(max_bytes);
    }

    let start = block.checksum_refresh_offset;
    let end = block.db_size.min(start.saturating_add(max_bytes));
    let mut offset = start;
    let mut hash = block.checksum_refresh_hash;
    while offset < end {
        // Hash in fixed-size chunks to bound the transient buffer.
        let len = (end - offset).min(CHECKSUM_CHUNK_LEN);
        let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
        let mut bytes = vec![0_u8; copied_len];
        read_logical_range(&block, offset, &mut bytes)?;
        hash = fold_fnv1a64(hash, &bytes);
        offset += len;
    }

    block.checksum_refresh_offset = offset;
    block.checksum_refresh_hash = hash;
    if offset == block.db_size {
        // Scan complete: commit the fresh checksum.
        block.checksum = hash;
        block.flags &= !FLAG_CHECKSUM_STALE;
        block.clear_checksum_refresh();
    }
    let out = ChecksumRefresh {
        complete: offset == block.db_size,
        checksum: hash,
        scanned_bytes: offset,
        db_size: block.db_size,
    };
    block.store()?;
    invalidate_read_cache();
    Ok(out)
}
478
479pub fn checksum() -> Result<u64, StableMemoryError> {
480 reject_during_update()?;
481 let block = Superblock::load()?;
482 checksum_logical_range(&block, block.db_size)
483}
484
/// Copies every mapped page to fresh space past the current end of stable
/// memory and installs a new page table referencing only those copies, making
/// orphaned old copies unreachable. Unmapped (zero) entries stay unmapped.
pub fn compact() -> Result<(), StableMemoryError> {
    reject_during_update()?;
    ensure_page_map_layout()?;
    let block = Superblock::load()?;
    let table = read_page_table(&block)?;
    let mut compacted = Vec::with_capacity(table.len());
    // Pages are laid out contiguously starting at the current append point.
    let mut cursor = append_base()?;

    for offset in table {
        if offset == 0 {
            // Hole: keep the page unmapped.
            compacted.push(0);
            continue;
        }
        let mut page = zero_page();
        memory::read(offset, &mut page)?;
        memory::write(cursor, &page)?;
        compacted.push(cursor);
        cursor = checked_add(cursor, page_size())?;
    }

    let (root_offset, root_len) = write_segmented_tables(&compacted)?;
    // Same logical data, new physical layout: no transaction-id advance.
    Superblock::store_page_map_without_tx(root_offset, root_len, block.db_size)?;
    invalidate_read_cache();
    Ok(())
}
510
/// Computes storage accounting: live bytes versus allocated stable memory,
/// plus a compaction recommendation.
pub fn storage_stats() -> Result<StorageStats, StableMemoryError> {
    let block = Superblock::load()?;
    let table = read_page_table(&block)?;
    // Only non-zero entries reference a live physical page.
    let non_zero_pages = u64::try_from(table.iter().filter(|offset| **offset != 0).count())
        .map_err(|_| StableMemoryError::OffsetOverflow)?;
    let segment_count = active_segment_count(&block)?;
    let root_bytes = root_table_bytes(segment_count)?;
    let segment_bytes = segment_count
        .checked_mul(segment_table_bytes()?)
        .ok_or(StableMemoryError::OffsetOverflow)?;
    let page_table_bytes = checked_add(root_bytes, segment_bytes)?;
    // Live data: superblock + mapped pages + page-table structures.
    let active_bytes = SUPERBLOCK_SIZE
        .checked_add(non_zero_pages.saturating_mul(page_size()))
        .and_then(|value| value.checked_add(page_table_bytes))
        .ok_or(StableMemoryError::OffsetOverflow)?;
    let allocated_bytes = memory::size_pages()
        .checked_mul(STABLE_PAGE_SIZE)
        .ok_or(StableMemoryError::OffsetOverflow)?;
    // Anything allocated but unreachable via the page map counts as orphaned.
    let orphan_bytes_estimate = allocated_bytes.saturating_sub(active_bytes);
    let orphan_ratio_basis_points = if active_bytes == 0 {
        0
    } else {
        orphan_bytes_estimate.saturating_mul(10_000) / active_bytes
    };
    Ok(StorageStats {
        layout_version: block.layout_version,
        page_count: active_page_count(&block)?,
        page_table_bytes,
        active_bytes,
        allocated_bytes,
        orphan_bytes_estimate,
        orphan_ratio_basis_points,
        // Recommend compaction only when orphans dominate live data and
        // exceed the absolute floor.
        compact_recommended: orphan_bytes_estimate >= active_bytes
            && orphan_bytes_estimate >= COMPACT_MIN_ORPHAN_BYTES,
    })
}
547
548pub(crate) fn page_count_for_size(size: u64) -> Result<u64, StableMemoryError> {
549 Ok(size.div_ceil(page_size()))
550}
551
/// Test hook: returns the root table as currently stored in stable memory.
#[cfg(test)]
pub(crate) fn debug_root_table_for_tests() -> Result<Vec<u64>, StableMemoryError> {
    read_root_table(&Superblock::load()?)
}
557
558fn commit_overlay(overlay: Overlay, advance_tx: bool) -> Result<(), StableMemoryError> {
559 hit_failpoint(StableBlobFailpoint::CommitCapacity)?;
560 let block = Superblock::load()?;
561 let mut root = read_root_table(&block)?;
562 let final_page_count = page_count_for_size(overlay.size())?;
563 let final_segment_count = segment_count_for_pages(final_page_count)?;
564 let root_len =
565 usize::try_from(final_segment_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
566 root.resize(root_len, 0);
567 root.truncate(root_len);
568
569 let mut segment_updates = BTreeMap::<u64, Vec<u64>>::new();
570 let mut cursor = append_base()?;
571 for (page_no, page) in overlay.dirty_pages() {
572 if *page_no >= final_page_count {
573 continue;
574 }
575 hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
576 memory::write(cursor, page)?;
577 let segment_no = segment_no(*page_no);
578 let index = segment_index(*page_no)?;
579 let table = load_segment_for_update(&block, &root, &mut segment_updates, segment_no)?;
580 table[index] = cursor;
581 cursor = checked_add(cursor, page_size())?;
582 }
583
584 clear_truncated_tail(&block, &root, &mut segment_updates, final_page_count)?;
585
586 hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
587 for (segment_no, table) in segment_updates {
588 if segment_no >= final_segment_count {
589 continue;
590 }
591 let offset = write_segment_table(&table)?;
592 let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
593 root[index] = offset;
594 }
595 let root_offset = write_root_table(&root)?;
596 hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
597 let result = if advance_tx {
598 Superblock::commit_page_map(root_offset, entries_len_u64(&root)?, overlay.size())
599 } else {
600 Superblock::store_page_map_without_tx(root_offset, entries_len_u64(&root)?, overlay.size())
601 };
602 if result.is_ok() {
603 invalidate_read_cache();
604 }
605 result
606}
607
608fn load_segment_for_update<'a>(
609 block: &Superblock,
610 root: &[u64],
611 updates: &'a mut BTreeMap<u64, Vec<u64>>,
612 segment_no: u64,
613) -> Result<&'a mut Vec<u64>, StableMemoryError> {
614 if let std::collections::btree_map::Entry::Vacant(entry) = updates.entry(segment_no) {
615 let table = read_segment_table(block, root, segment_no)?;
616 entry.insert(table);
617 }
618 updates
619 .get_mut(&segment_no)
620 .ok_or(StableMemoryError::OffsetOverflow)
621}
622
/// Zeroes page-table entries in the boundary segment that fall beyond a
/// shrunken `final_page_count`, so truncated pages read back as zeros.
/// Segments wholly past the end are dropped by the caller's root resize.
fn clear_truncated_tail(
    block: &Superblock,
    root: &[u64],
    updates: &mut BTreeMap<u64, Vec<u64>>,
    final_page_count: u64,
) -> Result<(), StableMemoryError> {
    let old_page_count = active_page_count(block)?;
    // Only a shrink needs tail clearing.
    if final_page_count >= old_page_count || final_page_count == 0 {
        return Ok(());
    }
    let boundary_segment = segment_no(final_page_count);
    // Boundary falls exactly on a segment edge: no surviving segment to trim.
    if boundary_segment >= segment_count_for_pages(final_page_count)? {
        return Ok(());
    }
    let start = segment_index(final_page_count)?;
    if start == 0 {
        // Defensive: after the edge check above, `start` should be non-zero.
        return Ok(());
    }
    let table = load_segment_for_update(block, root, updates, boundary_segment)?;
    table[start..].fill(0);
    Ok(())
}
645
646fn reject_during_update() -> Result<(), StableMemoryError> {
647 if overlay::is_active() {
648 Err(StableMemoryError::UpdateInProgress)
649 } else {
650 Ok(())
651 }
652}
653
/// Fills `dst` with logical bytes starting at `offset`, resolving each
/// touched page through the page table. Unmapped pages read as zeros.
fn read_logical_range(
    block: &Superblock,
    offset: u64,
    dst: &mut [u8],
) -> Result<(), StableMemoryError> {
    if dst.is_empty() {
        return Ok(());
    }
    let in_page =
        usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
    // Fast path: the whole request fits inside a single page.
    if dst.len() <= page_len() - in_page {
        return read_logical_page_slice(block, offset / page_size(), in_page, dst);
    }

    // Slow path: walk the range one page-bounded slice at a time.
    let mut copied_total = 0_usize;
    while copied_total < dst.len() {
        let absolute = checked_add(
            offset,
            u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
        )?;
        let page_no = absolute / page_size();
        let in_page = usize::try_from(absolute % page_size())
            .map_err(|_| StableMemoryError::OffsetOverflow)?;
        // Copy up to the end of the current page or the end of `dst`.
        let copied = (page_len() - in_page).min(dst.len() - copied_total);
        read_logical_page_slice(
            block,
            page_no,
            in_page,
            &mut dst[copied_total..copied_total + copied],
        )?;
        copied_total += copied;
    }
    Ok(())
}
688
/// Same as `read_logical_range`, but resolves page offsets through the
/// caller-supplied `page_offsets` cache to skip repeated table lookups.
fn read_logical_range_with_page_cache(
    block: &Superblock,
    offset: u64,
    dst: &mut [u8],
    page_offsets: &mut PageOffsetCache,
) -> Result<(), StableMemoryError> {
    if dst.is_empty() {
        return Ok(());
    }
    let in_page =
        usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
    // Fast path: the whole request fits inside a single page.
    if dst.len() <= page_len() - in_page {
        return read_logical_page_slice_with_page_cache(
            block,
            offset / page_size(),
            in_page,
            dst,
            page_offsets,
        );
    }

    // Slow path: walk the range one page-bounded slice at a time.
    let mut copied_total = 0_usize;
    while copied_total < dst.len() {
        let absolute = checked_add(
            offset,
            u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
        )?;
        let page_no = absolute / page_size();
        let in_page = usize::try_from(absolute % page_size())
            .map_err(|_| StableMemoryError::OffsetOverflow)?;
        // Copy up to the end of the current page or the end of `dst`.
        let copied = (page_len() - in_page).min(dst.len() - copied_total);
        read_logical_page_slice_with_page_cache(
            block,
            page_no,
            in_page,
            &mut dst[copied_total..copied_total + copied],
            page_offsets,
        )?;
        copied_total += copied;
    }
    Ok(())
}
731
732fn read_logical_page_slice(
733 block: &Superblock,
734 page_no: u64,
735 in_page: usize,
736 dst: &mut [u8],
737) -> Result<(), StableMemoryError> {
738 let physical = page_offset_for(block, page_no)?;
739 if physical == 0 {
740 dst.fill(0);
741 return Ok(());
742 }
743 let stable_offset = checked_add(
744 physical,
745 u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
746 )?;
747 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
748 crate::read_metrics::record_stable_data_read(dst.len());
749 memory::read(stable_offset, dst)
750}
751
752fn read_logical_page_slice_with_page_cache(
753 block: &Superblock,
754 page_no: u64,
755 in_page: usize,
756 dst: &mut [u8],
757 page_offsets: &mut PageOffsetCache,
758) -> Result<(), StableMemoryError> {
759 let physical = match page_offsets.get(page_no) {
760 Some(physical) => physical,
761 None => {
762 let physical = page_offset_for(block, page_no)?;
763 page_offsets.insert(page_no, physical);
764 physical
765 }
766 };
767 if physical == 0 {
768 dst.fill(0);
769 return Ok(());
770 }
771 let stable_offset = checked_add(
772 physical,
773 u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
774 )?;
775 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
776 crate::read_metrics::record_stable_data_read(dst.len());
777 memory::read(stable_offset, dst)
778}
779
780fn page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
781 if page_no >= active_page_count(block)? || block.page_table_offset == 0 {
782 return Ok(0);
783 }
784 cached_page_offset_for(block, page_no)
785}
786
787fn read_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
788 let root = read_root_table(block)?;
789 let count = active_page_count(block)?;
790 let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
791 let mut entries = Vec::with_capacity(capacity);
792 for segment_no in 0..segment_count_for_pages(count)? {
793 let table = read_segment_table(block, &root, segment_no)?;
794 for entry in table {
795 if entries.len() == capacity {
796 break;
797 }
798 entries.push(entry);
799 }
800 }
801 Ok(entries)
802}
803
/// Resolves `page_no` to its physical offset via the per-context read cache.
///
/// The cache is keyed by the superblock snapshot (`ReadCacheKey`); a key
/// mismatch flushes it. The root table is cached whole; segment tables live
/// in a small LRU. Returns 0 for unmapped pages or out-of-range segments.
fn cached_page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
    let context = memory::active_context_id()?;
    let key = read_cache_key(block);
    let segment_no = segment_no(page_no);
    let index = segment_index(page_no)?;
    READ_TABLE_CACHE.with(|cache| {
        let mut caches = cache.borrow_mut();
        let cache = caches.entry(context).or_insert_with(ReadTableCache::new);
        cache.ensure_key(key);
        if cache.root.is_empty() {
            // Root not cached yet (or just flushed): read it from stable memory.
            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
            crate::read_metrics::record_page_table_root_miss();
            cache.root = read_root_table(block)?;
        } else {
            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
            crate::read_metrics::record_page_table_root_hit();
        }
        // A segment number past the root table means the page is unmapped.
        let Some(segment_offset) = cache
            .root
            .get(usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?)
            .copied()
        else {
            return Ok(0);
        };
        if segment_offset == 0 {
            return Ok(0);
        }
        if cache.segments.contains_key(&segment_no) {
            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
            crate::read_metrics::record_page_table_segment_hit();
            cache.touch_segment(segment_no);
        } else {
            // Segment miss: load it and insert (may evict the LRU segment).
            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
            crate::read_metrics::record_page_table_segment_miss();
            let table = read_segment_table_at(segment_offset)?;
            cache.insert_segment(segment_no, table);
        }
        Ok(cache
            .segments
            .get(&segment_no)
            .and_then(|table| table.get(index))
            .copied()
            .unwrap_or(0))
    })
}
849
850fn read_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
851 if block.page_count == 0 {
852 return Ok(Vec::new());
853 }
854 let bytes_len = usize::try_from(root_table_bytes(block.page_count)?)
855 .map_err(|_| StableMemoryError::OffsetOverflow)?;
856 let mut bytes = vec![0_u8; bytes_len];
857 memory::read(block.page_table_offset, &mut bytes)?;
858 decode_u64_table(&bytes)
859}
860
861fn read_segment_table(
862 _block: &Superblock,
863 root: &[u64],
864 segment_no: u64,
865) -> Result<Vec<u64>, StableMemoryError> {
866 let table = vec![0_u64; segment_page_count_usize()];
867 let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
868 let Some(offset) = root.get(index).copied() else {
869 return Ok(table);
870 };
871 if offset == 0 {
872 return Ok(table);
873 }
874 read_segment_table_at(offset)
875}
876
877fn read_segment_table_at(offset: u64) -> Result<Vec<u64>, StableMemoryError> {
878 let mut bytes = vec![0_u8; segment_table_len()];
879 memory::read(offset, &mut bytes)?;
880 let mut table = decode_u64_table(&bytes)?;
881 table.resize(segment_page_count_usize(), 0);
882 Ok(table)
883}
884
/// Persists `entries` as a two-level page map: fixed-size segment tables of
/// `SEGMENT_PAGE_COUNT` entries each, plus a root table of segment offsets.
/// Returns `(root_offset, root_entry_count)`, or `(0, 0)` for an empty map.
fn write_segmented_tables(entries: &[u64]) -> Result<(u64, u64), StableMemoryError> {
    if entries.is_empty() {
        return Ok((0, 0));
    }
    let root_len = segment_count_for_pages(entries_len_u64(entries)?)?;
    let mut root = Vec::with_capacity(
        usize::try_from(root_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
    );
    for segment_no in 0..root_len {
        // First flat-table index covered by this segment.
        let start = usize::try_from(
            segment_no
                .checked_mul(SEGMENT_PAGE_COUNT)
                .ok_or(StableMemoryError::OffsetOverflow)?,
        )
        .map_err(|_| StableMemoryError::OffsetOverflow)?;
        // Copy up to SEGMENT_PAGE_COUNT entries; the tail stays zero-padded.
        let mut table = vec![0_u64; segment_page_count_usize()];
        for (offset, entry) in entries[start..]
            .iter()
            .take(segment_page_count_usize())
            .enumerate()
        {
            table[offset] = *entry;
        }
        root.push(write_segment_table(&table)?);
    }
    let root_offset = write_root_table(&root)?;
    Ok((root_offset, entries_len_u64(&root)?))
}
913
914fn write_segment_table(entries: &[u64]) -> Result<u64, StableMemoryError> {
915 let mut table = vec![0_u64; segment_page_count_usize()];
916 for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
917 table[index] = *entry;
918 }
919 write_u64_table(&table)
920}
921
/// Appends the root table (one u64 offset per segment) to stable memory and
/// returns its physical offset.
fn write_root_table(entries: &[u64]) -> Result<u64, StableMemoryError> {
    write_u64_table(entries)
}
925
926fn write_u64_table(entries: &[u64]) -> Result<u64, StableMemoryError> {
927 if entries.is_empty() {
928 return Ok(0);
929 }
930 let offset = append_base()?;
931 let mut bytes = Vec::with_capacity(entries.len() * 8);
932 for entry in entries {
933 bytes.extend_from_slice(&entry.to_le_bytes());
934 }
935 memory::write(offset, &bytes)?;
936 Ok(offset)
937}
938
939fn decode_u64_table(bytes: &[u8]) -> Result<Vec<u64>, StableMemoryError> {
940 if !bytes.len().is_multiple_of(8) {
941 return Err(StableMemoryError::OffsetOverflow);
942 }
943 let mut entries = Vec::with_capacity(bytes.len() / 8);
944 for chunk in bytes.chunks_exact(8) {
945 let mut entry = [0_u8; 8];
946 entry.copy_from_slice(chunk);
947 entries.push(u64::from_le_bytes(entry));
948 }
949 Ok(entries)
950}
951
952fn imported_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
953 let count = page_count_for_size(block.import_total_size)?;
954 let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
955 let mut entries = Vec::with_capacity(capacity);
956 for page_no in 0..count {
957 entries.push(checked_add(
958 block.import_base_offset,
959 page_no
960 .checked_mul(page_size())
961 .ok_or(StableMemoryError::OffsetOverflow)?,
962 )?);
963 }
964 Ok(entries)
965}
966
967fn checksum_logical_range(block: &Superblock, len: u64) -> Result<u64, StableMemoryError> {
968 let mut offset = 0_u64;
969 let mut hash = fnv1a64(&[]);
970 while offset < len {
971 let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
972 let copied_len =
973 usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
974 let mut bytes = vec![0_u8; copied_len];
975 read_logical_range(block, offset, &mut bytes)?;
976 hash = fold_fnv1a64(hash, &bytes);
977 offset += chunk_len;
978 }
979 Ok(hash)
980}
981
982fn checksum_physical_range(base_offset: u64, len: u64) -> Result<u64, StableMemoryError> {
983 let mut offset = 0_u64;
984 let mut hash = fnv1a64(&[]);
985 while offset < len {
986 let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
987 let copied_len =
988 usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
989 let mut bytes = vec![0_u8; copied_len];
990 memory::read(checked_add(base_offset, offset)?, &mut bytes)?;
991 hash = fold_fnv1a64(hash, &bytes);
992 offset += chunk_len;
993 }
994 Ok(hash)
995}
996
/// Clears all import staging state and persists the superblock.
fn clear_import(block: &mut Superblock) -> Result<(), StableMemoryError> {
    block.flags &= !FLAG_IMPORTING;
    block.import_expected_checksum = 0;
    block.import_written_until = 0;
    block.import_total_size = 0;
    block.import_base_offset = 0;
    block.store()?;
    invalidate_read_cache();
    Ok(())
}

/// Maps a logical import offset to its physical staging offset.
fn import_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
    checked_add(block.import_base_offset, offset)
}

/// Number of logical pages covered by the current database size.
fn active_page_count(block: &Superblock) -> Result<u64, StableMemoryError> {
    page_count_for_size(block.db_size)
}

/// Number of segment tables in the current page map.
/// (The superblock's `page_count` field stores the root-table entry count.)
fn active_segment_count(block: &Superblock) -> Result<u64, StableMemoryError> {
    Ok(block.page_count)
}

/// Builds the cache key identifying the current page-map snapshot.
fn read_cache_key(block: &Superblock) -> ReadCacheKey {
    ReadCacheKey {
        page_table_offset: block.page_table_offset,
        page_count: block.page_count,
        db_size: block.db_size,
        last_tx_id: block.last_tx_id,
    }
}
1028
1029fn segment_count_for_pages(page_count: u64) -> Result<u64, StableMemoryError> {
1030 Ok(page_count.div_ceil(SEGMENT_PAGE_COUNT))
1031}
1032
/// Index of the segment that holds `page_no`'s page-table entry.
fn segment_no(page_no: u64) -> u64 {
    page_no / SEGMENT_PAGE_COUNT
}
1036
1037fn segment_index(page_no: u64) -> Result<usize, StableMemoryError> {
1038 usize::try_from(page_no % SEGMENT_PAGE_COUNT).map_err(|_| StableMemoryError::OffsetOverflow)
1039}
1040
/// `SEGMENT_PAGE_COUNT` as a `usize`, for sizing in-memory segment tables.
fn segment_page_count_usize() -> usize {
    usize::try_from(SEGMENT_PAGE_COUNT).expect("segment page count fits usize")
}
1044
1045fn segment_table_len() -> usize {
1046 segment_page_count_usize() * 8
1047}
1048
1049fn segment_table_bytes() -> Result<u64, StableMemoryError> {
1050 u64::try_from(segment_table_len()).map_err(|_| StableMemoryError::OffsetOverflow)
1051}
1052
1053fn root_table_bytes(entry_count: u64) -> Result<u64, StableMemoryError> {
1054 entry_count
1055 .checked_mul(PAGE_TABLE_ENTRY_LEN)
1056 .ok_or(StableMemoryError::OffsetOverflow)
1057}
1058
1059fn entries_len_u64<T>(entries: &[T]) -> Result<u64, StableMemoryError> {
1060 u64::try_from(entries.len()).map_err(|_| StableMemoryError::OffsetOverflow)
1061}
1062
1063fn append_base() -> Result<u64, StableMemoryError> {
1064 memory::size_pages()
1065 .checked_mul(STABLE_PAGE_SIZE)
1066 .ok_or(StableMemoryError::OffsetOverflow)
1067}
1068
/// SQLite page size widened to `u64` for offset arithmetic.
fn page_size() -> u64 {
    u64::from(SQLITE_PAGE_SIZE)
}
1072
/// SQLite page size as a `usize`, for sizing in-memory page buffers.
fn page_len() -> usize {
    usize::try_from(SQLITE_PAGE_SIZE).expect("SQLite page size fits usize")
}
1076
/// Freshly allocated, zero-filled buffer of one SQLite page.
fn zero_page() -> Vec<u8> {
    vec![0_u8; page_len()]
}
1080
1081fn checked_add(left: u64, right: u64) -> Result<u64, StableMemoryError> {
1082 left.checked_add(right)
1083 .ok_or(StableMemoryError::OffsetOverflow)
1084}
1085
/// Folds `bytes` into an in-progress 64-bit FNV-1a hash: XOR each byte into
/// the state, then multiply by the FNV prime (wrapping, as FNV specifies).
fn fold_fnv1a64(hash: u64, bytes: &[u8]) -> u64 {
    bytes
        .iter()
        .fold(hash, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(0x0000_0100_0000_01b3)
        })
}
1093
1094fn hit_failpoint(failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
1095 let Ok(context) = memory::active_context_id() else {
1096 return Ok(());
1097 };
1098 FAILPOINTS.with(|slot| {
1099 let mut slot = slot.borrow_mut();
1100 if slot.get(&context).copied() == Some(failpoint) {
1101 slot.remove(&context);
1102 Err(StableMemoryError::Failpoint(failpoint.name()))
1103 } else {
1104 Ok(())
1105 }
1106 })
1107}
1108
impl StableBlobFailpoint {
    /// Human-readable label for this failpoint, used as the payload of
    /// `StableMemoryError::Failpoint`; each names the operation the failpoint
    /// fires *before*.
    fn name(self) -> &'static str {
        match self {
            Self::OverlayWrite => "before overlay write",
            Self::OverlayTruncate => "before overlay truncate",
            Self::CommitCapacity => "before commit capacity",
            Self::CommitChunkWrite => "before commit page write",
            Self::CommitPageTableWrite => "before commit page table write",
            Self::CommitSuperblockStore => "before commit superblock store",
        }
    }
}
1121
#[cfg(test)]
mod tests {
    use super::*;

    // Pure layout arithmetic at the exact boundary values where off-by-one
    // errors in the rounding helpers would surface.
    #[test]
    fn layout_math_matches_expected_boundaries() {
        // Page rounding: 0 bytes -> 0 pages; anything up to one page -> 1.
        assert_eq!(page_count_for_size(0).unwrap(), 0);
        assert_eq!(page_count_for_size(1).unwrap(), 1);
        assert_eq!(page_count_for_size(page_size()).unwrap(), 1);
        assert_eq!(page_count_for_size(page_size() + 1).unwrap(), 2);

        // Segment rounding mirrors the page rounding behavior.
        assert_eq!(segment_count_for_pages(0).unwrap(), 0);
        assert_eq!(segment_count_for_pages(1).unwrap(), 1);
        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT).unwrap(), 1);
        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT + 1).unwrap(), 2);

        // Segment number/index split: page SEGMENT_PAGE_COUNT starts segment 1
        // at index 0.
        assert_eq!(segment_no(SEGMENT_PAGE_COUNT), 1);
        assert_eq!(segment_index(SEGMENT_PAGE_COUNT - 1).unwrap(), 255);
        assert_eq!(segment_index(SEGMENT_PAGE_COUNT).unwrap(), 0);
        assert_eq!(root_table_bytes(2).unwrap(), 16);
    }

    // The offset math must reject, not wrap, at u64::MAX boundaries.
    #[test]
    fn layout_math_rejects_u64_max_overflow_boundaries() {
        assert!(matches!(
            root_table_bytes(u64::MAX),
            Err(StableMemoryError::OffsetOverflow)
        ));
        assert!(matches!(
            checked_add(u64::MAX, 1),
            Err(StableMemoryError::OffsetOverflow)
        ));

        // import_offset overflows when base + offset wraps.
        let mut block = Superblock::fresh();
        block.import_base_offset = u64::MAX;
        assert!(matches!(
            import_offset(&block, 1),
            Err(StableMemoryError::OffsetOverflow)
        ));

        // imported_page_table overflows when the import span crosses u64::MAX.
        block.import_base_offset = u64::MAX - page_size() + 1;
        block.import_total_size = page_size() + 1;
        assert!(matches!(
            imported_page_table(&block),
            Err(StableMemoryError::OffsetOverflow)
        ));
    }

    // Uses process-global stable memory and metrics, so it must not run
    // concurrently with other such tests.
    #[test]
    #[serial_test::serial]
    fn read_metrics_separate_table_cache_from_data_reads() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        // Write one page, then drop caches and zero metrics so the reads
        // below are measured from a cold start.
        let page = vec![7_u8; page_len()];
        write_at(0, &page).unwrap();
        invalidate_read_cache();
        crate::read_metrics::reset_read_metrics();

        let first = read_base_page(0).unwrap();
        let second = read_base_page(0).unwrap();
        let metrics = crate::read_metrics::read_metrics_snapshot();

        assert_eq!(first, page);
        assert_eq!(second, page);
        // Both reads hit stable data; the second read should hit the table
        // caches (root + segment) that the first read populated.
        assert!(metrics.stable_data_read_calls >= 2);
        assert!(metrics.stable_data_read_bytes >= page_size() * 2);
        assert!(metrics.page_table_root_misses >= 1);
        assert!(metrics.page_table_root_hits >= 1);
        assert!(metrics.page_table_segment_misses >= 1);
        assert!(metrics.page_table_segment_hits >= 1);
        #[cfg(feature = "bench-profile")]
        assert!(metrics.superblock_loads <= 1);
        #[cfg(not(feature = "bench-profile"))]
        assert_eq!(metrics.superblock_loads, 0);
    }
}
1199}