1use crate::config::{SQLITE_PAGE_SIZE, STABLE_PAGE_SIZE, SUPERBLOCK_SIZE};
7use crate::sqlite_vfs::overlay::{self, Overlay};
8use crate::stable::memory::{self, ContextId, StableMemoryError};
9use crate::stable::meta::{
10 fnv1a64, Superblock, FLAG_CHECKSUM_REFRESHING, FLAG_CHECKSUM_STALE, FLAG_IMPORTING,
11 PAGE_MAP_LAYOUT_VERSION,
12};
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::mem::MaybeUninit;
16
/// Upper bound on the bytes read and hashed per step of the chunked checksum refresh (16 KiB).
const CHECKSUM_CHUNK_LEN: u64 = 16 << 10;
/// Bytes per page-table entry; tables are handled as `u64` values in this file.
const PAGE_TABLE_ENTRY_LEN: u64 = 8;
/// Number of page entries held by one segment table.
const SEGMENT_PAGE_COUNT: u64 = 256;
/// Size in bytes of one full segment table.
const SEGMENT_TABLE_BYTES: u64 = PAGE_TABLE_ENTRY_LEN * SEGMENT_PAGE_COUNT;
/// One segment table plus a single root entry, as reserved by the single-segment commit path.
const SINGLE_SEGMENT_PAGE_TABLE_BYTES: u64 = PAGE_TABLE_ENTRY_LEN + SEGMENT_TABLE_BYTES;
/// Maximum number of segment tables kept in a `ReadTableCache`.
const READ_SEGMENT_CACHE_CAPACITY: usize = 8;
/// Maximum number of page-number/physical-offset pairs kept in a `PageOffsetCache`.
const FILE_PAGE_OFFSET_CACHE_CAPACITY: usize = 64;
/// Maximum number of page images kept in a `PageOffsetCache`.
const FILE_PAGE_DATA_CACHE_CAPACITY: usize = 8;
/// Minimum orphan estimate (16 MiB) before `storage_stats` recommends compaction.
const COMPACT_MIN_ORPHAN_BYTES: u64 = 16 << 20;
26
/// Progress report returned by `refresh_checksum_chunk`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChecksumRefresh {
    /// `true` once the scan offset has reached `db_size`.
    pub complete: bool,
    /// Hash folded over every byte scanned so far.
    pub checksum: u64,
    /// Logical offset up to which the database has been hashed.
    pub scanned_bytes: u64,
    /// Logical database size recorded in the superblock at the time of the call.
    pub db_size: u64,
}
34
/// Space-usage summary produced by `storage_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Layout version stored in the superblock.
    pub layout_version: u64,
    /// Number of active logical pages.
    pub page_count: u64,
    /// Bytes taken by the root table plus all active segment tables.
    pub page_table_bytes: u64,
    /// Superblock, mapped data pages and page tables, in bytes.
    pub active_bytes: u64,
    /// Total stable memory currently allocated, in bytes.
    pub allocated_bytes: u64,
    /// `allocated_bytes - active_bytes`, saturating at zero.
    pub orphan_bytes_estimate: u64,
    /// Orphan estimate relative to `active_bytes`, in basis points (0 when nothing is active).
    pub orphan_ratio_basis_points: u64,
    /// Set when the orphan estimate is at least `active_bytes` and at least the compaction minimum.
    pub compact_recommended: bool,
}
46
/// Named points at which `hit_failpoint` is consulted, so tests can force a failure there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StableBlobFailpoint {
    /// Checked after an overlay write has been attempted.
    OverlayWrite,
    /// Checked after an overlay truncate has been attempted.
    OverlayTruncate,
    /// Checked at the very start of an overlay commit.
    CommitCapacity,
    /// Checked before a dirty page is written during commit.
    CommitChunkWrite,
    /// Checked before the page tables are written during commit.
    CommitPageTableWrite,
    /// Checked before the superblock is updated during commit.
    CommitSuperblockStore,
}
56
57thread_local! {
58 #[cfg(test)]
59 static FAILPOINTS: RefCell<BTreeMap<ContextId, StableBlobFailpoint>> = const { RefCell::new(BTreeMap::new()) };
60 static READ_TABLE_CACHE: RefCell<Vec<(ContextId, ReadTableCache)>> = const { RefCell::new(Vec::new()) };
61 static COMMIT_SEGMENT_CACHE: RefCell<Vec<(ContextId, CommitSegmentCache)>> = const { RefCell::new(Vec::new()) };
62}
63
/// Identity of the page-map state a `ReadTableCache` was filled from; a different key
/// makes `ReadTableCache::ensure_key` discard the cached tables.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ReadCacheKey {
    page_table_offset: u64,
    page_count: u64,
    db_size: u64,
    last_tx_id: u64,
}
71
72#[derive(Debug)]
73struct ReadTableCache {
74 key: Option<ReadCacheKey>,
75 root: Vec<u64>,
76 segments: Vec<CachedSegment>,
77}
78
/// One segment table held by `ReadTableCache`.
#[derive(Debug)]
struct CachedSegment {
    /// Segment number this table belongs to.
    segment_no: u64,
    /// Table entries, indexed by a page's position within the segment.
    table: Vec<u64>,
}
84
/// Segment table remembered after a successful single-page commit.
#[derive(Debug)]
struct CommitSegmentCache {
    /// Segment number the table belongs to.
    segment_no: u64,
    /// Offset at which that table was written.
    segment_offset: u64,
    /// The table entries as written.
    table: Vec<u64>,
}
91
92impl ReadTableCache {
93 fn new() -> Self {
94 Self {
95 key: None,
96 root: Vec::new(),
97 segments: Vec::new(),
98 }
99 }
100
101 fn clear(&mut self) {
102 self.key = None;
103 self.root.clear();
104 self.segments.clear();
105 }
106
107 fn ensure_key(&mut self, key: ReadCacheKey) {
108 if self.key == Some(key) {
109 return;
110 }
111 self.clear();
112 self.key = Some(key);
113 }
114
115 #[inline(always)]
116 fn segment_page_offset(&mut self, segment_no: u64, index: usize) -> Option<u64> {
117 if self.segments.is_empty() {
118 return None;
119 }
120 if self.segments.len() == 1 {
121 let segment = &self.segments[0];
122 if segment.segment_no == segment_no {
123 return Some(segment.table[index]);
124 }
125 return None;
126 }
127 let position = self
128 .segments
129 .iter()
130 .position(|segment| segment.segment_no == segment_no)?;
131 let offset = Some(self.segments[position].table[index]);
132 if position + 1 != self.segments.len() {
133 let segment = self.segments.remove(position);
134 self.segments.push(segment);
135 }
136 offset
137 }
138
139 fn insert_segment(&mut self, segment_no: u64, table: Vec<u64>) {
140 if let Some(position) = self
141 .segments
142 .iter()
143 .position(|segment| segment.segment_no == segment_no)
144 {
145 self.segments.remove(position);
146 }
147 self.segments.push(CachedSegment { segment_no, table });
148 while self.segments.len() > READ_SEGMENT_CACHE_CAPACITY {
149 self.segments.remove(0);
150 }
151 }
152}
153
/// Small caller-owned cache of page lookups used by the page-cache read path.
#[derive(Debug)]
pub(crate) struct PageOffsetCache {
    /// `(page number, physical offset)` pairs, oldest first.
    entries: Vec<(u64, u64)>,
    /// `(page number, page bytes)` pairs, oldest first.
    pages: Vec<(u64, Vec<u8>)>,
}
159
160impl PageOffsetCache {
161 pub(crate) fn new() -> Self {
162 Self {
163 entries: Vec::with_capacity(FILE_PAGE_OFFSET_CACHE_CAPACITY),
164 pages: Vec::new(),
165 }
166 }
167
168 fn get(&self, page_no: u64) -> Option<u64> {
169 match self.entries.as_slice() {
170 [] => None,
171 [(cached_page, physical)] => (*cached_page == page_no).then_some(*physical),
172 entries => {
173 for (cached_page, physical) in entries {
174 if *cached_page == page_no {
175 return Some(*physical);
176 }
177 }
178 None
179 }
180 }
181 }
182
183 fn insert(&mut self, page_no: u64, physical: u64) {
184 if self.entries.len() == FILE_PAGE_OFFSET_CACHE_CAPACITY {
185 self.entries.remove(0);
186 }
187 self.entries.push((page_no, physical));
188 }
189
190 #[inline(always)]
191 fn copy_page_slice(&self, page_no: u64, in_page: usize, dst: &mut [u8]) -> bool {
192 if self.pages.is_empty() {
193 return false;
194 }
195 if self.pages.len() == 1 {
196 let (cached_page, page) = &self.pages[0];
197 if *cached_page == page_no {
198 let end = in_page + dst.len();
199 dst.copy_from_slice(&page[in_page..end]);
200 return true;
201 }
202 return false;
203 }
204 for (cached_page, page) in &self.pages {
205 if *cached_page == page_no {
206 let end = in_page + dst.len();
207 dst.copy_from_slice(&page[in_page..end]);
208 return true;
209 }
210 }
211 false
212 }
213
214 fn insert_page(&mut self, page_no: u64, page: Vec<u8>) {
215 if self.pages.len() == FILE_PAGE_DATA_CACHE_CAPACITY {
216 self.pages.remove(0);
217 }
218 self.pages.push((page_no, page));
219 }
220}
221
/// Arms `failpoint` for the active stable-memory context (test builds only).
/// Does nothing when no active context id can be obtained.
#[cfg(test)]
pub(crate) fn set_failpoint(failpoint: StableBlobFailpoint) {
    let Ok(context) = memory::active_context_id() else {
        return;
    };
    FAILPOINTS.with_borrow_mut(|armed| {
        armed.insert(context, failpoint);
    });
}
230
/// Disarms every failpoint, for all contexts (test builds only).
#[cfg(test)]
pub(crate) fn clear_failpoint() {
    FAILPOINTS.with_borrow_mut(|armed| armed.clear());
}
235
236pub(crate) fn ensure_page_map_layout() -> Result<(), StableMemoryError> {
237 let block = Superblock::load()?;
238 if block.layout_version >= PAGE_MAP_LAYOUT_VERSION {
239 return Ok(());
240 }
241 Err(StableMemoryError::UnsupportedLayoutVersion(
242 block.layout_version,
243 ))
244}
245
246pub(crate) fn begin_update() -> Result<u64, StableMemoryError> {
247 let block = Superblock::load()?;
248 if block.layout_version < PAGE_MAP_LAYOUT_VERSION {
249 return Err(StableMemoryError::UnsupportedLayoutVersion(
250 block.layout_version,
251 ));
252 }
253 if block.is_importing() {
254 return Err(StableMemoryError::ImportAlreadyStarted);
255 }
256 overlay::begin(block.db_size)?;
257 Ok(block.db_size)
258}
259
260pub(crate) fn rollback_update() {
261 overlay::rollback();
262}
263
264#[doc(hidden)]
265pub fn invalidate_read_cache() {
266 READ_TABLE_CACHE.with(|cache| cache.borrow_mut().clear());
267 COMMIT_SEGMENT_CACHE.with(|cache| cache.borrow_mut().clear());
268}
269
270pub(crate) fn commit_update() -> Result<(), StableMemoryError> {
271 let Some(overlay) = overlay::take() else {
272 return Ok(());
273 };
274 if overlay.is_empty() {
275 return Ok(());
276 }
277 commit_overlay(overlay, true)
278}
279
280pub(crate) fn read_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
281 if let Some(result) = overlay::read_at(offset, dst) {
282 return result;
283 }
284 read_base_at(offset, dst)
285}
286
287pub(crate) fn read_base_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
288 if dst.is_empty() {
289 return Ok(true);
290 }
291 let block = Superblock::load()?;
292 read_base_at_with_block(&block, offset, dst)
293}
294
295pub(crate) fn read_base_at_with_block(
296 block: &Superblock,
297 offset: u64,
298 dst: &mut [u8],
299) -> Result<bool, StableMemoryError> {
300 if dst.is_empty() {
301 return Ok(true);
302 }
303 if offset >= block.db_size {
304 dst.fill(0);
305 return Ok(false);
306 }
307 let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
308 if requested <= block.db_size - offset {
309 read_logical_range(block, offset, dst)?;
310 return Ok(true);
311 }
312 let copied = requested.min(block.db_size - offset);
313 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
314 read_logical_range(block, offset, &mut dst[..copied_len])?;
315 dst[copied_len..].fill(0);
316 Ok(copied == requested)
317}
318
319#[inline(always)]
320pub(crate) fn read_base_at_with_page_cache(
321 block: &Superblock,
322 offset: u64,
323 dst: &mut [u8],
324 page_offsets: &mut PageOffsetCache,
325) -> Result<bool, StableMemoryError> {
326 if dst.is_empty() {
327 return Ok(true);
328 }
329 if offset >= block.db_size {
330 dst.fill(0);
331 return Ok(false);
332 }
333 let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
334 if requested <= block.db_size - offset {
335 read_logical_range_with_page_cache(block, offset, dst, page_offsets)?;
336 return Ok(true);
337 }
338 let copied = requested.min(block.db_size - offset);
339 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
340 read_logical_range_with_page_cache(block, offset, &mut dst[..copied_len], page_offsets)?;
341 dst[copied_len..].fill(0);
342 Ok(copied == requested)
343}
344
345pub(crate) fn read_base_page(page_no: u64) -> Result<Vec<u8>, StableMemoryError> {
346 let block = Superblock::load()?;
347 let mut page = zero_page();
348 if page_no >= active_page_count(&block)? {
349 return Ok(page);
350 }
351 let physical = page_offset_for(&block, page_no)?;
352 if physical != 0 {
353 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
354 crate::read_metrics::record_stable_data_read(page.len());
355 memory::read_preallocated(physical, &mut page)?;
356 }
357 Ok(page)
358}
359
360pub(crate) fn write_at(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
361 if let Some(result) = overlay::write_at(offset, bytes) {
362 hit_failpoint(StableBlobFailpoint::OverlayWrite)?;
363 return result;
364 }
365 if bytes.is_empty() {
366 return Ok(());
367 }
368 ensure_page_map_layout()?;
369 let mut direct = Overlay::new(Superblock::load()?.db_size);
370 direct.write_at(offset, bytes)?;
371 commit_overlay(direct, false)
372}
373
374pub(crate) fn truncate(size: u64) -> Result<(), StableMemoryError> {
375 if let Some(result) = overlay::truncate(size) {
376 hit_failpoint(StableBlobFailpoint::OverlayTruncate)?;
377 return result;
378 }
379 ensure_page_map_layout()?;
380 let mut direct = Overlay::new(Superblock::load()?.db_size);
381 direct.truncate(size)?;
382 if direct.is_empty() {
383 return Ok(());
384 }
385 commit_overlay(direct, false)
386}
387
388pub(crate) fn file_size() -> Result<u64, StableMemoryError> {
389 if let Some(size) = overlay::file_size() {
390 return Ok(size);
391 }
392 Ok(Superblock::load()?.db_size)
393}
394
395pub fn export_chunk(offset: u64, len: u64) -> Result<Vec<u8>, StableMemoryError> {
396 reject_during_update()?;
397 let block = Superblock::load()?;
398 if offset >= block.db_size {
399 return Ok(Vec::new());
400 }
401 let copied = len.min(block.db_size - offset);
402 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
403 let mut out = vec![0_u8; copied_len];
404 read_logical_range(&block, offset, &mut out)?;
405 Ok(out)
406}
407
408pub fn import_chunk(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
409 reject_during_update()?;
410 let mut block = Superblock::load()?;
411 if !block.is_importing() {
412 return Err(StableMemoryError::ImportNotStarted);
413 }
414 let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
415 if offset != block.import_written_until {
416 return Err(StableMemoryError::ImportOutOfOrder {
417 offset,
418 expected: block.import_written_until,
419 });
420 }
421 let end = checked_add(offset, len)?;
422 if end > block.import_total_size {
423 return Err(StableMemoryError::ImportOutOfBounds {
424 offset,
425 len,
426 db_size: block.import_total_size,
427 });
428 }
429 memory::write(import_offset(&block, offset)?, bytes)?;
430 block.import_written_until = end;
431 block.store()?;
432 invalidate_read_cache();
433 Ok(())
434}
435
436pub fn begin_import(total_size: u64, expected_checksum: u64) -> Result<(), StableMemoryError> {
437 reject_during_update()?;
438 let mut block = Superblock::load()?;
439 if block.is_importing() {
440 return Err(StableMemoryError::ImportAlreadyStarted);
441 }
442 let import_base_offset = append_base()?;
443 checked_add(import_base_offset, total_size)?;
444 block.flags |= FLAG_IMPORTING;
445 block.clear_checksum_refresh();
446 block.import_expected_checksum = expected_checksum;
447 block.import_written_until = 0;
448 block.import_total_size = total_size;
449 block.import_base_offset = import_base_offset;
450 block.store()?;
451 invalidate_read_cache();
452 Ok(())
453}
454
455pub fn finish_import() -> Result<(), StableMemoryError> {
456 reject_during_update()?;
457 let mut block = Superblock::load()?;
458 if !block.is_importing() {
459 return Err(StableMemoryError::ImportNotStarted);
460 }
461 if block.import_written_until != block.import_total_size {
462 return Err(StableMemoryError::ImportIncomplete {
463 written_until: block.import_written_until,
464 db_size: block.import_total_size,
465 });
466 }
467 let checksum = checksum_physical_range(block.import_base_offset, block.import_total_size)?;
468 if checksum != block.import_expected_checksum {
469 let expected = block.import_expected_checksum;
470 clear_import(&mut block)?;
471 return Err(StableMemoryError::ChecksumMismatch {
472 expected,
473 actual: checksum,
474 });
475 }
476 let entries = imported_page_table(&block)?;
477 let (root_offset, root_len) = write_segmented_tables(&entries)?;
478 block.db_size = block.import_total_size;
479 block.db_base_offset = block.import_base_offset;
480 block.page_table_offset = root_offset;
481 block.page_count = root_len;
482 block.layout_version = PAGE_MAP_LAYOUT_VERSION;
483 block.flags &= !FLAG_IMPORTING;
484 block.flags &= !FLAG_CHECKSUM_STALE;
485 block.clear_checksum_refresh();
486 block.checksum = checksum;
487 block.import_expected_checksum = 0;
488 block.import_written_until = 0;
489 block.import_total_size = 0;
490 block.import_base_offset = 0;
491 block.store()?;
492 invalidate_read_cache();
493 Ok(())
494}
495
496pub fn cancel_import() -> Result<(), StableMemoryError> {
497 reject_during_update()?;
498 let mut block = Superblock::load()?;
499 if !block.is_importing() {
500 return Err(StableMemoryError::ImportNotStarted);
501 }
502 clear_import(&mut block)
503}
504
505pub fn refresh_checksum() -> Result<u64, StableMemoryError> {
506 reject_during_update()?;
507 let checksum = checksum()?;
508 let mut block = Superblock::load()?;
509 block.checksum = checksum;
510 block.flags &= !FLAG_CHECKSUM_STALE;
511 block.clear_checksum_refresh();
512 block.store()?;
513 invalidate_read_cache();
514 Ok(checksum)
515}
516
517pub fn refresh_checksum_chunk(max_bytes: u64) -> Result<ChecksumRefresh, StableMemoryError> {
518 reject_during_update()?;
519 if max_bytes == 0 {
520 return Err(StableMemoryError::ChecksumRefreshChunkEmpty);
521 }
522
523 let mut block = Superblock::load()?;
524 if block.is_importing() {
525 return Err(StableMemoryError::ImportAlreadyStarted);
526 }
527 if !block.is_checksum_refreshing() {
528 block.flags |= FLAG_CHECKSUM_REFRESHING;
529 block.checksum_refresh_offset = 0;
530 block.checksum_refresh_hash = fnv1a64(&[]);
531 block.checksum_refresh_tx_id = block.last_tx_id;
532 }
533 if block.checksum_refresh_tx_id != block.last_tx_id {
534 block.clear_checksum_refresh();
535 block.store()?;
536 invalidate_read_cache();
537 return refresh_checksum_chunk(max_bytes);
538 }
539
540 let start = block.checksum_refresh_offset;
541 let end = block.db_size.min(start.saturating_add(max_bytes));
542 let mut offset = start;
543 let mut hash = block.checksum_refresh_hash;
544 while offset < end {
545 let len = (end - offset).min(CHECKSUM_CHUNK_LEN);
546 let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
547 let mut bytes = vec![0_u8; copied_len];
548 read_logical_range(&block, offset, &mut bytes)?;
549 hash = fold_fnv1a64(hash, &bytes);
550 offset += len;
551 }
552
553 block.checksum_refresh_offset = offset;
554 block.checksum_refresh_hash = hash;
555 if offset == block.db_size {
556 block.checksum = hash;
557 block.flags &= !FLAG_CHECKSUM_STALE;
558 block.clear_checksum_refresh();
559 }
560 let out = ChecksumRefresh {
561 complete: offset == block.db_size,
562 checksum: hash,
563 scanned_bytes: offset,
564 db_size: block.db_size,
565 };
566 block.store()?;
567 invalidate_read_cache();
568 Ok(out)
569}
570
571pub fn checksum() -> Result<u64, StableMemoryError> {
572 reject_during_update()?;
573 let block = Superblock::load()?;
574 checksum_logical_range(&block, block.db_size)
575}
576
577pub fn compact() -> Result<(), StableMemoryError> {
578 reject_during_update()?;
579 ensure_page_map_layout()?;
580 let block = Superblock::load()?;
581 let table = read_page_table(&block)?;
582 let mut compacted = Vec::with_capacity(table.len());
583 let mut cursor = append_base()?;
584 let non_zero_pages = table.iter().filter(|offset| **offset != 0).count();
585 let data_bytes = u64::try_from(non_zero_pages)
586 .map_err(|_| StableMemoryError::OffsetOverflow)?
587 .checked_mul(page_size())
588 .ok_or(StableMemoryError::OffsetOverflow)?;
589 memory::ensure_capacity(checked_add(cursor, data_bytes)?)?;
590
591 for offset in table {
592 if offset == 0 {
593 compacted.push(0);
594 continue;
595 }
596 let mut page = zero_page();
597 memory::read_preallocated(offset, &mut page)?;
598 memory::write_preallocated(cursor, &page)?;
599 compacted.push(cursor);
600 cursor = checked_add(cursor, page_size())?;
601 }
602
603 let (root_offset, root_len) = write_segmented_tables(&compacted)?;
604 Superblock::store_page_map_without_tx(root_offset, root_len, block.db_size)?;
605 invalidate_read_cache();
606 Ok(())
607}
608
609pub fn storage_stats() -> Result<StorageStats, StableMemoryError> {
610 let block = Superblock::load()?;
611 let table = read_page_table(&block)?;
612 let non_zero_pages = u64::try_from(table.iter().filter(|offset| **offset != 0).count())
613 .map_err(|_| StableMemoryError::OffsetOverflow)?;
614 let segment_count = active_segment_count(&block)?;
615 let root_bytes = root_table_bytes(segment_count)?;
616 let segment_bytes = segment_count
617 .checked_mul(segment_table_bytes()?)
618 .ok_or(StableMemoryError::OffsetOverflow)?;
619 let page_table_bytes = checked_add(root_bytes, segment_bytes)?;
620 let active_bytes = SUPERBLOCK_SIZE
621 .checked_add(non_zero_pages.saturating_mul(page_size()))
622 .and_then(|value| value.checked_add(page_table_bytes))
623 .ok_or(StableMemoryError::OffsetOverflow)?;
624 let allocated_bytes = memory::size_pages()
625 .checked_mul(STABLE_PAGE_SIZE)
626 .ok_or(StableMemoryError::OffsetOverflow)?;
627 let orphan_bytes_estimate = allocated_bytes.saturating_sub(active_bytes);
628 let orphan_ratio_basis_points = if active_bytes == 0 {
629 0
630 } else {
631 orphan_bytes_estimate.saturating_mul(10_000) / active_bytes
632 };
633 Ok(StorageStats {
634 layout_version: block.layout_version,
635 page_count: active_page_count(&block)?,
636 page_table_bytes,
637 active_bytes,
638 allocated_bytes,
639 orphan_bytes_estimate,
640 orphan_ratio_basis_points,
641 compact_recommended: orphan_bytes_estimate >= active_bytes
642 && orphan_bytes_estimate >= COMPACT_MIN_ORPHAN_BYTES,
643 })
644}
645
646pub(crate) fn page_count_for_size(size: u64) -> Result<u64, StableMemoryError> {
647 Ok(size.div_ceil(page_size()))
648}
649
/// Test helper: loads the superblock and returns its root table.
#[cfg(test)]
pub(crate) fn debug_root_table_for_tests() -> Result<Vec<u64>, StableMemoryError> {
    let superblock = Superblock::load()?;
    read_root_table(&superblock)
}
655
656fn commit_overlay(overlay: Overlay, advance_tx: bool) -> Result<(), StableMemoryError> {
657 hit_failpoint(StableBlobFailpoint::CommitCapacity)?;
658 let profile_enabled = commit_profile_enabled();
659 let block = Superblock::load()?;
660 let overlay_size = overlay.size();
661 let final_page_count = page_count_for_size(overlay_size)?;
662 let data_cursor = append_base()?;
663 debug_assert!(overlay
664 .dirty_pages()
665 .iter()
666 .all(|(page_no, _)| *page_no < final_page_count));
667 let dirty_pages = overlay.dirty_pages();
668 if let [(page_no, page)] = dirty_pages {
669 if overlay_size >= block.db_size
670 && *page_no < final_page_count
671 && final_page_count <= SEGMENT_PAGE_COUNT
672 {
673 let build_profile_start = commit_profile_start(profile_enabled);
674 let options = SinglePageCommitOptions {
675 advance_tx,
676 overlay_size,
677 data_cursor,
678 profile_enabled,
679 build_profile_start,
680 };
681 return commit_single_segment_page_overlay(&block, *page_no, page, options);
682 }
683 }
684
685 let final_segment_count = segment_count_for_pages(final_page_count)?;
686 let profile_start = commit_profile_start(profile_enabled);
687 let mut root = read_commit_root_table(&block)?;
688 commit_profile_record_load(profile_start);
689
690 let build_profile_start = commit_profile_start(profile_enabled);
691 let root_len =
692 usize::try_from(final_segment_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
693 if root.len() != root_len {
694 root.resize(root_len, 0);
695 }
696
697 if let [(page_no, page)] = dirty_pages {
698 if overlay_size >= block.db_size && *page_no < final_page_count {
699 let options = SinglePageCommitOptions {
700 advance_tx,
701 overlay_size,
702 data_cursor,
703 profile_enabled,
704 build_profile_start,
705 };
706 return commit_single_page_overlay(
707 &block,
708 final_segment_count,
709 root,
710 *page_no,
711 page,
712 options,
713 );
714 }
715 }
716
717 let mut segment_updates = BTreeMap::<u64, Vec<u64>>::new();
718 let mut page_cursor = data_cursor;
719
720 for (page_no, _) in dirty_pages {
721 if *page_no >= final_page_count {
722 continue;
723 }
724 let segment_no = segment_no(*page_no);
725 let index = segment_index(*page_no)?;
726 let table = load_segment_for_update(&block, &root, &mut segment_updates, segment_no)?;
727 table[index] = page_cursor;
728 page_cursor = checked_add(page_cursor, page_size())?;
729 }
730
731 if overlay_size < block.db_size {
732 clear_truncated_tail(&block, &root, &mut segment_updates, final_page_count)?;
733 }
734 commit_profile_record_build_segments(build_profile_start);
735
736 let mut table_cursor = page_cursor;
737 let root_entries_len = final_segment_count;
738 let segment_table_writes = segment_updates.len();
739 let segment_table_bytes = u64::try_from(segment_table_writes)
740 .map_err(|_| StableMemoryError::OffsetOverflow)?
741 .checked_mul(segment_table_bytes()?)
742 .ok_or(StableMemoryError::OffsetOverflow)?;
743 let page_table_bytes = checked_add(segment_table_bytes, root_table_bytes(root_entries_len)?)?;
744 let profile_start = commit_profile_start(profile_enabled);
745 memory::ensure_capacity(checked_add(table_cursor, page_table_bytes)?)?;
746 commit_profile_record_capacity(profile_start);
747
748 let profile_start = commit_profile_start(profile_enabled);
749 let mut cursor = data_cursor;
750 for (_, page) in dirty_pages {
751 hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
752 write_commit_page(cursor, page, profile_enabled)?;
753 cursor = checked_add(cursor, page_size())?;
754 }
755 commit_profile_record_page_write(profile_start);
756
757 hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
758 let profile_start = commit_profile_start(profile_enabled);
759 for (segment_no, table) in segment_updates {
760 let offset = write_commit_segment_table_at(&table, &mut table_cursor, profile_enabled)?;
761 let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
762 root[index] = offset;
763 }
764 let root_offset = write_commit_root_table_at(&root, &mut table_cursor, profile_enabled)?;
765 commit_profile_record_table_write(profile_start);
766
767 hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
768 let profile_start = commit_profile_start(profile_enabled);
769 let result = store_commit_page_map(
770 advance_tx,
771 root_offset,
772 root_entries_len,
773 overlay_size,
774 profile_enabled,
775 );
776 commit_profile_record_superblock_store(profile_start);
777 result
778}
779
/// Parameters shared by the two single-page commit shortcuts.
#[derive(Copy, Clone)]
struct SinglePageCommitOptions {
    /// Forwarded to `store_commit_page_map`.
    advance_tx: bool,
    /// Logical database size after the commit.
    overlay_size: u64,
    /// Offset at which the dirty page is written.
    data_cursor: u64,
    /// Whether commit profiling is active for this commit.
    profile_enabled: bool,
    /// Start marker for the "build segments" profile phase, if profiling is active.
    build_profile_start: Option<u64>,
}
788
789fn commit_single_page_overlay(
790 block: &Superblock,
791 final_segment_count: u64,
792 mut root: Vec<u64>,
793 page_no: u64,
794 page: &[u8],
795 options: SinglePageCommitOptions,
796) -> Result<(), StableMemoryError> {
797 let segment_no = segment_no(page_no);
798 let index = segment_index(page_no)?;
799 let mut table = read_commit_segment_table(block, &root, segment_no)?;
800 table[index] = options.data_cursor;
801 let page_cursor = checked_add(options.data_cursor, page_size())?;
802 commit_profile_record_build_segments(options.build_profile_start);
803
804 let root_entries_len = final_segment_count;
805 let page_table_bytes =
806 checked_add(segment_table_bytes()?, root_table_bytes(root_entries_len)?)?;
807 let profile_start = commit_profile_start(options.profile_enabled);
808 memory::ensure_capacity(checked_add(page_cursor, page_table_bytes)?)?;
809 commit_profile_record_capacity(profile_start);
810
811 hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
812 let profile_start = commit_profile_start(options.profile_enabled);
813 write_commit_page(options.data_cursor, page, options.profile_enabled)?;
814 commit_profile_record_page_write(profile_start);
815
816 hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
817 let profile_start = commit_profile_start(options.profile_enabled);
818 let mut table_cursor = page_cursor;
819 let offset = write_commit_segment_table_at(&table, &mut table_cursor, options.profile_enabled)?;
820 let root_offset = if final_segment_count == 1 {
821 write_commit_root_table_at(&[offset], &mut table_cursor, options.profile_enabled)?
822 } else {
823 let root_index =
824 usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
825 root[root_index] = offset;
826 write_commit_root_table_at(&root, &mut table_cursor, options.profile_enabled)?
827 };
828 commit_profile_record_table_write(profile_start);
829
830 hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
831 let profile_start = commit_profile_start(options.profile_enabled);
832 let result = store_commit_page_map(
833 options.advance_tx,
834 root_offset,
835 root_entries_len,
836 options.overlay_size,
837 options.profile_enabled,
838 );
839 commit_profile_record_superblock_store(profile_start);
840 if result.is_ok() {
841 cache_commit_segment_table(segment_no, offset, table);
842 }
843 result
844}
845
846fn commit_single_segment_page_overlay(
847 block: &Superblock,
848 page_no: u64,
849 page: &[u8],
850 options: SinglePageCommitOptions,
851) -> Result<(), StableMemoryError> {
852 let index = segment_index(page_no)?;
853 let root = read_commit_root_table(block)?;
854 let mut table = read_commit_segment_table(block, &root, 0)?;
855 table[index] = options.data_cursor;
856 let page_cursor = checked_add(options.data_cursor, page_size())?;
857 commit_profile_record_build_segments(options.build_profile_start);
858
859 let profile_start = commit_profile_start(options.profile_enabled);
860 memory::ensure_capacity(checked_add(page_cursor, SINGLE_SEGMENT_PAGE_TABLE_BYTES)?)?;
861 commit_profile_record_capacity(profile_start);
862
863 hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
864 let profile_start = commit_profile_start(options.profile_enabled);
865 memory::write_prechecked(options.data_cursor, page)?;
866 commit_profile_record_page_write(profile_start);
867
868 hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
869 let profile_start = commit_profile_start(options.profile_enabled);
870 let mut table_cursor = page_cursor;
871 let offset = write_commit_segment_table_at(&table, &mut table_cursor, options.profile_enabled)?;
872 let root_offset =
873 write_commit_root_table_at(&[offset], &mut table_cursor, options.profile_enabled)?;
874 commit_profile_record_table_write(profile_start);
875
876 hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
877 let profile_start = commit_profile_start(options.profile_enabled);
878 let result = store_commit_page_map(
879 options.advance_tx,
880 root_offset,
881 1,
882 options.overlay_size,
883 options.profile_enabled,
884 );
885 commit_profile_record_superblock_store(profile_start);
886 if result.is_ok() {
887 cache_commit_segment_table(0, offset, table);
888 }
889 result
890}
891
892#[cfg(any(test, debug_assertions, feature = "bench-profile"))]
893#[inline(always)]
894fn commit_profile_enabled() -> bool {
895 crate::read_metrics::metrics_enabled()
896}
897
898#[cfg(not(any(test, debug_assertions, feature = "bench-profile")))]
899#[inline(always)]
900fn commit_profile_enabled() -> bool {
901 false
902}
903
904#[inline(always)]
905fn commit_profile_start(enabled: bool) -> Option<u64> {
906 if enabled {
907 Some(crate::read_metrics::instruction_counter())
908 } else {
909 None
910 }
911}
912
913macro_rules! commit_profile_recorder {
914 ($name:ident, $record:ident) => {
915 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
916 #[inline(always)]
917 fn $name(start: Option<u64>) {
918 if let Some(start) = start {
919 crate::read_metrics::$record(
920 crate::read_metrics::instruction_counter().saturating_sub(start),
921 );
922 }
923 }
924
925 #[cfg(not(any(test, debug_assertions, feature = "bench-profile")))]
926 #[inline(always)]
927 fn $name(_start: Option<u64>) {}
928 };
929}
930
931commit_profile_recorder!(commit_profile_record_load, record_commit_load);
932commit_profile_recorder!(
933 commit_profile_record_build_segments,
934 record_commit_build_segments
935);
936commit_profile_recorder!(commit_profile_record_capacity, record_commit_capacity);
937commit_profile_recorder!(commit_profile_record_page_write, record_commit_page_write);
938commit_profile_recorder!(commit_profile_record_table_write, record_commit_table_write);
939commit_profile_recorder!(
940 commit_profile_record_superblock_store,
941 record_commit_superblock_store
942);
943
944#[inline(always)]
945fn write_commit_page(
946 offset: u64,
947 page: &[u8],
948 profile_enabled: bool,
949) -> Result<(), StableMemoryError> {
950 if profile_enabled {
951 memory::write_prechecked(offset, page)
952 } else {
953 memory::write_prechecked_unmetered(offset, page)
954 }
955}
956
957fn store_commit_page_map(
958 advance_tx: bool,
959 root_offset: u64,
960 root_entries_len: u64,
961 overlay_size: u64,
962 profile_enabled: bool,
963) -> Result<(), StableMemoryError> {
964 match (advance_tx, profile_enabled) {
965 (true, true) => Superblock::commit_page_map(root_offset, root_entries_len, overlay_size),
966 (true, false) => {
967 Superblock::commit_page_map_unmetered(root_offset, root_entries_len, overlay_size)
968 }
969 (false, true) => {
970 Superblock::store_page_map_without_tx(root_offset, root_entries_len, overlay_size)
971 }
972 (false, false) => Superblock::store_page_map_without_tx_unmetered(
973 root_offset,
974 root_entries_len,
975 overlay_size,
976 ),
977 }
978}
979
980fn load_segment_for_update<'a>(
981 block: &Superblock,
982 root: &[u64],
983 updates: &'a mut BTreeMap<u64, Vec<u64>>,
984 segment_no: u64,
985) -> Result<&'a mut Vec<u64>, StableMemoryError> {
986 match updates.entry(segment_no) {
987 std::collections::btree_map::Entry::Occupied(entry) => Ok(entry.into_mut()),
988 std::collections::btree_map::Entry::Vacant(entry) => {
989 let table = read_segment_table(block, root, segment_no)?;
990 Ok(entry.insert(table))
991 }
992 }
993}
994
995fn clear_truncated_tail(
996 block: &Superblock,
997 root: &[u64],
998 updates: &mut BTreeMap<u64, Vec<u64>>,
999 final_page_count: u64,
1000) -> Result<(), StableMemoryError> {
1001 let old_page_count = active_page_count(block)?;
1002 if final_page_count >= old_page_count || final_page_count == 0 {
1003 return Ok(());
1004 }
1005 let boundary_segment = segment_no(final_page_count);
1006 if boundary_segment >= segment_count_for_pages(final_page_count)? {
1007 return Ok(());
1008 }
1009 let start = segment_index(final_page_count)?;
1010 if start == 0 {
1011 return Ok(());
1012 }
1013 let table = load_segment_for_update(block, root, updates, boundary_segment)?;
1014 table[start..].fill(0);
1015 Ok(())
1016}
1017
1018fn reject_during_update() -> Result<(), StableMemoryError> {
1019 if overlay::is_active() {
1020 Err(StableMemoryError::UpdateInProgress)
1021 } else {
1022 Ok(())
1023 }
1024}
1025
1026fn read_logical_range(
1027 block: &Superblock,
1028 offset: u64,
1029 dst: &mut [u8],
1030) -> Result<(), StableMemoryError> {
1031 if dst.is_empty() {
1032 return Ok(());
1033 }
1034 let in_page =
1035 usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
1036 if dst.len() <= page_len() - in_page {
1037 return read_logical_page_slice(block, offset / page_size(), in_page, dst);
1038 }
1039
1040 let mut copied_total = 0_usize;
1041 while copied_total < dst.len() {
1042 let absolute = checked_add(
1043 offset,
1044 u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
1045 )?;
1046 let page_no = absolute / page_size();
1047 let in_page = usize::try_from(absolute % page_size())
1048 .map_err(|_| StableMemoryError::OffsetOverflow)?;
1049 let copied = (page_len() - in_page).min(dst.len() - copied_total);
1050 read_logical_page_slice(
1051 block,
1052 page_no,
1053 in_page,
1054 &mut dst[copied_total..copied_total + copied],
1055 )?;
1056 copied_total += copied;
1057 }
1058 Ok(())
1059}
1060
1061fn read_logical_range_with_page_cache(
1062 block: &Superblock,
1063 offset: u64,
1064 dst: &mut [u8],
1065 page_offsets: &mut PageOffsetCache,
1066) -> Result<(), StableMemoryError> {
1067 let in_page =
1068 usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
1069 if dst.len() <= page_len() - in_page {
1070 return read_logical_page_slice_with_page_cache(
1071 block,
1072 offset / page_size(),
1073 in_page,
1074 dst,
1075 page_offsets,
1076 );
1077 }
1078
1079 let mut copied_total = 0_usize;
1080 while copied_total < dst.len() {
1081 let absolute = checked_add(
1082 offset,
1083 u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
1084 )?;
1085 let page_no = absolute / page_size();
1086 let in_page = usize::try_from(absolute % page_size())
1087 .map_err(|_| StableMemoryError::OffsetOverflow)?;
1088 let copied = (page_len() - in_page).min(dst.len() - copied_total);
1089 read_logical_page_slice_with_page_cache(
1090 block,
1091 page_no,
1092 in_page,
1093 &mut dst[copied_total..copied_total + copied],
1094 page_offsets,
1095 )?;
1096 copied_total += copied;
1097 }
1098 Ok(())
1099}
1100
1101fn read_logical_page_slice(
1102 block: &Superblock,
1103 page_no: u64,
1104 in_page: usize,
1105 dst: &mut [u8],
1106) -> Result<(), StableMemoryError> {
1107 let physical = page_offset_for(block, page_no)?;
1108 if physical == 0 {
1109 dst.fill(0);
1110 return Ok(());
1111 }
1112 let stable_offset = checked_add(
1113 physical,
1114 u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
1115 )?;
1116 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1117 crate::read_metrics::record_stable_data_read(dst.len());
1118 memory::read_preallocated(stable_offset, dst)
1119}
1120
/// Reads `dst.len()` bytes of logical page `page_no`, starting `in_page` bytes into
/// the page, consulting `page_offsets` before touching stable memory.
///
/// A page whose resolved offset is `0` reads as zeroes.
/// NOTE(review): unlike `page_offset_for`, this path does not compare `page_no`
/// with the active page count before resolving the offset — confirm callers only
/// pass pages inside the database.
#[inline(always)]
fn read_logical_page_slice_with_page_cache(
    block: &Superblock,
    page_no: u64,
    in_page: usize,
    dst: &mut [u8],
    page_offsets: &mut PageOffsetCache,
) -> Result<(), StableMemoryError> {
    // Sub-page reads first try the cache; a `true` result is treated as `dst` served.
    if dst.len() < page_len() && page_offsets.copy_page_slice(page_no, in_page, dst) {
        return Ok(());
    }
    // Resolve the physical offset, remembering a freshly resolved one in `page_offsets`.
    let physical = match page_offsets.get(page_no) {
        Some(physical) => physical,
        None => {
            let physical = if block.page_table_offset == 0 {
                0
            } else {
                cached_page_offset_for(block, page_no)?
            };
            page_offsets.insert(page_no, physical);
            physical
        }
    };
    if physical == 0 {
        dst.fill(0);
        return Ok(());
    }
    // Whole, aligned page: read straight into the caller's buffer.
    if in_page == 0 && dst.len() == page_len() {
        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
        crate::read_metrics::record_stable_data_read(dst.len());
        return memory::read_preallocated(physical, dst);
    }
    // Sub-page read: fetch the whole page, copy the requested slice, then hand the
    // page to `page_offsets.insert_page`.
    if dst.len() < page_len() {
        let mut page = zero_page();
        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
        crate::read_metrics::record_stable_data_read(page.len());
        memory::read_preallocated(physical, &mut page)?;
        let end = in_page + dst.len();
        dst.copy_from_slice(&page[in_page..end]);
        page_offsets.insert_page(page_no, page);
        return Ok(());
    }
    // Reached only when `dst` is at least a page long but is not the aligned
    // whole-page case handled above; read directly at the in-page offset.
    let stable_offset = checked_add(
        physical,
        u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
    )?;
    #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
    crate::read_metrics::record_stable_data_read(dst.len());
    memory::read_preallocated(stable_offset, dst)
}
1171
1172fn page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
1173 if page_no >= active_page_count(block)? || block.page_table_offset == 0 {
1174 return Ok(0);
1175 }
1176 cached_page_offset_for(block, page_no)
1177}
1178
1179fn read_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1180 let root = read_root_table(block)?;
1181 let count = active_page_count(block)?;
1182 let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1183 let mut entries = Vec::with_capacity(capacity);
1184 for segment_no in 0..segment_count_for_pages(count)? {
1185 let table = read_segment_table(block, &root, segment_no)?;
1186 for entry in table {
1187 if entries.len() == capacity {
1188 break;
1189 }
1190 entries.push(entry);
1191 }
1192 }
1193 Ok(entries)
1194}
1195
1196fn cached_page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
1197 let context = memory::active_context_id()?;
1198 let key = read_cache_key(block);
1199 let segment_no = segment_no(page_no);
1200 let index = segment_index(page_no)?;
1201 READ_TABLE_CACHE.with(|cache| {
1202 let mut caches = cache.borrow_mut();
1203 let cache = match read_table_cache_index(&caches, context) {
1204 Some(index) => &mut caches[index].1,
1205 None => {
1206 caches.push((context, ReadTableCache::new()));
1207 &mut caches
1208 .last_mut()
1209 .ok_or(StableMemoryError::OffsetOverflow)?
1210 .1
1211 }
1212 };
1213 cache.ensure_key(key);
1214 if cache.root.is_empty() {
1215 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1216 crate::read_metrics::record_page_table_root_miss();
1217 cache.root = read_root_table(block)?;
1218 } else {
1219 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1220 crate::read_metrics::record_page_table_root_hit();
1221 }
1222 let root_index =
1223 usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1224 let segment_offset = cache.root[root_index];
1225 if segment_offset == 0 {
1226 return Ok(0);
1227 }
1228 if let Some(offset) = cache.segment_page_offset(segment_no, index) {
1229 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1230 crate::read_metrics::record_page_table_segment_hit();
1231 return Ok(offset);
1232 }
1233 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1234 crate::read_metrics::record_page_table_segment_miss();
1235 let table = read_segment_table_at(segment_offset)?;
1236 let offset = table[index];
1237 cache.insert_segment(segment_no, table);
1238 Ok(offset)
1239 })
1240}
1241
1242fn read_table_cache_index(
1243 caches: &[(ContextId, ReadTableCache)],
1244 context: ContextId,
1245) -> Option<usize> {
1246 caches
1247 .iter()
1248 .position(|(stored_context, _)| *stored_context == context)
1249}
1250
1251fn read_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1252 if block.page_count == 0 {
1253 return Ok(Vec::new());
1254 }
1255 let entries_len =
1256 usize::try_from(block.page_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1257 read_u64_table_at(block.page_table_offset, entries_len)
1258}
1259
/// Loads the root table for the commit path; currently delegates to `read_root_table`.
fn read_commit_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
    read_root_table(block)
}
1263
1264fn read_segment_table(
1265 _block: &Superblock,
1266 root: &[u64],
1267 segment_no: u64,
1268) -> Result<Vec<u64>, StableMemoryError> {
1269 let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1270 let Some(offset) = root.get(index).copied() else {
1271 return Ok(vec![0_u64; segment_page_count_usize()]);
1272 };
1273 if offset == 0 {
1274 return Ok(vec![0_u64; segment_page_count_usize()]);
1275 }
1276 read_segment_table_at(offset)
1277}
1278
1279fn read_commit_segment_table(
1280 _block: &Superblock,
1281 root: &[u64],
1282 segment_no: u64,
1283) -> Result<Vec<u64>, StableMemoryError> {
1284 let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1285 let Some(offset) = root.get(index).copied() else {
1286 return Ok(vec![0_u64; segment_page_count_usize()]);
1287 };
1288 if offset == 0 {
1289 return Ok(vec![0_u64; segment_page_count_usize()]);
1290 }
1291 read_commit_segment_table_at(segment_no, offset)
1292}
1293
1294fn read_commit_segment_table_at(
1295 segment_no: u64,
1296 offset: u64,
1297) -> Result<Vec<u64>, StableMemoryError> {
1298 if offset == 0 {
1299 return Ok(vec![0_u64; segment_page_count_usize()]);
1300 }
1301 if let Some(table) = take_commit_segment_table(segment_no, offset) {
1302 return Ok(table);
1303 }
1304 read_segment_table_at(offset)
1305}
1306
1307fn take_commit_segment_table(segment_no: u64, segment_offset: u64) -> Option<Vec<u64>> {
1308 let Ok(context) = memory::active_context_id() else {
1309 return None;
1310 };
1311 COMMIT_SEGMENT_CACHE.with(|cache| {
1312 let mut cache = cache.borrow_mut();
1313 if cache.len() == 1 {
1314 let (stored_context, cached) = &cache[0];
1315 if *stored_context == context
1316 && cached.segment_no == segment_no
1317 && cached.segment_offset == segment_offset
1318 {
1319 return cache.pop().map(|(_, cached)| cached.table);
1320 }
1321 return None;
1322 }
1323 cache
1324 .iter()
1325 .position(|(stored_context, cached)| {
1326 *stored_context == context
1327 && cached.segment_no == segment_no
1328 && cached.segment_offset == segment_offset
1329 })
1330 .map(|position| cache.remove(position).1.table)
1331 })
1332}
1333
1334fn cache_commit_segment_table(segment_no: u64, segment_offset: u64, table: Vec<u64>) {
1335 let Ok(context) = memory::active_context_id() else {
1336 return;
1337 };
1338 COMMIT_SEGMENT_CACHE.with(|cache| {
1339 let mut cache = cache.borrow_mut();
1340 if cache.is_empty() {
1341 cache.push((
1342 context,
1343 CommitSegmentCache {
1344 segment_no,
1345 segment_offset,
1346 table,
1347 },
1348 ));
1349 return;
1350 }
1351 if cache.len() == 1 {
1352 let (stored_context, cached) = &mut cache[0];
1353 if *stored_context == context {
1354 cached.segment_no = segment_no;
1355 cached.segment_offset = segment_offset;
1356 cached.table = table;
1357 return;
1358 }
1359 } else if let Some((_, cached)) = cache
1360 .iter_mut()
1361 .find(|(stored_context, _)| *stored_context == context)
1362 {
1363 cached.segment_no = segment_no;
1364 cached.segment_offset = segment_offset;
1365 cached.table = table;
1366 return;
1367 }
1368 cache.push((
1369 context,
1370 CommitSegmentCache {
1371 segment_no,
1372 segment_offset,
1373 table,
1374 },
1375 ));
1376 });
1377}
1378
/// Reads one segment table (`SEGMENT_PAGE_COUNT` entries) stored at `offset`.
fn read_segment_table_at(offset: u64) -> Result<Vec<u64>, StableMemoryError> {
    read_u64_table_at(offset, segment_page_count_usize())
}
1382
1383fn write_segmented_tables(entries: &[u64]) -> Result<(u64, u64), StableMemoryError> {
1384 if entries.is_empty() {
1385 return Ok((0, 0));
1386 }
1387 let root_len = segment_count_for_pages(entries_len_u64(entries)?)?;
1388 let mut cursor = append_base()?;
1389 let segment_bytes = root_len
1390 .checked_mul(segment_table_bytes()?)
1391 .ok_or(StableMemoryError::OffsetOverflow)?;
1392 let page_table_bytes = checked_add(segment_bytes, root_table_bytes(root_len)?)?;
1393 memory::ensure_capacity(checked_add(cursor, page_table_bytes)?)?;
1394 let mut root = Vec::with_capacity(
1395 usize::try_from(root_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1396 );
1397 for segment_no in 0..root_len {
1398 let start = usize::try_from(
1399 segment_no
1400 .checked_mul(SEGMENT_PAGE_COUNT)
1401 .ok_or(StableMemoryError::OffsetOverflow)?,
1402 )
1403 .map_err(|_| StableMemoryError::OffsetOverflow)?;
1404 let mut table = vec![0_u64; segment_page_count_usize()];
1405 for (offset, entry) in entries[start..]
1406 .iter()
1407 .take(segment_page_count_usize())
1408 .enumerate()
1409 {
1410 table[offset] = *entry;
1411 }
1412 root.push(write_segment_table_at(&table, &mut cursor)?);
1413 }
1414 let root_offset = write_root_table_at(&root, &mut cursor)?;
1415 Ok((root_offset, entries_len_u64(&root)?))
1416}
1417
1418#[inline(always)]
1419fn write_segment_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1420 if entries.len() == segment_page_count_usize() {
1421 return write_u64_table_at(entries, cursor);
1422 }
1423
1424 let mut table = vec![0_u64; segment_page_count_usize()];
1425 for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
1426 table[index] = *entry;
1427 }
1428 write_u64_table_at(&table, cursor)
1429}
1430
/// Writes the root table at `*cursor`; delegates to `write_u64_table_at`, which
/// advances the cursor and returns the offset written (or `0` for an empty table).
fn write_root_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
    write_u64_table_at(entries, cursor)
}
1434
1435#[inline(always)]
1436fn write_commit_segment_table_at(
1437 entries: &[u64],
1438 cursor: &mut u64,
1439 profile_enabled: bool,
1440) -> Result<u64, StableMemoryError> {
1441 if profile_enabled {
1442 write_segment_table_at(entries, cursor)
1443 } else {
1444 write_segment_table_at_unmetered(entries, cursor)
1445 }
1446}
1447
1448#[inline(always)]
1449fn write_commit_root_table_at(
1450 entries: &[u64],
1451 cursor: &mut u64,
1452 profile_enabled: bool,
1453) -> Result<u64, StableMemoryError> {
1454 if profile_enabled {
1455 write_root_table_at(entries, cursor)
1456 } else {
1457 write_u64_table_at_unmetered(entries, cursor)
1458 }
1459}
1460
1461fn write_segment_table_at_unmetered(
1462 entries: &[u64],
1463 cursor: &mut u64,
1464) -> Result<u64, StableMemoryError> {
1465 if entries.len() == segment_page_count_usize() {
1466 return write_u64_table_at_unmetered(entries, cursor);
1467 }
1468
1469 let mut table = vec![0_u64; segment_page_count_usize()];
1470 for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
1471 table[index] = *entry;
1472 }
1473 write_u64_table_at_unmetered(&table, cursor)
1474}
1475
/// Writes `entries` as consecutive little-endian `u64` values at `*cursor` using
/// `memory::write_prechecked`, then advances `*cursor` past the written bytes.
///
/// Returns the offset the table was written at. An empty table writes nothing,
/// leaves `*cursor` untouched and returns `0`.
fn write_u64_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
    if entries.is_empty() {
        return Ok(0);
    }
    let offset = *cursor;
    let byte_len = entries
        .len()
        .checked_mul(8)
        .ok_or(StableMemoryError::OffsetOverflow)?;
    #[cfg(target_endian = "little")]
    {
        // SAFETY: `entries` is a live `&[u64]`, so its storage spans exactly
        // `entries.len() * 8 == byte_len` initialized bytes; `u8` has alignment 1
        // and the byte view does not outlive `entries`. On a little-endian target
        // these bytes already equal the `to_le_bytes` encoding used below.
        let bytes = unsafe { std::slice::from_raw_parts(entries.as_ptr().cast::<u8>(), byte_len) };
        memory::write_prechecked(offset, bytes)?;
        *cursor = checked_add(
            offset,
            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
        )?;
        Ok(offset)
    }

    #[cfg(not(target_endian = "little"))]
    {
        // Big-endian targets encode each entry explicitly into a scratch buffer.
        let mut bytes = vec![0_u8; byte_len];
        for (chunk, entry) in bytes.chunks_exact_mut(8).zip(entries) {
            chunk.copy_from_slice(&entry.to_le_bytes());
        }
        memory::write_prechecked(offset, &bytes)?;
        *cursor = checked_add(
            offset,
            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
        )?;
        Ok(offset)
    }
}
1511
/// Reads `entries_len` little-endian `u64` values stored at `offset`.
///
/// Returns an empty vector without reading when `entries_len` is zero.
fn read_u64_table_at(offset: u64, entries_len: usize) -> Result<Vec<u64>, StableMemoryError> {
    if entries_len == 0 {
        return Ok(Vec::new());
    }
    let byte_len = entries_len
        .checked_mul(8)
        .ok_or(StableMemoryError::OffsetOverflow)?;
    #[cfg(target_endian = "little")]
    {
        // Read straight into uninitialized storage to avoid zero-filling it first.
        let mut entries = Vec::<MaybeUninit<u64>>::with_capacity(entries_len);
        // SAFETY: `with_capacity` reserved at least `entries_len` slots, and
        // `MaybeUninit<u64>` elements do not need to be initialized.
        unsafe {
            entries.set_len(entries_len);
        }
        // SAFETY: the allocation spans `entries_len * 8 == byte_len` bytes and `u8`
        // has alignment 1.
        // NOTE(review): this forms `&mut [u8]` over uninitialized memory; it relies
        // on `memory::read_preallocated` only writing to the slice — confirm.
        let bytes =
            unsafe { std::slice::from_raw_parts_mut(entries.as_mut_ptr().cast::<u8>(), byte_len) };
        memory::read_preallocated(offset, bytes)?;
        let ptr = entries.as_mut_ptr().cast::<u64>();
        let len = entries.len();
        let capacity = entries.capacity();
        // Give up ownership so the allocation is not freed twice once rebuilt below.
        std::mem::forget(entries);
        // SAFETY: `ptr`, `len` and `capacity` come from the forgotten vector, and
        // `MaybeUninit<u64>` has the same layout as `u64`.
        // NOTE(review): assumes a successful `read_preallocated` initialized all
        // `byte_len` bytes — confirm.
        unsafe { Ok(Vec::from_raw_parts(ptr, len, capacity)) }
    }

    #[cfg(not(target_endian = "little"))]
    {
        // Big-endian targets read raw bytes and decode each entry explicitly.
        let mut bytes = vec![0_u8; byte_len];
        memory::read_preallocated(offset, &mut bytes)?;
        decode_u64_table(&bytes)
    }
}
1546
/// Same as `write_u64_table_at`, but writes through `memory::write_prechecked_unmetered`.
///
/// Returns the offset the table was written at and advances `*cursor` past it.
/// An empty table writes nothing, leaves `*cursor` untouched and returns `0`.
fn write_u64_table_at_unmetered(
    entries: &[u64],
    cursor: &mut u64,
) -> Result<u64, StableMemoryError> {
    if entries.is_empty() {
        return Ok(0);
    }
    let offset = *cursor;
    let byte_len = entries
        .len()
        .checked_mul(8)
        .ok_or(StableMemoryError::OffsetOverflow)?;
    #[cfg(target_endian = "little")]
    {
        // SAFETY: `entries` is a live `&[u64]`, so its storage spans exactly
        // `entries.len() * 8 == byte_len` initialized bytes; `u8` has alignment 1
        // and the byte view does not outlive `entries`. On a little-endian target
        // these bytes already equal the `to_le_bytes` encoding used below.
        let bytes = unsafe { std::slice::from_raw_parts(entries.as_ptr().cast::<u8>(), byte_len) };
        memory::write_prechecked_unmetered(offset, bytes)?;
        *cursor = checked_add(
            offset,
            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
        )?;
        Ok(offset)
    }

    #[cfg(not(target_endian = "little"))]
    {
        // Big-endian targets encode each entry explicitly into a scratch buffer.
        let mut bytes = vec![0_u8; byte_len];
        for (chunk, entry) in bytes.chunks_exact_mut(8).zip(entries) {
            chunk.copy_from_slice(&entry.to_le_bytes());
        }
        memory::write_prechecked_unmetered(offset, &bytes)?;
        *cursor = checked_add(
            offset,
            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
        )?;
        Ok(offset)
    }
}
1585
/// Decodes `bytes` as consecutive little-endian `u64` values.
///
/// Returns `StableMemoryError::OffsetOverflow` when the length is not a multiple of 8.
#[cfg(not(target_endian = "little"))]
fn decode_u64_table(bytes: &[u8]) -> Result<Vec<u64>, StableMemoryError> {
    if !bytes.len().is_multiple_of(8) {
        return Err(StableMemoryError::OffsetOverflow);
    }
    let entries = bytes
        .chunks_exact(8)
        .map(|chunk| {
            let mut word = [0_u8; 8];
            word.copy_from_slice(chunk);
            u64::from_le_bytes(word)
        })
        .collect();
    Ok(entries)
}
1599
1600fn imported_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1601 let count = page_count_for_size(block.import_total_size)?;
1602 let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1603 let mut entries = Vec::with_capacity(capacity);
1604 for page_no in 0..count {
1605 entries.push(checked_add(
1606 block.import_base_offset,
1607 page_no
1608 .checked_mul(page_size())
1609 .ok_or(StableMemoryError::OffsetOverflow)?,
1610 )?);
1611 }
1612 Ok(entries)
1613}
1614
1615fn checksum_logical_range(block: &Superblock, len: u64) -> Result<u64, StableMemoryError> {
1616 let mut offset = 0_u64;
1617 let mut hash = fnv1a64(&[]);
1618 while offset < len {
1619 let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
1620 let copied_len =
1621 usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
1622 let mut bytes = vec![0_u8; copied_len];
1623 read_logical_range(block, offset, &mut bytes)?;
1624 hash = fold_fnv1a64(hash, &bytes);
1625 offset += chunk_len;
1626 }
1627 Ok(hash)
1628}
1629
1630fn checksum_physical_range(base_offset: u64, len: u64) -> Result<u64, StableMemoryError> {
1631 let mut offset = 0_u64;
1632 let mut hash = fnv1a64(&[]);
1633 while offset < len {
1634 let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
1635 let copied_len =
1636 usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
1637 let mut bytes = vec![0_u8; copied_len];
1638 memory::read_preallocated(checked_add(base_offset, offset)?, &mut bytes)?;
1639 hash = fold_fnv1a64(hash, &bytes);
1640 offset += chunk_len;
1641 }
1642 Ok(hash)
1643}
1644
/// Clears the import state in `block`, persists it, and invalidates the read cache.
///
/// Drops `FLAG_IMPORTING` and zeroes the import checksum, progress, size and base
/// offset fields. The read cache is invalidated only after the store succeeded.
fn clear_import(block: &mut Superblock) -> Result<(), StableMemoryError> {
    block.flags &= !FLAG_IMPORTING;
    block.import_expected_checksum = 0;
    block.import_written_until = 0;
    block.import_total_size = 0;
    block.import_base_offset = 0;
    block.store()?;
    invalidate_read_cache();
    Ok(())
}
1655
1656fn import_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
1657 checked_add(block.import_base_offset, offset)
1658}
1659
/// Returns the number of pages covering `block.db_size`, as computed by
/// `page_count_for_size`.
fn active_page_count(block: &Superblock) -> Result<u64, StableMemoryError> {
    page_count_for_size(block.db_size)
}
1663
/// Returns `block.page_count` as the number of active segments.
///
/// `read_root_table` uses the same field as the root table's entry count, i.e. one
/// entry per segment. This function never fails.
fn active_segment_count(block: &Superblock) -> Result<u64, StableMemoryError> {
    Ok(block.page_count)
}
1667
/// Builds the key that `cached_page_offset_for` passes to `ReadTableCache::ensure_key`.
///
/// The key captures the page-table offset, `page_count`, database size and last
/// transaction id of `block`.
fn read_cache_key(block: &Superblock) -> ReadCacheKey {
    ReadCacheKey {
        page_table_offset: block.page_table_offset,
        page_count: block.page_count,
        db_size: block.db_size,
        last_tx_id: block.last_tx_id,
    }
}
1676
1677fn segment_count_for_pages(page_count: u64) -> Result<u64, StableMemoryError> {
1678 Ok(page_count.div_ceil(SEGMENT_PAGE_COUNT))
1679}
1680
/// Returns the index of the segment that contains page `page_no`.
fn segment_no(page_no: u64) -> u64 {
    page_no / SEGMENT_PAGE_COUNT
}
1684
/// Returns the slot of page `page_no` inside its segment table
/// (`page_no % SEGMENT_PAGE_COUNT`), as a `usize`.
fn segment_index(page_no: u64) -> Result<usize, StableMemoryError> {
    usize::try_from(page_no % SEGMENT_PAGE_COUNT).map_err(|_| StableMemoryError::OffsetOverflow)
}
1688
/// Returns `SEGMENT_PAGE_COUNT` as a `usize`.
///
/// # Panics
///
/// Panics if `SEGMENT_PAGE_COUNT` does not fit in `usize`.
fn segment_page_count_usize() -> usize {
    usize::try_from(SEGMENT_PAGE_COUNT).expect("segment page count fits usize")
}
1692
/// Returns the size in bytes of one segment table: `SEGMENT_PAGE_COUNT` entries of
/// 8 bytes each.
fn segment_table_len() -> usize {
    segment_page_count_usize() * 8
}
1696
/// Returns `segment_table_len()` as a `u64`, failing with
/// `StableMemoryError::OffsetOverflow` if the conversion does not fit.
fn segment_table_bytes() -> Result<u64, StableMemoryError> {
    u64::try_from(segment_table_len()).map_err(|_| StableMemoryError::OffsetOverflow)
}
1700
/// Returns the size in bytes of a root table with `entry_count` entries.
///
/// Fails with `StableMemoryError::OffsetOverflow` if
/// `entry_count * PAGE_TABLE_ENTRY_LEN` overflows `u64`.
fn root_table_bytes(entry_count: u64) -> Result<u64, StableMemoryError> {
    entry_count
        .checked_mul(PAGE_TABLE_ENTRY_LEN)
        .ok_or(StableMemoryError::OffsetOverflow)
}
1706
/// Returns `entries.len()` as a `u64`, failing with
/// `StableMemoryError::OffsetOverflow` if the length does not fit.
fn entries_len_u64<T>(entries: &[T]) -> Result<u64, StableMemoryError> {
    u64::try_from(entries.len()).map_err(|_| StableMemoryError::OffsetOverflow)
}
1710
/// Returns `memory::size_pages() * STABLE_PAGE_SIZE`, the byte offset that
/// `write_segmented_tables` uses as the starting cursor for newly written tables.
///
/// Fails with `StableMemoryError::OffsetOverflow` if the product overflows `u64`.
fn append_base() -> Result<u64, StableMemoryError> {
    memory::size_pages()
        .checked_mul(STABLE_PAGE_SIZE)
        .ok_or(StableMemoryError::OffsetOverflow)
}
1716
/// Returns `SQLITE_PAGE_SIZE` as a `u64`, for offset arithmetic.
fn page_size() -> u64 {
    u64::from(SQLITE_PAGE_SIZE)
}
1720
/// Returns `SQLITE_PAGE_SIZE` as a `usize`, for buffer lengths and slice indexing.
///
/// # Panics
///
/// Panics if `SQLITE_PAGE_SIZE` does not fit in `usize`.
fn page_len() -> usize {
    usize::try_from(SQLITE_PAGE_SIZE).expect("SQLite page size fits usize")
}
1724
/// Allocates a zero-filled buffer of exactly one page (`page_len()` bytes).
fn zero_page() -> Vec<u8> {
    vec![0_u8; page_len()]
}
1728
1729fn checked_add(left: u64, right: u64) -> Result<u64, StableMemoryError> {
1730 left.checked_add(right)
1731 .ok_or(StableMemoryError::OffsetOverflow)
1732}
1733
/// Continues an FNV-1a 64 hash: starting from the running state `hash`, XORs in
/// each byte of `bytes` and multiplies by the 64-bit FNV prime (wrapping).
///
/// Folding a buffer piece by piece gives the same result as folding it in one call.
fn fold_fnv1a64(hash: u64, bytes: &[u8]) -> u64 {
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.iter().fold(hash, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
1741
/// Test-only failpoint check: if `failpoint` is the one armed for the active
/// context, disarms it and returns `StableMemoryError::Failpoint`.
///
/// Returns `Ok(())` when there is no active context or a different (or no)
/// failpoint is armed.
#[cfg(test)]
fn hit_failpoint(failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
    let Ok(context) = memory::active_context_id() else {
        return Ok(());
    };
    FAILPOINTS.with(|slot| {
        let mut armed = slot.borrow_mut();
        if armed.get(&context) != Some(&failpoint) {
            return Ok(());
        }
        // Each armed failpoint fires once.
        armed.remove(&context);
        Err(StableMemoryError::Failpoint(failpoint.name()))
    })
}
1757
/// Non-test builds have no failpoints: this always returns `Ok(())`.
#[cfg(not(test))]
fn hit_failpoint(_failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
    Ok(())
}
1762
#[cfg(test)]
impl StableBlobFailpoint {
    /// Returns the static description carried by `StableMemoryError::Failpoint`
    /// when this failpoint fires.
    fn name(self) -> &'static str {
        match self {
            Self::OverlayWrite => "before overlay write",
            Self::OverlayTruncate => "before overlay truncate",
            Self::CommitCapacity => "before commit capacity",
            Self::CommitChunkWrite => "before commit page write",
            Self::CommitPageTableWrite => "before commit page table write",
            Self::CommitSuperblockStore => "before commit superblock store",
        }
    }
}
1776
1777#[cfg(test)]
1778mod tests {
1779 use super::*;
1780 use proptest::prelude::*;
1781 use proptest::test_runner::{Config, TestRunner};
1782 use std::collections::BTreeSet;
1783
    /// Pins the page/segment arithmetic at the values around each boundary.
    #[test]
    fn layout_math_matches_expected_boundaries() {
        // Bytes -> pages rounds up.
        assert_eq!(page_count_for_size(0).unwrap(), 0);
        assert_eq!(page_count_for_size(1).unwrap(), 1);
        assert_eq!(page_count_for_size(page_size()).unwrap(), 1);
        assert_eq!(page_count_for_size(page_size() + 1).unwrap(), 2);

        // Pages -> segments rounds up.
        assert_eq!(segment_count_for_pages(0).unwrap(), 0);
        assert_eq!(segment_count_for_pages(1).unwrap(), 1);
        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT).unwrap(), 1);
        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT + 1).unwrap(), 2);

        // Page number -> (segment, slot) wraps at the segment size.
        assert_eq!(segment_no(SEGMENT_PAGE_COUNT), 1);
        assert_eq!(segment_index(SEGMENT_PAGE_COUNT - 1).unwrap(), 255);
        assert_eq!(segment_index(SEGMENT_PAGE_COUNT).unwrap(), 0);
        assert_eq!(root_table_bytes(2).unwrap(), 16);
    }
1801
    /// Checks that offset arithmetic reports `OffsetOverflow` instead of wrapping.
    #[test]
    fn layout_math_rejects_u64_max_overflow_boundaries() {
        assert!(matches!(
            root_table_bytes(u64::MAX),
            Err(StableMemoryError::OffsetOverflow)
        ));
        assert!(matches!(
            checked_add(u64::MAX, 1),
            Err(StableMemoryError::OffsetOverflow)
        ));

        let mut block = Superblock::fresh();
        block.import_base_offset = u64::MAX;
        assert!(matches!(
            import_offset(&block, 1),
            Err(StableMemoryError::OffsetOverflow)
        ));

        // Two pages are needed, and the second page's offset overflows `u64`.
        block.import_base_offset = u64::MAX - page_size() + 1;
        block.import_total_size = page_size() + 1;
        assert!(matches!(
            imported_page_table(&block),
            Err(StableMemoryError::OffsetOverflow)
        ));
    }
1827
    /// Property test: the layout helpers satisfy their defining inequalities for
    /// random and boundary inputs. Expected bounds are computed in `u128` so the
    /// checks themselves cannot overflow.
    #[test]
    fn pbt_layout_math_matches_verus_model() {
        let mut runner = TestRunner::new(Config {
            cases: 512,
            ..Config::default()
        });

        runner
            .run(
                &(
                    boundary_size_strategy(),
                    boundary_page_strategy(),
                    boundary_entry_strategy(),
                ),
                |(size, page_no, entries)| {
                    // `page_count` is the smallest count whose pages cover `size` bytes.
                    let page_count = page_count_for_size(size).unwrap();
                    let page_size = u128::from(page_size());
                    if size == 0 {
                        prop_assert_eq!(page_count, 0);
                    } else {
                        prop_assert!(u128::from(page_count - 1) * page_size < u128::from(size));
                        prop_assert!(u128::from(size) <= u128::from(page_count) * page_size);
                    }

                    // `segment_count` is the smallest count whose segments cover `page_count`.
                    let segment_count = segment_count_for_pages(page_count).unwrap();
                    if page_count == 0 {
                        prop_assert_eq!(segment_count, 0);
                    } else {
                        prop_assert!(
                            u128::from(segment_count - 1) * u128::from(SEGMENT_PAGE_COUNT)
                                < u128::from(page_count)
                        );
                        prop_assert!(
                            u128::from(page_count)
                                <= u128::from(segment_count) * u128::from(SEGMENT_PAGE_COUNT)
                        );
                    }

                    // (segment, slot) must recombine to the original page number.
                    let index = segment_index(page_no).unwrap();
                    prop_assert!(index < segment_page_count_usize());
                    prop_assert_eq!(
                        u128::from(segment_no(page_no)) * u128::from(SEGMENT_PAGE_COUNT)
                            + index as u128,
                        u128::from(page_no)
                    );

                    // `root_table_bytes` either multiplies exactly or reports overflow.
                    match root_table_bytes(entries) {
                        Ok(bytes) => prop_assert_eq!(bytes, entries * PAGE_TABLE_ENTRY_LEN),
                        Err(StableMemoryError::OffsetOverflow) => {
                            prop_assert!(entries.checked_mul(PAGE_TABLE_ENTRY_LEN).is_none());
                        }
                        Err(error) => return Err(TestCaseError::fail(error.to_string())),
                    }
                    Ok(())
                },
            )
            .unwrap();
    }
1886
    /// Byte sizes: any `u64`, or a value at or next to a page boundary, a
    /// segment-sized boundary, zero, or `u64::MAX`.
    fn boundary_size_strategy() -> impl Strategy<Value = u64> {
        let page = page_size();
        let segment_bytes = SEGMENT_PAGE_COUNT * page;
        prop_oneof![
            any::<u64>(),
            prop::sample::select(boundary_values(&[
                0,
                1,
                page - 1,
                page,
                page + 1,
                segment_bytes - 1,
                segment_bytes,
                segment_bytes + 1,
                u64::MAX,
            ])),
        ]
    }
1905
    /// Page numbers: any `u64`, or a value at or next to a segment boundary, zero,
    /// or `u64::MAX`.
    fn boundary_page_strategy() -> impl Strategy<Value = u64> {
        prop_oneof![
            any::<u64>(),
            prop::sample::select(boundary_values(&[
                0,
                1,
                SEGMENT_PAGE_COUNT - 1,
                SEGMENT_PAGE_COUNT,
                SEGMENT_PAGE_COUNT + 1,
                u64::MAX,
            ])),
        ]
    }
1919
    /// Root entry counts: any `u64`, or a value around the segment size and around
    /// the largest count whose byte size still fits in `u64`.
    fn boundary_entry_strategy() -> impl Strategy<Value = u64> {
        // Largest entry count for which `count * PAGE_TABLE_ENTRY_LEN` does not overflow.
        let max_without_overflow = u64::MAX / PAGE_TABLE_ENTRY_LEN;
        prop_oneof![
            any::<u64>(),
            prop::sample::select(boundary_values(&[
                0,
                1,
                SEGMENT_PAGE_COUNT - 1,
                SEGMENT_PAGE_COUNT,
                SEGMENT_PAGE_COUNT + 1,
                max_without_overflow - 1,
                max_without_overflow,
                max_without_overflow + 1,
                u64::MAX - 1,
                u64::MAX,
            ])),
        ]
    }
1938
    /// Expands each value into itself and its two neighbours, saturating at `0`
    /// and `u64::MAX` instead of wrapping.
    fn boundary_values(values: &[u64]) -> Vec<u64> {
        values
            .iter()
            .flat_map(|value| [value.saturating_sub(1), *value, value.saturating_add(1)])
            .collect()
    }
1945
    /// Folding a buffer piece by piece with `fold_fnv1a64` must equal hashing it in
    /// one pass with `fnv1a64`, however the buffer is partitioned.
    #[test]
    fn fnv_fold_matches_one_pass_for_multiple_partitions() {
        let bytes: Vec<u8> = (0..97)
            .map(|index| (index as u8).wrapping_mul(37).wrapping_add(11))
            .collect();
        let expected = fnv1a64(&bytes);

        // Two-way splits, including the empty-prefix and empty-suffix cases.
        for split in [0_usize, 1, 2, 7, 31, 64, bytes.len()] {
            let split = split.min(bytes.len());
            let mut hash = fnv1a64(&[]);
            hash = fold_fnv1a64(hash, &bytes[..split]);
            hash = fold_fnv1a64(hash, &bytes[split..]);
            assert_eq!(hash, expected);
        }

        // Many small chunks with a short trailing chunk (97 is not a multiple of 13).
        let mut hash = fnv1a64(&[]);
        for chunk in bytes.chunks(13) {
            hash = fold_fnv1a64(hash, chunk);
        }
        assert_eq!(hash, expected);
    }
1967
    /// Writes pages in two different segments and checks the page map covers both,
    /// then rewrites page 0 and checks that its table entry changes while the read
    /// returns the new content.
    #[test]
    #[serial_test::serial]
    fn page_map_commit_tracks_dirty_page_offsets() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let page_zero = vec![1_u8; page_len()];
        let page_later = vec![2_u8; page_len()];
        // A page in the second segment, so the root table needs two entries.
        let later_page_no = SEGMENT_PAGE_COUNT + 1;
        write_at(0, &page_zero).unwrap();
        write_at(later_page_no * page_size(), &page_later).unwrap();

        let block = Superblock::load().unwrap();
        let root = read_root_table(&block).unwrap();
        let table = read_page_table(&block).unwrap();
        let expected_pages = active_page_count(&block).unwrap();
        let expected_segments = segment_count_for_pages(expected_pages).unwrap();

        assert_eq!(root.len() as u64, expected_segments);
        assert_eq!(table.len() as u64, expected_pages);
        assert_ne!(table[0], 0);
        assert_ne!(table[later_page_no as usize], 0);

        // Rewriting page 0 must move it to a new physical offset.
        let old_page_zero_offset = table[0];
        let updated_page_zero = vec![3_u8; page_len()];
        write_at(0, &updated_page_zero).unwrap();
        let updated_table = read_page_table(&Superblock::load().unwrap()).unwrap();
        let mut out = vec![0_u8; page_len()];
        read_base_at(0, &mut out).unwrap();

        assert_ne!(updated_table[0], old_page_zero_offset);
        assert_eq!(out, updated_page_zero);
    }
2002
    /// Within one update, rewrites pages in two segments and leaves a third page
    /// untouched; after commit the rewritten pages have new table entries and the
    /// untouched page keeps its old one.
    #[test]
    #[serial_test::serial]
    fn page_map_commit_tracks_multi_segment_dirty_and_clean_pages() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let clean_page_no = 1;
        let later_page_no = SEGMENT_PAGE_COUNT + 1;
        write_at(0, &vec![1_u8; page_len()]).unwrap();
        write_at(clean_page_no * page_size(), &vec![2_u8; page_len()]).unwrap();
        write_at(later_page_no * page_size(), &vec![3_u8; page_len()]).unwrap();

        let before = Superblock::load().unwrap();
        let before_root = read_root_table(&before).unwrap();
        let before_table = read_page_table(&before).unwrap();

        // Dirty page 0 and the second-segment page; page 1 stays clean.
        begin_update().unwrap();
        write_at(0, &vec![4_u8; page_len()]).unwrap();
        write_at(later_page_no * page_size(), &vec![5_u8; page_len()]).unwrap();
        commit_update().unwrap();

        let after = Superblock::load().unwrap();
        let after_root = read_root_table(&after).unwrap();
        let after_table = read_page_table(&after).unwrap();

        assert_eq!(after_root.len(), after.page_count as usize);
        assert_eq!(after_root.len(), before_root.len());
        assert_ne!(after_table[0], before_table[0]);
        assert_eq!(
            after_table[clean_page_no as usize],
            before_table[clean_page_no as usize]
        );
        assert_ne!(
            after_table[later_page_no as usize],
            before_table[later_page_no as usize]
        );
    }
2041
/// After shrinking a three-page blob to a single page, the first segment
/// table must keep a non-zero entry for page 0 and hold zero in the slots of
/// the two truncated pages.
#[test]
#[serial_test::serial]
fn page_map_commit_zeroes_truncated_tail_slots() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    write_at(0, &vec![1_u8; page_len()]).unwrap();
    write_at(page_size(), &vec![2_u8; page_len()]).unwrap();
    write_at(2 * page_size(), &vec![3_u8; page_len()]).unwrap();
    truncate(page_size()).unwrap();

    let block = Superblock::load().unwrap();
    let root = read_root_table(&block).unwrap();
    let segment = read_segment_table(&block, &root, 0).unwrap();

    assert_eq!(block.db_size, page_size());
    // `assert_ne!` reports the actual offset on failure, unlike comparing a
    // boolean expression against `true`.
    assert_ne!(segment[0], 0);
    assert_eq!(segment[1], 0);
    assert_eq!(segment[2], 0);
}
2063
/// After `compact`, never-written pages must still map to offset 0 and the
/// two written pages must sit back to back, exactly one page apart, even
/// though they are separated by more than a segment's worth of pages.
#[test]
#[serial_test::serial]
fn compact_keeps_zero_pages_and_densifies_offsets_across_segments() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    let far_page_no = SEGMENT_PAGE_COUNT + 2;
    let far_offset = far_page_no * page_size();
    let head_fill = vec![7_u8; page_len()];
    let far_fill = vec![9_u8; page_len()];
    write_at(0, &head_fill).unwrap();
    write_at(far_offset, &far_fill).unwrap();

    compact().unwrap();

    let superblock = Superblock::load().unwrap();
    let root_table = read_root_table(&superblock).unwrap();
    let page_table = read_page_table(&superblock).unwrap();

    let mut head_read = vec![0_u8; page_len()];
    let mut far_read = vec![0_u8; page_len()];
    read_base_at(0, &mut head_read).unwrap();
    read_base_at(far_offset, &mut far_read).unwrap();

    assert_eq!(root_table.len() as u64, superblock.page_count);
    assert_eq!(
        page_table.len() as u64,
        active_page_count(&superblock).unwrap()
    );
    assert_ne!(page_table[0], 0);
    assert_eq!(page_table[1], 0);
    // The second written page follows the first one directly.
    assert_eq!(
        page_table[far_page_no as usize],
        page_table[0] + page_size()
    );
    assert_eq!(head_read, head_fill);
    assert_eq!(far_read, far_fill);
}
2096
/// Growing the blob with `truncate` alone must keep the existing entry for
/// page 0, leave the untouched page 1 unmapped, map the partial tail page, and
/// read the expanded region back as zeros.
#[test]
#[serial_test::serial]
fn single_segment_fast_path_preserves_table_after_expand_only_commit() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    let four_pages = page_size() * 4;
    write_at(0, &[0]).unwrap();
    truncate(four_pages).unwrap();
    truncate(four_pages + 1).unwrap();

    let superblock = Superblock::load().unwrap();
    let page_table = read_page_table(&superblock).unwrap();

    // Buffers start non-zero so a read that leaves them untouched is detected.
    let mut head_byte = [1_u8; 1];
    let mut tail_byte = [1_u8; 1];
    read_base_at(0, &mut head_byte).unwrap();
    read_base_at(four_pages, &mut tail_byte).unwrap();

    assert_eq!(superblock.db_size, four_pages + 1);
    assert_ne!(page_table[0], 0);
    assert_eq!(page_table[1], 0);
    assert_ne!(page_table[4], 0);
    assert_eq!(head_byte, [0]);
    assert_eq!(tail_byte, [0]);
}
2123
/// Writes a fixed set of `u64` entries with `write_u64_table_at`, then checks
/// the returned offset, the advanced cursor, the decoded values, the raw
/// little-endian bytes, and the behavior for an empty table.
#[test]
#[serial_test::serial]
fn page_table_u64_encoding_is_little_endian_and_round_trips() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    let entries = [
        0_u64,
        1,
        0x0102_0304_0506_0708,
        0xf1f2_f3f4_f5f6_f7f8,
        u64::MAX,
    ];
    let byte_len = entries.len() * 8;
    let expected_len = u64::try_from(byte_len).unwrap();
    let mut cursor = 128_u64;
    crate::stable::memory::ensure_capacity(cursor + expected_len).unwrap();

    let offset = write_u64_table_at(&entries, &mut cursor).unwrap();
    let decoded = read_u64_table_at(offset, entries.len()).unwrap();

    // Read the raw bytes back to pin the on-disk byte order.
    let mut raw = vec![0_u8; byte_len];
    crate::stable::memory::read_preallocated(offset, &mut raw).unwrap();
    let little_endian: Vec<u8> = entries
        .iter()
        .flat_map(|entry| entry.to_le_bytes())
        .collect();

    assert_eq!(offset, 128);
    assert_eq!(cursor, 128 + expected_len);
    assert_eq!(decoded, entries);
    assert_eq!(raw, little_endian);

    // An empty table reports offset 0 and leaves the cursor where it was.
    let mut cursor_for_empty = cursor;
    let empty_offset = write_u64_table_at(&[], &mut cursor_for_empty).unwrap();
    assert_eq!(empty_offset, 0);
    assert_eq!(cursor_for_empty, cursor);
    assert!(read_u64_table_at(cursor, 0).unwrap().is_empty());
}
2161
/// Property test: for arbitrary vectors of up to 512 `u64` values, the table
/// written by `write_u64_table_at` decodes back to the input, advances the
/// cursor by eight bytes per entry, and is stored as little-endian bytes.
#[test]
#[serial_test::serial]
fn pbt_page_table_u64_encoding_round_trips() {
    let config = Config {
        cases: 128,
        ..Config::default()
    };
    let mut runner = TestRunner::new(config);
    let strategy = proptest::collection::vec(any::<u64>(), 0..=512);

    let outcome = runner.run(&strategy, |values| {
        // Every case starts from freshly initialized stable memory.
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let mut cursor = 128_u64;
        let byte_len = values.len().checked_mul(8).unwrap();
        let end = cursor + u64::try_from(byte_len).unwrap();
        crate::stable::memory::ensure_capacity(end).unwrap();

        let offset = write_u64_table_at(&values, &mut cursor).unwrap();
        let decoded = read_u64_table_at(offset, values.len()).unwrap();
        prop_assert_eq!(decoded, values.clone());
        prop_assert_eq!(cursor, end);

        let mut raw = vec![0_u8; byte_len];
        crate::stable::memory::read_preallocated(offset, &mut raw).unwrap();
        let little_endian: Vec<u8> = values
            .iter()
            .flat_map(|value| value.to_le_bytes())
            .collect();
        prop_assert_eq!(raw, little_endian);
        Ok(())
    });
    outcome.unwrap();
}
2200
/// Property test over sparse page layouts: each `Some(byte)` slot is written
/// as a full page of that byte, `None` slots are left as holes. After
/// `compact`, written pages must be laid out contiguously in page order,
/// holes must keep a zero table entry, and every page must read back as
/// expected.
#[test]
#[serial_test::serial]
fn pbt_compact_preserves_sparse_page_model() {
    let config = Config {
        cases: 32,
        ..Config::default()
    };
    let mut runner = TestRunner::new(config);
    let strategy = proptest::collection::vec(prop::option::of(any::<u8>()), 0..=300);

    let outcome = runner.run(&strategy, |pages| {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        // Trailing holes are never written, so the blob ends at the last
        // written page.
        let active_len = pages
            .iter()
            .rposition(Option::is_some)
            .map_or(0, |index| index + 1);
        let active = &pages[..active_len];

        for (index, slot) in active.iter().enumerate() {
            let Some(fill) = slot else {
                continue;
            };
            let offset = u64::try_from(index).unwrap() * page_size();
            write_at(offset, &vec![*fill; page_len()]).unwrap();
        }

        compact().unwrap();
        let superblock = Superblock::load().unwrap();
        prop_assert_eq!(
            superblock.db_size,
            u64::try_from(active_len).unwrap() * page_size()
        );
        let table = read_page_table(&superblock).unwrap();
        prop_assert_eq!(table.len(), active_len);

        let mut first_compacted_offset = None;
        let mut written_so_far = 0_u64;
        for (index, slot) in active.iter().enumerate() {
            let entry = table[index];
            let mut page = vec![0_u8; page_len()];
            read_base_at(u64::try_from(index).unwrap() * page_size(), &mut page).unwrap();

            match slot {
                Some(fill) => {
                    // The first written page anchors the compacted region.
                    let base = *first_compacted_offset.get_or_insert(entry);
                    prop_assert_ne!(entry, 0);
                    prop_assert_eq!(entry, base + written_so_far * page_size());
                    prop_assert_eq!(page, vec![*fill; page_len()]);
                    written_so_far += 1;
                }
                None => {
                    prop_assert_eq!(entry, 0);
                    prop_assert_eq!(page, vec![0_u8; page_len()]);
                }
            }
        }
        Ok(())
    });
    outcome.unwrap();
}
2265
/// One step of a generated operation sequence used by the blob model
/// property test.
#[derive(Debug, Clone)]
enum BlobOp {
    /// Write `len` copies of `byte` starting at byte `offset`.
    Write {
        offset: u64,
        len: usize,
        byte: u8,
    },
    /// Set the blob size to `size` bytes.
    Truncate {
        size: u64,
    },
    /// Run a compaction pass.
    Compact,
}
2272
/// Property test: applies random write/truncate/compact sequences to the blob
/// and to an in-memory model, checking after every step that both agree.
#[test]
#[serial_test::serial]
fn pbt_blob_operations_match_logical_model_across_compact() {
    let config = Config {
        cases: 48,
        ..Config::default()
    };
    let mut runner = TestRunner::new(config);

    let outcome = runner.run(&blob_operation_sequence(), |operations| {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        // `model` mirrors the blob bytes; `materialized` tracks the pages
        // expected to have a non-zero page table entry.
        let mut model = Vec::<u8>::new();
        let mut materialized = BTreeSet::<u64>::new();
        assert_blob_model(&model, &materialized, false)?;

        for operation in operations {
            let just_compacted = apply_blob_op(operation, &mut model, &mut materialized)?;
            assert_blob_model(&model, &materialized, just_compacted)?;
        }
        Ok(())
    });
    outcome.unwrap();
}
2299
2300 fn blob_operation_sequence() -> impl Strategy<Value = Vec<BlobOp>> {
2301 let write = (blob_offset_strategy(), blob_len_strategy(), any::<u8>())
2302 .prop_map(|(offset, len, byte)| BlobOp::Write { offset, len, byte });
2303 let truncate = blob_offset_strategy().prop_map(|size| BlobOp::Truncate { size });
2304 proptest::collection::vec(prop_oneof![write, truncate, Just(BlobOp::Compact)], 0..=48)
2305 }
2306
2307 fn blob_offset_strategy() -> impl Strategy<Value = u64> {
2308 let limit = blob_model_limit();
2309 let page = page_size();
2310 let segment = SEGMENT_PAGE_COUNT * page;
2311 prop_oneof![
2312 0_u64..=limit,
2313 prop::sample::select(boundary_values(&[
2314 0,
2315 1,
2316 page - 1,
2317 page,
2318 page + 1,
2319 segment - 1,
2320 segment,
2321 segment + 1,
2322 limit - 1,
2323 limit,
2324 ]))
2325 .prop_map(move |value| value.min(limit)),
2326 ]
2327 }
2328
2329 fn blob_len_strategy() -> impl Strategy<Value = usize> {
2330 prop_oneof![
2331 0_usize..=(page_len() * 2 + 17),
2332 prop::sample::select(vec![
2333 0,
2334 1,
2335 page_len() - 1,
2336 page_len(),
2337 page_len() + 1,
2338 page_len() * 2 + 1,
2339 ]),
2340 ]
2341 }
2342
2343 fn blob_model_limit() -> u64 {
2344 (SEGMENT_PAGE_COUNT + 3) * page_size()
2345 }
2346
2347 fn apply_blob_op(
2348 operation: BlobOp,
2349 model: &mut Vec<u8>,
2350 materialized: &mut BTreeSet<u64>,
2351 ) -> Result<bool, TestCaseError> {
2352 match operation {
2353 BlobOp::Write { offset, len, byte } => {
2354 let len = len.min(usize::try_from(blob_model_limit() - offset).unwrap());
2355 let bytes = vec![byte; len];
2356 write_at(offset, &bytes).map_err(|error| TestCaseError::fail(error.to_string()))?;
2357 if len == 0 {
2358 return Ok(false);
2359 }
2360
2361 let start = usize::try_from(offset).unwrap();
2362 let end = start + len;
2363 if model.len() < start {
2364 model.resize(start, 0);
2365 }
2366 if model.len() < end {
2367 model.resize(end, 0);
2368 }
2369 model[start..end].copy_from_slice(&bytes);
2370 mark_materialized_range(offset, len, materialized);
2371 Ok(false)
2372 }
2373 BlobOp::Truncate { size } => {
2374 truncate(size).map_err(|error| TestCaseError::fail(error.to_string()))?;
2375 let new_len = usize::try_from(size).unwrap();
2376 model.resize(new_len, 0);
2377 let active_pages = page_count_for_size(size)
2378 .map_err(|error| TestCaseError::fail(error.to_string()))?;
2379 materialized.retain(|page_no| *page_no < active_pages);
2380 if size > 0 && !size.is_multiple_of(page_size()) {
2381 materialized.insert(size / page_size());
2382 }
2383 Ok(false)
2384 }
2385 BlobOp::Compact => {
2386 compact().map_err(|error| TestCaseError::fail(error.to_string()))?;
2387 Ok(true)
2388 }
2389 }
2390 }
2391
2392 fn mark_materialized_range(offset: u64, len: usize, materialized: &mut BTreeSet<u64>) {
2393 let end = offset + u64::try_from(len).unwrap();
2394 let first_page = offset / page_size();
2395 let last_page = (end - 1) / page_size();
2396 for page_no in first_page..=last_page {
2397 materialized.insert(page_no);
2398 }
2399 }
2400
2401 fn assert_blob_model(
2402 model: &[u8],
2403 materialized: &BTreeSet<u64>,
2404 expect_compacted: bool,
2405 ) -> Result<(), TestCaseError> {
2406 let block = Superblock::load().map_err(|error| TestCaseError::fail(error.to_string()))?;
2407 prop_assert_eq!(block.db_size, u64::try_from(model.len()).unwrap());
2408
2409 if !model.is_empty() {
2410 let mut out = vec![0_u8; model.len()];
2411 read_base_at(0, &mut out).map_err(|error| TestCaseError::fail(error.to_string()))?;
2412 prop_assert_eq!(out, model);
2413 }
2414
2415 let mut tail = vec![1_u8; 32];
2416 read_base_at(u64::try_from(model.len()).unwrap(), &mut tail)
2417 .map_err(|error| TestCaseError::fail(error.to_string()))?;
2418 prop_assert_eq!(tail, vec![0_u8; 32]);
2419
2420 let table =
2421 read_page_table(&block).map_err(|error| TestCaseError::fail(error.to_string()))?;
2422 let active_pages = page_count_for_size(u64::try_from(model.len()).unwrap())
2423 .map_err(|error| TestCaseError::fail(error.to_string()))?;
2424 prop_assert_eq!(table.len(), usize::try_from(active_pages).unwrap());
2425
2426 let mut first_compacted_offset = None;
2427 let mut non_zero_seen = 0_u64;
2428 for (index, entry) in table.iter().enumerate() {
2429 let page_no = u64::try_from(index).unwrap();
2430 if materialized.contains(&page_no) {
2431 prop_assert_ne!(*entry, 0);
2432 if expect_compacted {
2433 let base = *first_compacted_offset.get_or_insert(*entry);
2434 prop_assert_eq!(*entry, base + non_zero_seen * page_size());
2435 }
2436 non_zero_seen += 1;
2437 } else {
2438 prop_assert_eq!(*entry, 0);
2439 }
2440 }
2441 Ok(())
2442 }
2443
/// Reads the same page twice after clearing the read cache and checks the
/// recorded metrics: at least two data reads, and at least one miss and one
/// hit each for the page table root and segment counters.
#[test]
#[serial_test::serial]
fn read_metrics_separate_table_cache_from_data_reads() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    let expected_page = vec![7_u8; page_len()];
    write_at(0, &expected_page).unwrap();

    // Start measuring from a cleared cache and zeroed counters.
    invalidate_read_cache();
    crate::read_metrics::reset_read_metrics();

    let first_read = read_base_page(0).unwrap();
    let second_read = read_base_page(0).unwrap();
    let snapshot = crate::read_metrics::read_metrics_snapshot();

    assert_eq!(first_read, expected_page);
    assert_eq!(second_read, expected_page);
    assert!(snapshot.stable_data_read_calls >= 2);
    assert!(snapshot.stable_data_read_bytes >= page_size() * 2);
    assert!(snapshot.page_table_root_misses >= 1);
    assert!(snapshot.page_table_root_hits >= 1);
    assert!(snapshot.page_table_segment_misses >= 1);
    assert!(snapshot.page_table_segment_hits >= 1);
    #[cfg(feature = "bench-profile")]
    assert!(snapshot.superblock_loads <= 1);
    #[cfg(not(feature = "bench-profile"))]
    assert_eq!(snapshot.superblock_loads, 0);
}
2473
/// Two small reads from the same page through one `PageOffsetCache` must
/// trigger exactly one stable data read of one page's worth of bytes.
#[test]
#[serial_test::serial]
fn page_offset_cache_reuses_page_data_for_small_reads() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    write_at(0, &vec![9_u8; page_len()]).unwrap();
    let superblock = Superblock::load().unwrap();
    let mut page_cache = PageOffsetCache::new();
    let mut head_bytes = [0_u8; 16];
    let mut shifted_bytes = [0_u8; 16];

    crate::read_metrics::reset_read_metrics();
    read_base_at_with_page_cache(&superblock, 0, &mut head_bytes, &mut page_cache).unwrap();
    read_base_at_with_page_cache(&superblock, 8, &mut shifted_bytes, &mut page_cache).unwrap();
    let snapshot = crate::read_metrics::read_metrics_snapshot();

    assert_eq!(head_bytes, [9_u8; 16]);
    assert_eq!(shifted_bytes, [9_u8; 16]);
    assert_eq!(snapshot.stable_data_read_calls, 1);
    assert_eq!(snapshot.stable_data_read_bytes, page_size());
}
2498}