1use crate::config::{SQLITE_PAGE_SIZE, STABLE_PAGE_SIZE, SUPERBLOCK_SIZE};
7use crate::sqlite_vfs::overlay::{self, Overlay};
8use crate::stable::memory::{self, ContextId, StableMemoryError};
9use crate::stable::meta::{
10 fnv1a64, Superblock, FLAG_CHECKSUM_REFRESHING, FLAG_CHECKSUM_STALE, FLAG_IMPORTING,
11 PAGE_MAP_LAYOUT_VERSION,
12};
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::mem::MaybeUninit;
16
/// Largest number of bytes read per iteration while `refresh_checksum_chunk`
/// folds the logical database into the running hash.
const CHECKSUM_CHUNK_LEN: u64 = 16 * 1024;
/// Size in bytes of one page-table entry (tables are held as `Vec<u64>`).
const PAGE_TABLE_ENTRY_LEN: u64 = 8;
/// Number of page entries addressed by one segment table.
const SEGMENT_PAGE_COUNT: u64 = 256;
/// Byte size of one full segment table.
const SEGMENT_TABLE_BYTES: u64 = SEGMENT_PAGE_COUNT * PAGE_TABLE_ENTRY_LEN;
/// One segment table plus a single root entry; the capacity reserved for the
/// tables written by `commit_single_segment_page_overlay`.
const SINGLE_SEGMENT_PAGE_TABLE_BYTES: u64 = SEGMENT_TABLE_BYTES + PAGE_TABLE_ENTRY_LEN;
/// Maximum number of segment tables retained by a `ReadTableCache`.
const READ_SEGMENT_CACHE_CAPACITY: usize = 8;
/// Maximum number of `(page, physical offset)` pairs kept by `PageOffsetCache`.
const FILE_PAGE_OFFSET_CACHE_CAPACITY: usize = 64;
/// Maximum number of page images kept by `PageOffsetCache`.
const FILE_PAGE_DATA_CACHE_CAPACITY: usize = 8;
/// Minimum estimated orphan bytes before `storage_stats` recommends compaction.
const COMPACT_MIN_ORPHAN_BYTES: u64 = 16 * 1024 * 1024;
26
/// Progress report returned by `refresh_checksum_chunk`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChecksumRefresh {
    /// `true` once the scan offset has reached `db_size`.
    pub complete: bool,
    /// Hash folded over the bytes scanned so far (the final checksum when
    /// `complete` is `true`).
    pub checksum: u64,
    /// Logical offset up to which the database has been hashed.
    pub scanned_bytes: u64,
    /// Logical database size taken from the superblock for this call.
    pub db_size: u64,
}
34
/// Space-usage summary produced by `storage_stats`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StorageStats {
    /// Layout version recorded in the superblock.
    pub layout_version: u64,
    /// Active page count as reported by `active_page_count`.
    pub page_count: u64,
    /// Bytes occupied by the root table plus all active segment tables.
    pub page_table_bytes: u64,
    /// Superblock + mapped (non-zero) pages + page tables, in bytes.
    pub active_bytes: u64,
    /// Stable memory currently allocated, in bytes.
    pub allocated_bytes: u64,
    /// `allocated_bytes - active_bytes`, saturating at zero.
    pub orphan_bytes_estimate: u64,
    /// Orphan bytes relative to active bytes, in basis points (1/10_000).
    pub orphan_ratio_basis_points: u64,
    /// `true` when the orphan estimate is at least as large as the active
    /// bytes and also at least `COMPACT_MIN_ORPHAN_BYTES`.
    pub compact_recommended: bool,
}
46
/// Named points at which this module calls `hit_failpoint`.
///
/// NOTE(review): `hit_failpoint` is defined outside this view; presumably it
/// returns an error when the matching failpoint was armed via `set_failpoint`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum StableBlobFailpoint {
    /// Checked in `write_at` after the overlay accepted the write.
    OverlayWrite,
    /// Checked in `truncate` after the overlay accepted the truncate.
    OverlayTruncate,
    /// Checked at the very start of `commit_overlay`.
    CommitCapacity,
    /// Checked before a dirty page image is written during commit.
    CommitChunkWrite,
    /// Checked before the segment/root tables are written during commit.
    CommitPageTableWrite,
    /// Checked before the superblock is updated during commit.
    CommitSuperblockStore,
}
56
thread_local! {
    // Test-only registry of armed failpoints, keyed by memory context.
    #[cfg(test)]
    static FAILPOINTS: RefCell<BTreeMap<ContextId, StableBlobFailpoint>> = const { RefCell::new(BTreeMap::new()) };
    // Read-side page-table caches, one entry per memory context.
    // Emptied wholesale by `invalidate_read_cache`.
    static READ_TABLE_CACHE: RefCell<Vec<(ContextId, ReadTableCache)>> = const { RefCell::new(Vec::new()) };
    // Most recently committed segment table, one entry per memory context.
    // Emptied wholesale by `invalidate_read_cache`.
    static COMMIT_SEGMENT_CACHE: RefCell<Vec<(ContextId, CommitSegmentCache)>> = const { RefCell::new(Vec::new()) };
}
63
/// Identity of the page-map state a `ReadTableCache` was filled from.
///
/// `ReadTableCache::ensure_key` drops all cached tables whenever the key
/// changes. NOTE(review): the fields mirror superblock field names; the code
/// that builds the key is outside this view.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ReadCacheKey {
    page_table_offset: u64,
    page_count: u64,
    db_size: u64,
    last_tx_id: u64,
}
71
/// Cached root table and recently used segment tables for one page-map state.
#[derive(Debug)]
struct ReadTableCache {
    /// Page-map state the cached tables belong to; `None` while unbound.
    key: Option<ReadCacheKey>,
    /// Cached root table entries (empty when not loaded).
    root: Vec<u64>,
    /// Cached segment tables, ordered from least to most recently used.
    segments: Vec<CachedSegment>,
}
78
/// One segment table held by `ReadTableCache`.
#[derive(Debug)]
struct CachedSegment {
    /// Index of the segment this table describes.
    segment_no: u64,
    /// Per-page entries of the segment, indexed by position within the segment.
    table: Vec<u64>,
}
84
/// Segment table remembered after a successful single-page commit.
///
/// NOTE(review): filled through `cache_commit_segment_table(segment_no,
/// offset, table)`, which is defined outside this view; presumably it lets
/// the next commit skip re-reading the table — confirm against the reader.
#[derive(Debug)]
struct CommitSegmentCache {
    /// Index of the cached segment.
    segment_no: u64,
    /// Stable-memory offset the table was written to.
    segment_offset: u64,
    /// Entries of the cached segment table.
    table: Vec<u64>,
}
91
92impl ReadTableCache {
93 fn new() -> Self {
94 Self {
95 key: None,
96 root: Vec::new(),
97 segments: Vec::new(),
98 }
99 }
100
101 fn clear(&mut self) {
102 self.key = None;
103 self.root.clear();
104 self.segments.clear();
105 }
106
107 fn ensure_key(&mut self, key: ReadCacheKey) {
108 if self.key == Some(key) {
109 return;
110 }
111 self.clear();
112 self.key = Some(key);
113 }
114
115 #[inline(always)]
116 fn segment_page_offset(&mut self, segment_no: u64, index: usize) -> Option<u64> {
117 if self.segments.is_empty() {
118 return None;
119 }
120 if self.segments.len() == 1 {
121 let segment = &self.segments[0];
122 if segment.segment_no == segment_no {
123 return Some(segment.table[index]);
124 }
125 return None;
126 }
127 let position = self
128 .segments
129 .iter()
130 .position(|segment| segment.segment_no == segment_no)?;
131 let offset = Some(self.segments[position].table[index]);
132 if position + 1 != self.segments.len() {
133 let segment = self.segments.remove(position);
134 self.segments.push(segment);
135 }
136 offset
137 }
138
139 fn insert_segment(&mut self, segment_no: u64, table: Vec<u64>) {
140 if let Some(position) = self
141 .segments
142 .iter()
143 .position(|segment| segment.segment_no == segment_no)
144 {
145 self.segments.remove(position);
146 }
147 self.segments.push(CachedSegment { segment_no, table });
148 while self.segments.len() > READ_SEGMENT_CACHE_CAPACITY {
149 self.segments.remove(0);
150 }
151 }
152}
153
154#[derive(Debug)]
155pub(crate) struct PageOffsetCache {
156 entries: Vec<(u64, u64)>,
157 pages: Vec<(u64, Vec<u8>)>,
158}
159
160impl PageOffsetCache {
161 pub(crate) fn new() -> Self {
162 Self {
163 entries: Vec::with_capacity(FILE_PAGE_OFFSET_CACHE_CAPACITY),
164 pages: Vec::new(),
165 }
166 }
167
168 fn get(&self, page_no: u64) -> Option<u64> {
169 match self.entries.as_slice() {
170 [] => None,
171 [(cached_page, physical)] => (*cached_page == page_no).then_some(*physical),
172 entries => {
173 for (cached_page, physical) in entries {
174 if *cached_page == page_no {
175 return Some(*physical);
176 }
177 }
178 None
179 }
180 }
181 }
182
183 fn insert(&mut self, page_no: u64, physical: u64) {
184 if self.entries.len() == FILE_PAGE_OFFSET_CACHE_CAPACITY {
185 self.entries.remove(0);
186 }
187 self.entries.push((page_no, physical));
188 }
189
190 #[inline(always)]
191 fn copy_page_slice(&self, page_no: u64, in_page: usize, dst: &mut [u8]) -> bool {
192 if self.pages.is_empty() {
193 return false;
194 }
195 if self.pages.len() == 1 {
196 let (cached_page, page) = &self.pages[0];
197 if *cached_page == page_no {
198 let end = in_page + dst.len();
199 dst.copy_from_slice(&page[in_page..end]);
200 return true;
201 }
202 return false;
203 }
204 for (cached_page, page) in &self.pages {
205 if *cached_page == page_no {
206 let end = in_page + dst.len();
207 dst.copy_from_slice(&page[in_page..end]);
208 return true;
209 }
210 }
211 false
212 }
213
214 fn insert_page(&mut self, page_no: u64, page: Vec<u8>) {
215 if self.pages.len() == FILE_PAGE_DATA_CACHE_CAPACITY {
216 self.pages.remove(0);
217 }
218 self.pages.push((page_no, page));
219 }
220}
221
/// Arms `failpoint` for the currently active memory context.
///
/// Does nothing when no active context id can be obtained.
#[cfg(test)]
pub(crate) fn set_failpoint(failpoint: StableBlobFailpoint) {
    let Ok(context) = memory::active_context_id() else {
        return;
    };
    FAILPOINTS.with(|registry| {
        registry.borrow_mut().insert(context, failpoint);
    });
}
230
/// Disarms every failpoint on this thread, regardless of memory context.
#[cfg(test)]
pub(crate) fn clear_failpoint() {
    FAILPOINTS.with(|registry| {
        registry.borrow_mut().clear();
    });
}
235
236pub(crate) fn ensure_page_map_layout() -> Result<(), StableMemoryError> {
237 let block = Superblock::load()?;
238 if block.layout_version >= PAGE_MAP_LAYOUT_VERSION {
239 return Ok(());
240 }
241 Err(StableMemoryError::UnsupportedLayoutVersion(
242 block.layout_version,
243 ))
244}
245
246pub(crate) fn begin_update() -> Result<u64, StableMemoryError> {
247 let block = Superblock::load()?;
248 if block.layout_version < PAGE_MAP_LAYOUT_VERSION {
249 return Err(StableMemoryError::UnsupportedLayoutVersion(
250 block.layout_version,
251 ));
252 }
253 if block.is_importing() {
254 return Err(StableMemoryError::ImportAlreadyStarted);
255 }
256 overlay::begin(block.db_size)?;
257 Ok(block.db_size)
258}
259
/// Abandons the current update by delegating to `overlay::rollback`.
pub(crate) fn rollback_update() {
    overlay::rollback();
}
263
/// Empties the thread-local read-table and commit-segment caches.
///
/// Both vectors are cleared entirely, so cached state for every memory
/// context on this thread is dropped, not just the active one.
#[doc(hidden)]
pub fn invalidate_read_cache() {
    READ_TABLE_CACHE.with(|cache| cache.borrow_mut().clear());
    COMMIT_SEGMENT_CACHE.with(|cache| cache.borrow_mut().clear());
}
269
270pub(crate) fn commit_update() -> Result<(), StableMemoryError> {
271 let Some(overlay) = overlay::take() else {
272 return Ok(());
273 };
274 if overlay.is_empty() {
275 return Ok(());
276 }
277 commit_overlay(overlay, true)
278}
279
280pub(crate) fn read_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
281 if let Some(result) = overlay::read_at(offset, dst) {
282 return result;
283 }
284 read_base_at(offset, dst)
285}
286
287pub(crate) fn read_base_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
288 if dst.is_empty() {
289 return Ok(true);
290 }
291 let block = Superblock::load()?;
292 read_base_at_with_block(&block, offset, dst)
293}
294
295pub(crate) fn read_base_at_with_block(
296 block: &Superblock,
297 offset: u64,
298 dst: &mut [u8],
299) -> Result<bool, StableMemoryError> {
300 if dst.is_empty() {
301 return Ok(true);
302 }
303 if offset >= block.db_size {
304 dst.fill(0);
305 return Ok(false);
306 }
307 let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
308 if requested <= block.db_size - offset {
309 read_logical_range(block, offset, dst)?;
310 return Ok(true);
311 }
312 let copied = requested.min(block.db_size - offset);
313 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
314 read_logical_range(block, offset, &mut dst[..copied_len])?;
315 dst[copied_len..].fill(0);
316 Ok(copied == requested)
317}
318
319#[inline(always)]
320pub(crate) fn read_base_at_with_page_cache(
321 block: &Superblock,
322 offset: u64,
323 dst: &mut [u8],
324 page_offsets: &mut PageOffsetCache,
325) -> Result<bool, StableMemoryError> {
326 if dst.is_empty() {
327 return Ok(true);
328 }
329 if offset >= block.db_size {
330 dst.fill(0);
331 return Ok(false);
332 }
333 let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
334 if requested <= block.db_size - offset {
335 read_logical_range_with_page_cache(block, offset, dst, page_offsets)?;
336 return Ok(true);
337 }
338 let copied = requested.min(block.db_size - offset);
339 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
340 read_logical_range_with_page_cache(block, offset, &mut dst[..copied_len], page_offsets)?;
341 dst[copied_len..].fill(0);
342 Ok(copied == requested)
343}
344
345pub(crate) fn read_base_page(page_no: u64) -> Result<Vec<u8>, StableMemoryError> {
346 let block = Superblock::load()?;
347 let mut page = zero_page();
348 if page_no >= active_page_count(&block)? {
349 return Ok(page);
350 }
351 let physical = page_offset_for(&block, page_no)?;
352 if physical != 0 {
353 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
354 crate::read_metrics::record_stable_data_read(page.len());
355 memory::read_preallocated(physical, &mut page)?;
356 }
357 Ok(page)
358}
359
360pub(crate) fn write_at(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
361 if let Some(result) = overlay::write_at(offset, bytes) {
362 hit_failpoint(StableBlobFailpoint::OverlayWrite)?;
363 return result;
364 }
365 if bytes.is_empty() {
366 return Ok(());
367 }
368 ensure_page_map_layout()?;
369 let mut direct = Overlay::new(Superblock::load()?.db_size);
370 direct.write_at(offset, bytes)?;
371 commit_overlay(direct, false)
372}
373
374pub(crate) fn truncate(size: u64) -> Result<(), StableMemoryError> {
375 if let Some(result) = overlay::truncate(size) {
376 hit_failpoint(StableBlobFailpoint::OverlayTruncate)?;
377 return result;
378 }
379 ensure_page_map_layout()?;
380 let mut direct = Overlay::new(Superblock::load()?.db_size);
381 direct.truncate(size)?;
382 if direct.is_empty() {
383 return Ok(());
384 }
385 commit_overlay(direct, false)
386}
387
388pub(crate) fn file_size() -> Result<u64, StableMemoryError> {
389 if let Some(size) = overlay::file_size() {
390 return Ok(size);
391 }
392 Ok(Superblock::load()?.db_size)
393}
394
395pub fn export_chunk(offset: u64, len: u64) -> Result<Vec<u8>, StableMemoryError> {
396 reject_during_update()?;
397 let block = Superblock::load()?;
398 if offset >= block.db_size {
399 return Ok(Vec::new());
400 }
401 let copied = len.min(block.db_size - offset);
402 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
403 let mut out = vec![0_u8; copied_len];
404 read_logical_range(&block, offset, &mut out)?;
405 Ok(out)
406}
407
408pub fn import_chunk(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
409 reject_during_update()?;
410 let mut block = Superblock::load()?;
411 if !block.is_importing() {
412 return Err(StableMemoryError::ImportNotStarted);
413 }
414 let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
415 if offset != block.import_written_until {
416 return Err(StableMemoryError::ImportOutOfOrder {
417 offset,
418 expected: block.import_written_until,
419 });
420 }
421 let end = checked_add(offset, len)?;
422 if end > block.import_total_size {
423 return Err(StableMemoryError::ImportOutOfBounds {
424 offset,
425 len,
426 db_size: block.import_total_size,
427 });
428 }
429 memory::write(import_offset(&block, offset)?, bytes)?;
430 block.import_written_until = end;
431 block.store()?;
432 invalidate_read_cache();
433 Ok(())
434}
435
/// Starts an import of `total_size` bytes whose content must hash to
/// `expected_checksum`.
///
/// Records the import parameters in the superblock, sets `FLAG_IMPORTING`,
/// clears any checksum refresh in progress, and invalidates the read caches.
///
/// # Errors
/// `UpdateInProgress` while an overlay is active, `ImportAlreadyStarted` when
/// an import is already running, or an overflow/storage error.
pub fn begin_import(total_size: u64, expected_checksum: u64) -> Result<(), StableMemoryError> {
    reject_during_update()?;
    let mut block = Superblock::load()?;
    if block.is_importing() {
        return Err(StableMemoryError::ImportAlreadyStarted);
    }
    // Imported bytes are placed starting at the offset `append_base` reports.
    let import_base_offset = append_base()?;
    // Result intentionally discarded: this only verifies that the end of the
    // import region is representable.
    checked_add(import_base_offset, total_size)?;
    block.flags |= FLAG_IMPORTING;
    block.clear_checksum_refresh();
    block.import_expected_checksum = expected_checksum;
    block.import_written_until = 0;
    block.import_total_size = total_size;
    block.import_base_offset = import_base_offset;
    block.store()?;
    invalidate_read_cache();
    Ok(())
}
454
/// Finalizes the import in progress and makes it the live database.
///
/// Requires that every declared byte has been written. The imported region
/// is checksummed; on a mismatch `clear_import` is called and
/// `ChecksumMismatch` is returned. On success a page table for the imported
/// data is written and the superblock is switched over to it in one store.
///
/// # Errors
/// `UpdateInProgress`, `ImportNotStarted`, `ImportIncomplete`,
/// `ChecksumMismatch`, or any underlying storage error.
pub fn finish_import() -> Result<(), StableMemoryError> {
    reject_during_update()?;
    let mut block = Superblock::load()?;
    if !block.is_importing() {
        return Err(StableMemoryError::ImportNotStarted);
    }
    if block.import_written_until != block.import_total_size {
        return Err(StableMemoryError::ImportIncomplete {
            written_until: block.import_written_until,
            db_size: block.import_total_size,
        });
    }
    let checksum = checksum_physical_range(block.import_base_offset, block.import_total_size)?;
    if checksum != block.import_expected_checksum {
        // Capture the expected value before `clear_import` mutates the block.
        let expected = block.import_expected_checksum;
        clear_import(&mut block)?;
        return Err(StableMemoryError::ChecksumMismatch {
            expected,
            actual: checksum,
        });
    }
    let entries = imported_page_table(&block)?;
    let (root_offset, root_len) = write_segmented_tables(&entries)?;
    // Point the superblock at the imported data and its freshly written tables.
    block.db_size = block.import_total_size;
    block.db_base_offset = block.import_base_offset;
    block.page_table_offset = root_offset;
    block.page_count = root_len;
    block.layout_version = PAGE_MAP_LAYOUT_VERSION;
    // The checksum was just verified, so it is neither stale nor mid-refresh.
    block.flags &= !FLAG_IMPORTING;
    block.flags &= !FLAG_CHECKSUM_STALE;
    block.clear_checksum_refresh();
    block.checksum = checksum;
    // Reset the import bookkeeping.
    block.import_expected_checksum = 0;
    block.import_written_until = 0;
    block.import_total_size = 0;
    block.import_base_offset = 0;
    block.store()?;
    invalidate_read_cache();
    Ok(())
}
495
496pub fn cancel_import() -> Result<(), StableMemoryError> {
497 reject_during_update()?;
498 let mut block = Superblock::load()?;
499 if !block.is_importing() {
500 return Err(StableMemoryError::ImportNotStarted);
501 }
502 clear_import(&mut block)
503}
504
505pub fn refresh_checksum() -> Result<u64, StableMemoryError> {
506 reject_during_update()?;
507 let checksum = checksum()?;
508 let mut block = Superblock::load()?;
509 block.checksum = checksum;
510 block.flags &= !FLAG_CHECKSUM_STALE;
511 block.clear_checksum_refresh();
512 block.store()?;
513 invalidate_read_cache();
514 Ok(checksum)
515}
516
/// Advances a resumable checksum refresh by at most `max_bytes` bytes.
///
/// The running hash and scan offset are kept in the superblock between
/// calls. When the scan reaches `db_size` the final hash becomes the stored
/// checksum and `FLAG_CHECKSUM_STALE` is cleared. If a transaction was
/// committed since the scan started (the recorded tx id differs from
/// `last_tx_id`), the partial state is discarded and the scan restarts.
///
/// # Errors
/// `UpdateInProgress`, `ChecksumRefreshChunkEmpty` for `max_bytes == 0`,
/// `ImportAlreadyStarted` while importing, or any underlying storage error.
pub fn refresh_checksum_chunk(max_bytes: u64) -> Result<ChecksumRefresh, StableMemoryError> {
    reject_during_update()?;
    if max_bytes == 0 {
        return Err(StableMemoryError::ChecksumRefreshChunkEmpty);
    }

    let mut block = Superblock::load()?;
    if block.is_importing() {
        return Err(StableMemoryError::ImportAlreadyStarted);
    }
    if !block.is_checksum_refreshing() {
        // Start a new scan pinned to the current transaction id.
        block.flags |= FLAG_CHECKSUM_REFRESHING;
        block.checksum_refresh_offset = 0;
        block.checksum_refresh_hash = fnv1a64(&[]);
        block.checksum_refresh_tx_id = block.last_tx_id;
    }
    if block.checksum_refresh_tx_id != block.last_tx_id {
        // The database changed under the scan: drop the partial state,
        // persist that, and start over from the beginning.
        block.clear_checksum_refresh();
        block.store()?;
        invalidate_read_cache();
        return refresh_checksum_chunk(max_bytes);
    }

    let start = block.checksum_refresh_offset;
    let end = block.db_size.min(start.saturating_add(max_bytes));
    let mut offset = start;
    let mut hash = block.checksum_refresh_hash;
    // Hash in pieces of at most CHECKSUM_CHUNK_LEN to bound the buffer size.
    while offset < end {
        let len = (end - offset).min(CHECKSUM_CHUNK_LEN);
        let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
        let mut bytes = vec![0_u8; copied_len];
        read_logical_range(&block, offset, &mut bytes)?;
        hash = fold_fnv1a64(hash, &bytes);
        offset += len;
    }

    block.checksum_refresh_offset = offset;
    block.checksum_refresh_hash = hash;
    if offset == block.db_size {
        // Scan complete: publish the checksum and leave refresh mode.
        block.checksum = hash;
        block.flags &= !FLAG_CHECKSUM_STALE;
        block.clear_checksum_refresh();
    }
    let out = ChecksumRefresh {
        complete: offset == block.db_size,
        checksum: hash,
        scanned_bytes: offset,
        db_size: block.db_size,
    };
    block.store()?;
    invalidate_read_cache();
    Ok(out)
}
570
/// Computes the checksum over the first `db_size` logical bytes of the
/// committed database without storing it.
///
/// # Errors
/// `UpdateInProgress` while an overlay is active, or any underlying read error.
pub fn checksum() -> Result<u64, StableMemoryError> {
    reject_during_update()?;
    let block = Superblock::load()?;
    checksum_logical_range(&block, block.db_size)
}
576
/// Rewrites every mapped page into one contiguous run and installs a new
/// page table that points at the copies.
///
/// Unmapped pages (table entry `0`) stay unmapped. The superblock is updated
/// through `store_page_map_without_tx`, and the read caches are invalidated
/// afterwards.
///
/// # Errors
/// `UpdateInProgress`, `UnsupportedLayoutVersion`, `OffsetOverflow`, or any
/// underlying storage error.
pub fn compact() -> Result<(), StableMemoryError> {
    reject_during_update()?;
    ensure_page_map_layout()?;
    let block = Superblock::load()?;
    let table = read_page_table(&block)?;
    let mut compacted = Vec::with_capacity(table.len());
    // Copies are laid out starting at the offset `append_base` reports.
    let mut cursor = append_base()?;
    let non_zero_pages = table.iter().filter(|offset| **offset != 0).count();
    let data_bytes = u64::try_from(non_zero_pages)
        .map_err(|_| StableMemoryError::OffsetOverflow)?
        .checked_mul(page_size())
        .ok_or(StableMemoryError::OffsetOverflow)?;
    // Reserve room for all page copies up front; the loop below then uses
    // the `*_preallocated` accessors.
    memory::ensure_capacity(checked_add(cursor, data_bytes)?)?;

    for offset in table {
        if offset == 0 {
            // Keep holes as holes.
            compacted.push(0);
            continue;
        }
        let mut page = zero_page();
        memory::read_preallocated(offset, &mut page)?;
        memory::write_preallocated(cursor, &page)?;
        compacted.push(cursor);
        cursor = checked_add(cursor, page_size())?;
    }

    let (root_offset, root_len) = write_segmented_tables(&compacted)?;
    Superblock::store_page_map_without_tx(root_offset, root_len, block.db_size)?;
    invalidate_read_cache();
    Ok(())
}
608
609pub fn storage_stats() -> Result<StorageStats, StableMemoryError> {
610 let block = Superblock::load()?;
611 let table = read_page_table(&block)?;
612 let non_zero_pages = u64::try_from(table.iter().filter(|offset| **offset != 0).count())
613 .map_err(|_| StableMemoryError::OffsetOverflow)?;
614 let segment_count = active_segment_count(&block)?;
615 let root_bytes = root_table_bytes(segment_count)?;
616 let segment_bytes = segment_count
617 .checked_mul(segment_table_bytes()?)
618 .ok_or(StableMemoryError::OffsetOverflow)?;
619 let page_table_bytes = checked_add(root_bytes, segment_bytes)?;
620 let active_bytes = SUPERBLOCK_SIZE
621 .checked_add(non_zero_pages.saturating_mul(page_size()))
622 .and_then(|value| value.checked_add(page_table_bytes))
623 .ok_or(StableMemoryError::OffsetOverflow)?;
624 let allocated_bytes = memory::size_pages()
625 .checked_mul(STABLE_PAGE_SIZE)
626 .ok_or(StableMemoryError::OffsetOverflow)?;
627 let orphan_bytes_estimate = allocated_bytes.saturating_sub(active_bytes);
628 let orphan_ratio_basis_points = orphan_bytes_estimate
629 .saturating_mul(10_000)
630 .checked_div(active_bytes)
631 .unwrap_or(0);
632 Ok(StorageStats {
633 layout_version: block.layout_version,
634 page_count: active_page_count(&block)?,
635 page_table_bytes,
636 active_bytes,
637 allocated_bytes,
638 orphan_bytes_estimate,
639 orphan_ratio_basis_points,
640 compact_recommended: orphan_bytes_estimate >= active_bytes
641 && orphan_bytes_estimate >= COMPACT_MIN_ORPHAN_BYTES,
642 })
643}
644
/// Number of pages needed to hold `size` bytes, rounding up.
///
/// Always returns `Ok` as written; the `Result` return type is kept for the
/// callers that use `?` on it.
pub(crate) fn page_count_for_size(size: u64) -> Result<u64, StableMemoryError> {
    Ok(size.div_ceil(page_size()))
}
648
/// Test helper: loads the superblock and returns the root table read from it.
#[cfg(test)]
pub(crate) fn debug_root_table_for_tests() -> Result<Vec<u64>, StableMemoryError> {
    let block = Superblock::load()?;
    read_root_table(&block)
}
654
655fn commit_overlay(overlay: Overlay, advance_tx: bool) -> Result<(), StableMemoryError> {
656 hit_failpoint(StableBlobFailpoint::CommitCapacity)?;
657 let profile_enabled = commit_profile_enabled();
658 let block = Superblock::load()?;
659 let overlay_size = overlay.size();
660 let final_page_count = page_count_for_size(overlay_size)?;
661 let data_cursor = append_base()?;
662 debug_assert!(overlay
663 .dirty_pages()
664 .iter()
665 .all(|(page_no, _)| *page_no < final_page_count));
666 let dirty_pages = overlay.dirty_pages();
667 if let [(page_no, page)] = dirty_pages {
668 if overlay_size >= block.db_size
669 && *page_no < final_page_count
670 && final_page_count <= SEGMENT_PAGE_COUNT
671 {
672 let build_profile_start = commit_profile_start(profile_enabled);
673 let options = SinglePageCommitOptions {
674 advance_tx,
675 overlay_size,
676 data_cursor,
677 profile_enabled,
678 build_profile_start,
679 };
680 return commit_single_segment_page_overlay(&block, *page_no, page, options);
681 }
682 }
683
684 let final_segment_count = segment_count_for_pages(final_page_count)?;
685 let profile_start = commit_profile_start(profile_enabled);
686 let mut root = read_commit_root_table(&block)?;
687 commit_profile_record_load(profile_start);
688
689 let build_profile_start = commit_profile_start(profile_enabled);
690 let root_len =
691 usize::try_from(final_segment_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
692 if root.len() != root_len {
693 root.resize(root_len, 0);
694 }
695
696 if let [(page_no, page)] = dirty_pages {
697 if overlay_size >= block.db_size && *page_no < final_page_count {
698 let options = SinglePageCommitOptions {
699 advance_tx,
700 overlay_size,
701 data_cursor,
702 profile_enabled,
703 build_profile_start,
704 };
705 return commit_single_page_overlay(
706 &block,
707 final_segment_count,
708 root,
709 *page_no,
710 page,
711 options,
712 );
713 }
714 }
715
716 let mut segment_updates = BTreeMap::<u64, Vec<u64>>::new();
717 let mut page_cursor = data_cursor;
718
719 for (page_no, _) in dirty_pages {
720 if *page_no >= final_page_count {
721 continue;
722 }
723 let segment_no = segment_no(*page_no);
724 let index = segment_index(*page_no)?;
725 let table = load_segment_for_update(&block, &root, &mut segment_updates, segment_no)?;
726 table[index] = page_cursor;
727 page_cursor = checked_add(page_cursor, page_size())?;
728 }
729
730 if overlay_size < block.db_size {
731 clear_truncated_tail(&block, &root, &mut segment_updates, final_page_count)?;
732 }
733 commit_profile_record_build_segments(build_profile_start);
734
735 let mut table_cursor = page_cursor;
736 let root_entries_len = final_segment_count;
737 let segment_table_writes = segment_updates.len();
738 let segment_table_bytes = u64::try_from(segment_table_writes)
739 .map_err(|_| StableMemoryError::OffsetOverflow)?
740 .checked_mul(segment_table_bytes()?)
741 .ok_or(StableMemoryError::OffsetOverflow)?;
742 let page_table_bytes = checked_add(segment_table_bytes, root_table_bytes(root_entries_len)?)?;
743 let profile_start = commit_profile_start(profile_enabled);
744 memory::ensure_capacity(checked_add(table_cursor, page_table_bytes)?)?;
745 commit_profile_record_capacity(profile_start);
746
747 let profile_start = commit_profile_start(profile_enabled);
748 let mut cursor = data_cursor;
749 for (_, page) in dirty_pages {
750 hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
751 write_commit_page(cursor, page, profile_enabled)?;
752 cursor = checked_add(cursor, page_size())?;
753 }
754 commit_profile_record_page_write(profile_start);
755
756 hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
757 let profile_start = commit_profile_start(profile_enabled);
758 for (segment_no, table) in segment_updates {
759 let offset = write_commit_segment_table_at(&table, &mut table_cursor, profile_enabled)?;
760 let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
761 root[index] = offset;
762 }
763 let root_offset = write_commit_root_table_at(&root, &mut table_cursor, profile_enabled)?;
764 commit_profile_record_table_write(profile_start);
765
766 hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
767 let profile_start = commit_profile_start(profile_enabled);
768 let result = store_commit_page_map(
769 advance_tx,
770 root_offset,
771 root_entries_len,
772 overlay_size,
773 profile_enabled,
774 );
775 commit_profile_record_superblock_store(profile_start);
776 result
777}
778
/// Parameters shared by the two single-page commit fast paths.
#[derive(Clone, Copy)]
struct SinglePageCommitOptions {
    /// Whether the superblock store advances the transaction.
    advance_tx: bool,
    /// Logical database size after the commit.
    overlay_size: u64,
    /// Stable-memory offset at which the page image is written.
    data_cursor: u64,
    /// Whether commit profiling is active for this commit.
    profile_enabled: bool,
    /// Profiling start mark for the "build segments" phase, if profiling.
    build_profile_start: Option<u64>,
}
787
/// Commits exactly one dirty page against a (possibly multi-segment) table.
///
/// Writes the page image at `options.data_cursor`, followed by the updated
/// segment table and the root table, then stores the superblock. `root` is
/// the caller's root table, already sized to `final_segment_count`. On
/// success the written segment table is handed to
/// `cache_commit_segment_table`.
fn commit_single_page_overlay(
    block: &Superblock,
    final_segment_count: u64,
    mut root: Vec<u64>,
    page_no: u64,
    page: &[u8],
    options: SinglePageCommitOptions,
) -> Result<(), StableMemoryError> {
    let segment_no = segment_no(page_no);
    let index = segment_index(page_no)?;
    let mut table = read_commit_segment_table(block, &root, segment_no)?;
    table[index] = options.data_cursor;
    // Tables start right after the single page image.
    let page_cursor = checked_add(options.data_cursor, page_size())?;
    commit_profile_record_build_segments(options.build_profile_start);

    let root_entries_len = final_segment_count;
    // One segment table plus the whole root table will be written.
    let page_table_bytes =
        checked_add(segment_table_bytes()?, root_table_bytes(root_entries_len)?)?;
    let profile_start = commit_profile_start(options.profile_enabled);
    memory::ensure_capacity(checked_add(page_cursor, page_table_bytes)?)?;
    commit_profile_record_capacity(profile_start);

    hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
    let profile_start = commit_profile_start(options.profile_enabled);
    write_commit_page(options.data_cursor, page, options.profile_enabled)?;
    commit_profile_record_page_write(profile_start);

    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
    let profile_start = commit_profile_start(options.profile_enabled);
    let mut table_cursor = page_cursor;
    let offset = write_commit_segment_table_at(&table, &mut table_cursor, options.profile_enabled)?;
    let root_offset = if final_segment_count == 1 {
        // With a single segment the root is just this one entry.
        write_commit_root_table_at(&[offset], &mut table_cursor, options.profile_enabled)?
    } else {
        let root_index =
            usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
        root[root_index] = offset;
        write_commit_root_table_at(&root, &mut table_cursor, options.profile_enabled)?
    };
    commit_profile_record_table_write(profile_start);

    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
    let profile_start = commit_profile_start(options.profile_enabled);
    let result = store_commit_page_map(
        options.advance_tx,
        root_offset,
        root_entries_len,
        options.overlay_size,
        options.profile_enabled,
    );
    commit_profile_record_superblock_store(profile_start);
    // Only remember the table once the superblock actually points at it.
    if result.is_ok() {
        cache_commit_segment_table(segment_no, offset, table);
    }
    result
}
844
/// Commits exactly one dirty page when the whole database fits in segment 0.
///
/// Writes the page image at `options.data_cursor`, then segment table 0 and a
/// one-entry root table, then stores the superblock with a root length of 1.
/// On success the written segment table is handed to
/// `cache_commit_segment_table`.
fn commit_single_segment_page_overlay(
    block: &Superblock,
    page_no: u64,
    page: &[u8],
    options: SinglePageCommitOptions,
) -> Result<(), StableMemoryError> {
    let index = segment_index(page_no)?;
    let root = read_commit_root_table(block)?;
    let mut table = read_commit_segment_table(block, &root, 0)?;
    table[index] = options.data_cursor;
    // Tables start right after the single page image.
    let page_cursor = checked_add(options.data_cursor, page_size())?;
    commit_profile_record_build_segments(options.build_profile_start);

    let profile_start = commit_profile_start(options.profile_enabled);
    memory::ensure_capacity(checked_add(page_cursor, SINGLE_SEGMENT_PAGE_TABLE_BYTES)?)?;
    commit_profile_record_capacity(profile_start);

    hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
    let profile_start = commit_profile_start(options.profile_enabled);
    // NOTE(review): this always uses the metered `write_prechecked`, whereas
    // `commit_single_page_overlay` goes through `write_commit_page`, which
    // picks the unmetered variant when profiling is off — confirm intent.
    memory::write_prechecked(options.data_cursor, page)?;
    commit_profile_record_page_write(profile_start);

    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
    let profile_start = commit_profile_start(options.profile_enabled);
    let mut table_cursor = page_cursor;
    let offset = write_commit_segment_table_at(&table, &mut table_cursor, options.profile_enabled)?;
    let root_offset =
        write_commit_root_table_at(&[offset], &mut table_cursor, options.profile_enabled)?;
    commit_profile_record_table_write(profile_start);

    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
    let profile_start = commit_profile_start(options.profile_enabled);
    let result = store_commit_page_map(
        options.advance_tx,
        root_offset,
        1,
        options.overlay_size,
        options.profile_enabled,
    );
    commit_profile_record_superblock_store(profile_start);
    // Only remember the table once the superblock actually points at it.
    if result.is_ok() {
        cache_commit_segment_table(0, offset, table);
    }
    result
}
890
/// Whether commit profiling is active; defers to
/// `read_metrics::metrics_enabled` in test, debug, and `bench-profile` builds.
#[cfg(any(test, debug_assertions, feature = "bench-profile"))]
#[inline(always)]
fn commit_profile_enabled() -> bool {
    crate::read_metrics::metrics_enabled()
}

/// Commit profiling is always off in builds without test, debug assertions,
/// or the `bench-profile` feature.
#[cfg(not(any(test, debug_assertions, feature = "bench-profile")))]
#[inline(always)]
fn commit_profile_enabled() -> bool {
    false
}
902
903#[inline(always)]
904fn commit_profile_start(enabled: bool) -> Option<u64> {
905 if enabled {
906 Some(crate::read_metrics::instruction_counter())
907 } else {
908 None
909 }
910}
911
// Generates a profiling recorder named `$name`.
//
// In test, debug, and `bench-profile` builds the generated function passes
// the instructions elapsed since `start` to `crate::read_metrics::$record`
// and does nothing when `start` is `None`. In all other builds it is an
// empty function.
macro_rules! commit_profile_recorder {
    ($name:ident, $record:ident) => {
        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
        #[inline(always)]
        fn $name(start: Option<u64>) {
            if let Some(start) = start {
                crate::read_metrics::$record(
                    crate::read_metrics::instruction_counter().saturating_sub(start),
                );
            }
        }

        #[cfg(not(any(test, debug_assertions, feature = "bench-profile")))]
        #[inline(always)]
        fn $name(_start: Option<u64>) {}
    };
}

// One recorder per commit phase.
commit_profile_recorder!(commit_profile_record_load, record_commit_load);
commit_profile_recorder!(
    commit_profile_record_build_segments,
    record_commit_build_segments
);
commit_profile_recorder!(commit_profile_record_capacity, record_commit_capacity);
commit_profile_recorder!(commit_profile_record_page_write, record_commit_page_write);
commit_profile_recorder!(commit_profile_record_table_write, record_commit_table_write);
commit_profile_recorder!(
    commit_profile_record_superblock_store,
    record_commit_superblock_store
);
942
943#[inline(always)]
944fn write_commit_page(
945 offset: u64,
946 page: &[u8],
947 profile_enabled: bool,
948) -> Result<(), StableMemoryError> {
949 if profile_enabled {
950 memory::write_prechecked(offset, page)
951 } else {
952 memory::write_prechecked_unmetered(offset, page)
953 }
954}
955
956fn store_commit_page_map(
957 advance_tx: bool,
958 root_offset: u64,
959 root_entries_len: u64,
960 overlay_size: u64,
961 profile_enabled: bool,
962) -> Result<(), StableMemoryError> {
963 match (advance_tx, profile_enabled) {
964 (true, true) => Superblock::commit_page_map(root_offset, root_entries_len, overlay_size),
965 (true, false) => {
966 Superblock::commit_page_map_unmetered(root_offset, root_entries_len, overlay_size)
967 }
968 (false, true) => {
969 Superblock::store_page_map_without_tx(root_offset, root_entries_len, overlay_size)
970 }
971 (false, false) => Superblock::store_page_map_without_tx_unmetered(
972 root_offset,
973 root_entries_len,
974 overlay_size,
975 ),
976 }
977}
978
979fn load_segment_for_update<'a>(
980 block: &Superblock,
981 root: &[u64],
982 updates: &'a mut BTreeMap<u64, Vec<u64>>,
983 segment_no: u64,
984) -> Result<&'a mut Vec<u64>, StableMemoryError> {
985 match updates.entry(segment_no) {
986 std::collections::btree_map::Entry::Occupied(entry) => Ok(entry.into_mut()),
987 std::collections::btree_map::Entry::Vacant(entry) => {
988 let table = read_segment_table(block, root, segment_no)?;
989 Ok(entry.insert(table))
990 }
991 }
992}
993
994fn clear_truncated_tail(
995 block: &Superblock,
996 root: &[u64],
997 updates: &mut BTreeMap<u64, Vec<u64>>,
998 final_page_count: u64,
999) -> Result<(), StableMemoryError> {
1000 let old_page_count = active_page_count(block)?;
1001 if final_page_count >= old_page_count || final_page_count == 0 {
1002 return Ok(());
1003 }
1004 let boundary_segment = segment_no(final_page_count);
1005 if boundary_segment >= segment_count_for_pages(final_page_count)? {
1006 return Ok(());
1007 }
1008 let start = segment_index(final_page_count)?;
1009 if start == 0 {
1010 return Ok(());
1011 }
1012 let table = load_segment_for_update(block, root, updates, boundary_segment)?;
1013 table[start..].fill(0);
1014 Ok(())
1015}
1016
1017fn reject_during_update() -> Result<(), StableMemoryError> {
1018 if overlay::is_active() {
1019 Err(StableMemoryError::UpdateInProgress)
1020 } else {
1021 Ok(())
1022 }
1023}
1024
1025fn read_logical_range(
1026 block: &Superblock,
1027 offset: u64,
1028 dst: &mut [u8],
1029) -> Result<(), StableMemoryError> {
1030 if dst.is_empty() {
1031 return Ok(());
1032 }
1033 let in_page =
1034 usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
1035 if dst.len() <= page_len() - in_page {
1036 return read_logical_page_slice(block, offset / page_size(), in_page, dst);
1037 }
1038
1039 let mut copied_total = 0_usize;
1040 while copied_total < dst.len() {
1041 let absolute = checked_add(
1042 offset,
1043 u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
1044 )?;
1045 let page_no = absolute / page_size();
1046 let in_page = usize::try_from(absolute % page_size())
1047 .map_err(|_| StableMemoryError::OffsetOverflow)?;
1048 let copied = (page_len() - in_page).min(dst.len() - copied_total);
1049 read_logical_page_slice(
1050 block,
1051 page_no,
1052 in_page,
1053 &mut dst[copied_total..copied_total + copied],
1054 )?;
1055 copied_total += copied;
1056 }
1057 Ok(())
1058}
1059
1060fn read_logical_range_with_page_cache(
1061 block: &Superblock,
1062 offset: u64,
1063 dst: &mut [u8],
1064 page_offsets: &mut PageOffsetCache,
1065) -> Result<(), StableMemoryError> {
1066 let in_page =
1067 usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
1068 if dst.len() <= page_len() - in_page {
1069 return read_logical_page_slice_with_page_cache(
1070 block,
1071 offset / page_size(),
1072 in_page,
1073 dst,
1074 page_offsets,
1075 );
1076 }
1077
1078 let mut copied_total = 0_usize;
1079 while copied_total < dst.len() {
1080 let absolute = checked_add(
1081 offset,
1082 u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
1083 )?;
1084 let page_no = absolute / page_size();
1085 let in_page = usize::try_from(absolute % page_size())
1086 .map_err(|_| StableMemoryError::OffsetOverflow)?;
1087 let copied = (page_len() - in_page).min(dst.len() - copied_total);
1088 read_logical_page_slice_with_page_cache(
1089 block,
1090 page_no,
1091 in_page,
1092 &mut dst[copied_total..copied_total + copied],
1093 page_offsets,
1094 )?;
1095 copied_total += copied;
1096 }
1097 Ok(())
1098}
1099
1100fn read_logical_page_slice(
1101 block: &Superblock,
1102 page_no: u64,
1103 in_page: usize,
1104 dst: &mut [u8],
1105) -> Result<(), StableMemoryError> {
1106 let physical = page_offset_for(block, page_no)?;
1107 if physical == 0 {
1108 dst.fill(0);
1109 return Ok(());
1110 }
1111 let stable_offset = checked_add(
1112 physical,
1113 u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
1114 )?;
1115 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1116 crate::read_metrics::record_stable_data_read(dst.len());
1117 memory::read_preallocated(stable_offset, dst)
1118}
1119
1120#[inline(always)]
1121fn read_logical_page_slice_with_page_cache(
1122 block: &Superblock,
1123 page_no: u64,
1124 in_page: usize,
1125 dst: &mut [u8],
1126 page_offsets: &mut PageOffsetCache,
1127) -> Result<(), StableMemoryError> {
1128 if dst.len() < page_len() && page_offsets.copy_page_slice(page_no, in_page, dst) {
1129 return Ok(());
1130 }
1131 let physical = match page_offsets.get(page_no) {
1132 Some(physical) => physical,
1133 None => {
1134 let physical = if block.page_table_offset == 0 {
1135 0
1136 } else {
1137 cached_page_offset_for(block, page_no)?
1138 };
1139 page_offsets.insert(page_no, physical);
1140 physical
1141 }
1142 };
1143 if physical == 0 {
1144 dst.fill(0);
1145 return Ok(());
1146 }
1147 if in_page == 0 && dst.len() == page_len() {
1148 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1149 crate::read_metrics::record_stable_data_read(dst.len());
1150 return memory::read_preallocated(physical, dst);
1151 }
1152 if dst.len() < page_len() {
1153 let mut page = zero_page();
1154 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1155 crate::read_metrics::record_stable_data_read(page.len());
1156 memory::read_preallocated(physical, &mut page)?;
1157 let end = in_page + dst.len();
1158 dst.copy_from_slice(&page[in_page..end]);
1159 page_offsets.insert_page(page_no, page);
1160 return Ok(());
1161 }
1162 let stable_offset = checked_add(
1163 physical,
1164 u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
1165 )?;
1166 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1167 crate::read_metrics::record_stable_data_read(dst.len());
1168 memory::read_preallocated(stable_offset, dst)
1169}
1170
1171fn page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
1172 if page_no >= active_page_count(block)? || block.page_table_offset == 0 {
1173 return Ok(0);
1174 }
1175 cached_page_offset_for(block, page_no)
1176}
1177
1178fn read_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1179 let root = read_root_table(block)?;
1180 let count = active_page_count(block)?;
1181 let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1182 let mut entries = Vec::with_capacity(capacity);
1183 for segment_no in 0..segment_count_for_pages(count)? {
1184 let table = read_segment_table(block, &root, segment_no)?;
1185 for entry in table {
1186 if entries.len() == capacity {
1187 break;
1188 }
1189 entries.push(entry);
1190 }
1191 }
1192 Ok(entries)
1193}
1194
1195fn cached_page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
1196 let context = memory::active_context_id()?;
1197 let key = read_cache_key(block);
1198 let segment_no = segment_no(page_no);
1199 let index = segment_index(page_no)?;
1200 READ_TABLE_CACHE.with(|cache| {
1201 let mut caches = cache.borrow_mut();
1202 let cache = match read_table_cache_index(&caches, context) {
1203 Some(index) => &mut caches[index].1,
1204 None => {
1205 caches.push((context, ReadTableCache::new()));
1206 &mut caches
1207 .last_mut()
1208 .ok_or(StableMemoryError::OffsetOverflow)?
1209 .1
1210 }
1211 };
1212 cache.ensure_key(key);
1213 if cache.root.is_empty() {
1214 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1215 crate::read_metrics::record_page_table_root_miss();
1216 cache.root = read_root_table(block)?;
1217 } else {
1218 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1219 crate::read_metrics::record_page_table_root_hit();
1220 }
1221 let root_index =
1222 usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1223 let segment_offset = cache.root[root_index];
1224 if segment_offset == 0 {
1225 return Ok(0);
1226 }
1227 if let Some(offset) = cache.segment_page_offset(segment_no, index) {
1228 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1229 crate::read_metrics::record_page_table_segment_hit();
1230 return Ok(offset);
1231 }
1232 #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1233 crate::read_metrics::record_page_table_segment_miss();
1234 let table = read_segment_table_at(segment_offset)?;
1235 let offset = table[index];
1236 cache.insert_segment(segment_no, table);
1237 Ok(offset)
1238 })
1239}
1240
1241fn read_table_cache_index(
1242 caches: &[(ContextId, ReadTableCache)],
1243 context: ContextId,
1244) -> Option<usize> {
1245 caches
1246 .iter()
1247 .position(|(stored_context, _)| *stored_context == context)
1248}
1249
1250fn read_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1251 if block.page_count == 0 {
1252 return Ok(Vec::new());
1253 }
1254 let entries_len =
1255 usize::try_from(block.page_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1256 read_u64_table_at(block.page_table_offset, entries_len)
1257}
1258
1259fn read_commit_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1260 read_root_table(block)
1261}
1262
1263fn read_segment_table(
1264 _block: &Superblock,
1265 root: &[u64],
1266 segment_no: u64,
1267) -> Result<Vec<u64>, StableMemoryError> {
1268 let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1269 let Some(offset) = root.get(index).copied() else {
1270 return Ok(vec![0_u64; segment_page_count_usize()]);
1271 };
1272 if offset == 0 {
1273 return Ok(vec![0_u64; segment_page_count_usize()]);
1274 }
1275 read_segment_table_at(offset)
1276}
1277
1278fn read_commit_segment_table(
1279 _block: &Superblock,
1280 root: &[u64],
1281 segment_no: u64,
1282) -> Result<Vec<u64>, StableMemoryError> {
1283 let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1284 let Some(offset) = root.get(index).copied() else {
1285 return Ok(vec![0_u64; segment_page_count_usize()]);
1286 };
1287 if offset == 0 {
1288 return Ok(vec![0_u64; segment_page_count_usize()]);
1289 }
1290 read_commit_segment_table_at(segment_no, offset)
1291}
1292
1293fn read_commit_segment_table_at(
1294 segment_no: u64,
1295 offset: u64,
1296) -> Result<Vec<u64>, StableMemoryError> {
1297 if offset == 0 {
1298 return Ok(vec![0_u64; segment_page_count_usize()]);
1299 }
1300 if let Some(table) = take_commit_segment_table(segment_no, offset) {
1301 return Ok(table);
1302 }
1303 read_segment_table_at(offset)
1304}
1305
1306fn take_commit_segment_table(segment_no: u64, segment_offset: u64) -> Option<Vec<u64>> {
1307 let Ok(context) = memory::active_context_id() else {
1308 return None;
1309 };
1310 COMMIT_SEGMENT_CACHE.with(|cache| {
1311 let mut cache = cache.borrow_mut();
1312 if cache.len() == 1 {
1313 let (stored_context, cached) = &cache[0];
1314 if *stored_context == context
1315 && cached.segment_no == segment_no
1316 && cached.segment_offset == segment_offset
1317 {
1318 return cache.pop().map(|(_, cached)| cached.table);
1319 }
1320 return None;
1321 }
1322 cache
1323 .iter()
1324 .position(|(stored_context, cached)| {
1325 *stored_context == context
1326 && cached.segment_no == segment_no
1327 && cached.segment_offset == segment_offset
1328 })
1329 .map(|position| cache.remove(position).1.table)
1330 })
1331}
1332
1333fn cache_commit_segment_table(segment_no: u64, segment_offset: u64, table: Vec<u64>) {
1334 let Ok(context) = memory::active_context_id() else {
1335 return;
1336 };
1337 COMMIT_SEGMENT_CACHE.with(|cache| {
1338 let mut cache = cache.borrow_mut();
1339 if cache.is_empty() {
1340 cache.push((
1341 context,
1342 CommitSegmentCache {
1343 segment_no,
1344 segment_offset,
1345 table,
1346 },
1347 ));
1348 return;
1349 }
1350 if cache.len() == 1 {
1351 let (stored_context, cached) = &mut cache[0];
1352 if *stored_context == context {
1353 cached.segment_no = segment_no;
1354 cached.segment_offset = segment_offset;
1355 cached.table = table;
1356 return;
1357 }
1358 } else if let Some((_, cached)) = cache
1359 .iter_mut()
1360 .find(|(stored_context, _)| *stored_context == context)
1361 {
1362 cached.segment_no = segment_no;
1363 cached.segment_offset = segment_offset;
1364 cached.table = table;
1365 return;
1366 }
1367 cache.push((
1368 context,
1369 CommitSegmentCache {
1370 segment_no,
1371 segment_offset,
1372 table,
1373 },
1374 ));
1375 });
1376}
1377
1378fn read_segment_table_at(offset: u64) -> Result<Vec<u64>, StableMemoryError> {
1379 read_u64_table_at(offset, segment_page_count_usize())
1380}
1381
1382fn write_segmented_tables(entries: &[u64]) -> Result<(u64, u64), StableMemoryError> {
1383 if entries.is_empty() {
1384 return Ok((0, 0));
1385 }
1386 let root_len = segment_count_for_pages(entries_len_u64(entries)?)?;
1387 let mut cursor = append_base()?;
1388 let segment_bytes = root_len
1389 .checked_mul(segment_table_bytes()?)
1390 .ok_or(StableMemoryError::OffsetOverflow)?;
1391 let page_table_bytes = checked_add(segment_bytes, root_table_bytes(root_len)?)?;
1392 memory::ensure_capacity(checked_add(cursor, page_table_bytes)?)?;
1393 let mut root = Vec::with_capacity(
1394 usize::try_from(root_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1395 );
1396 for segment_no in 0..root_len {
1397 let start = usize::try_from(
1398 segment_no
1399 .checked_mul(SEGMENT_PAGE_COUNT)
1400 .ok_or(StableMemoryError::OffsetOverflow)?,
1401 )
1402 .map_err(|_| StableMemoryError::OffsetOverflow)?;
1403 let mut table = vec![0_u64; segment_page_count_usize()];
1404 for (offset, entry) in entries[start..]
1405 .iter()
1406 .take(segment_page_count_usize())
1407 .enumerate()
1408 {
1409 table[offset] = *entry;
1410 }
1411 root.push(write_segment_table_at(&table, &mut cursor)?);
1412 }
1413 let root_offset = write_root_table_at(&root, &mut cursor)?;
1414 Ok((root_offset, entries_len_u64(&root)?))
1415}
1416
1417#[inline(always)]
1418fn write_segment_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1419 if entries.len() == segment_page_count_usize() {
1420 return write_u64_table_at(entries, cursor);
1421 }
1422
1423 let mut table = vec![0_u64; segment_page_count_usize()];
1424 for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
1425 table[index] = *entry;
1426 }
1427 write_u64_table_at(&table, cursor)
1428}
1429
1430fn write_root_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1431 write_u64_table_at(entries, cursor)
1432}
1433
1434#[inline(always)]
1435fn write_commit_segment_table_at(
1436 entries: &[u64],
1437 cursor: &mut u64,
1438 profile_enabled: bool,
1439) -> Result<u64, StableMemoryError> {
1440 if profile_enabled {
1441 write_segment_table_at(entries, cursor)
1442 } else {
1443 write_segment_table_at_unmetered(entries, cursor)
1444 }
1445}
1446
1447#[inline(always)]
1448fn write_commit_root_table_at(
1449 entries: &[u64],
1450 cursor: &mut u64,
1451 profile_enabled: bool,
1452) -> Result<u64, StableMemoryError> {
1453 if profile_enabled {
1454 write_root_table_at(entries, cursor)
1455 } else {
1456 write_u64_table_at_unmetered(entries, cursor)
1457 }
1458}
1459
1460fn write_segment_table_at_unmetered(
1461 entries: &[u64],
1462 cursor: &mut u64,
1463) -> Result<u64, StableMemoryError> {
1464 if entries.len() == segment_page_count_usize() {
1465 return write_u64_table_at_unmetered(entries, cursor);
1466 }
1467
1468 let mut table = vec![0_u64; segment_page_count_usize()];
1469 for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
1470 table[index] = *entry;
1471 }
1472 write_u64_table_at_unmetered(&table, cursor)
1473}
1474
1475fn write_u64_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1476 if entries.is_empty() {
1477 return Ok(0);
1478 }
1479 let offset = *cursor;
1480 let byte_len = entries
1481 .len()
1482 .checked_mul(8)
1483 .ok_or(StableMemoryError::OffsetOverflow)?;
1484 #[cfg(target_endian = "little")]
1485 {
1486 let bytes = unsafe { std::slice::from_raw_parts(entries.as_ptr().cast::<u8>(), byte_len) };
1488 memory::write_prechecked(offset, bytes)?;
1489 *cursor = checked_add(
1490 offset,
1491 u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1492 )?;
1493 Ok(offset)
1494 }
1495
1496 #[cfg(not(target_endian = "little"))]
1497 {
1498 let mut bytes = vec![0_u8; byte_len];
1499 for (chunk, entry) in bytes.chunks_exact_mut(8).zip(entries) {
1500 chunk.copy_from_slice(&entry.to_le_bytes());
1501 }
1502 memory::write_prechecked(offset, &bytes)?;
1503 *cursor = checked_add(
1504 offset,
1505 u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1506 )?;
1507 Ok(offset)
1508 }
1509}
1510
1511fn read_u64_table_at(offset: u64, entries_len: usize) -> Result<Vec<u64>, StableMemoryError> {
1512 if entries_len == 0 {
1513 return Ok(Vec::new());
1514 }
1515 let byte_len = entries_len
1516 .checked_mul(8)
1517 .ok_or(StableMemoryError::OffsetOverflow)?;
1518 #[cfg(target_endian = "little")]
1519 {
1520 let mut entries = Vec::<MaybeUninit<u64>>::with_capacity(entries_len);
1521 unsafe {
1522 entries.set_len(entries_len);
1523 }
1524 let bytes =
1527 unsafe { std::slice::from_raw_parts_mut(entries.as_mut_ptr().cast::<u8>(), byte_len) };
1528 memory::read_preallocated(offset, bytes)?;
1529 let ptr = entries.as_mut_ptr().cast::<u64>();
1530 let len = entries.len();
1531 let capacity = entries.capacity();
1532 std::mem::forget(entries);
1533 unsafe { Ok(Vec::from_raw_parts(ptr, len, capacity)) }
1536 }
1537
1538 #[cfg(not(target_endian = "little"))]
1539 {
1540 let mut bytes = vec![0_u8; byte_len];
1541 memory::read_preallocated(offset, &mut bytes)?;
1542 decode_u64_table(&bytes)
1543 }
1544}
1545
1546fn write_u64_table_at_unmetered(
1547 entries: &[u64],
1548 cursor: &mut u64,
1549) -> Result<u64, StableMemoryError> {
1550 if entries.is_empty() {
1551 return Ok(0);
1552 }
1553 let offset = *cursor;
1554 let byte_len = entries
1555 .len()
1556 .checked_mul(8)
1557 .ok_or(StableMemoryError::OffsetOverflow)?;
1558 #[cfg(target_endian = "little")]
1559 {
1560 let bytes = unsafe { std::slice::from_raw_parts(entries.as_ptr().cast::<u8>(), byte_len) };
1562 memory::write_prechecked_unmetered(offset, bytes)?;
1563 *cursor = checked_add(
1564 offset,
1565 u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1566 )?;
1567 Ok(offset)
1568 }
1569
1570 #[cfg(not(target_endian = "little"))]
1571 {
1572 let mut bytes = vec![0_u8; byte_len];
1573 for (chunk, entry) in bytes.chunks_exact_mut(8).zip(entries) {
1574 chunk.copy_from_slice(&entry.to_le_bytes());
1575 }
1576 memory::write_prechecked_unmetered(offset, &bytes)?;
1577 *cursor = checked_add(
1578 offset,
1579 u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1580 )?;
1581 Ok(offset)
1582 }
1583}
1584
/// Decodes a byte buffer of little-endian u64 values (non-little-endian
/// targets only).
///
/// # Errors
/// `StableMemoryError::OffsetOverflow` when `bytes.len()` is not a multiple
/// of 8.
#[cfg(not(target_endian = "little"))]
fn decode_u64_table(bytes: &[u8]) -> Result<Vec<u64>, StableMemoryError> {
    if !bytes.len().is_multiple_of(8) {
        return Err(StableMemoryError::OffsetOverflow);
    }
    let entries = bytes
        .chunks_exact(8)
        .map(|chunk| {
            let mut word = [0_u8; 8];
            word.copy_from_slice(chunk);
            u64::from_le_bytes(word)
        })
        .collect();
    Ok(entries)
}
1598
1599fn imported_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1600 let count = page_count_for_size(block.import_total_size)?;
1601 let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1602 let mut entries = Vec::with_capacity(capacity);
1603 for page_no in 0..count {
1604 entries.push(checked_add(
1605 block.import_base_offset,
1606 page_no
1607 .checked_mul(page_size())
1608 .ok_or(StableMemoryError::OffsetOverflow)?,
1609 )?);
1610 }
1611 Ok(entries)
1612}
1613
1614fn checksum_logical_range(block: &Superblock, len: u64) -> Result<u64, StableMemoryError> {
1615 let mut offset = 0_u64;
1616 let mut hash = fnv1a64(&[]);
1617 while offset < len {
1618 let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
1619 let copied_len =
1620 usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
1621 let mut bytes = vec![0_u8; copied_len];
1622 read_logical_range(block, offset, &mut bytes)?;
1623 hash = fold_fnv1a64(hash, &bytes);
1624 offset += chunk_len;
1625 }
1626 Ok(hash)
1627}
1628
1629fn checksum_physical_range(base_offset: u64, len: u64) -> Result<u64, StableMemoryError> {
1630 let mut offset = 0_u64;
1631 let mut hash = fnv1a64(&[]);
1632 while offset < len {
1633 let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
1634 let copied_len =
1635 usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
1636 let mut bytes = vec![0_u8; copied_len];
1637 memory::read_preallocated(checked_add(base_offset, offset)?, &mut bytes)?;
1638 hash = fold_fnv1a64(hash, &bytes);
1639 offset += chunk_len;
1640 }
1641 Ok(hash)
1642}
1643
1644fn clear_import(block: &mut Superblock) -> Result<(), StableMemoryError> {
1645 block.flags &= !FLAG_IMPORTING;
1646 block.import_expected_checksum = 0;
1647 block.import_written_until = 0;
1648 block.import_total_size = 0;
1649 block.import_base_offset = 0;
1650 block.store()?;
1651 invalidate_read_cache();
1652 Ok(())
1653}
1654
1655fn import_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
1656 checked_add(block.import_base_offset, offset)
1657}
1658
1659fn active_page_count(block: &Superblock) -> Result<u64, StableMemoryError> {
1660 page_count_for_size(block.db_size)
1661}
1662
1663fn active_segment_count(block: &Superblock) -> Result<u64, StableMemoryError> {
1664 Ok(block.page_count)
1665}
1666
1667fn read_cache_key(block: &Superblock) -> ReadCacheKey {
1668 ReadCacheKey {
1669 page_table_offset: block.page_table_offset,
1670 page_count: block.page_count,
1671 db_size: block.db_size,
1672 last_tx_id: block.last_tx_id,
1673 }
1674}
1675
1676fn segment_count_for_pages(page_count: u64) -> Result<u64, StableMemoryError> {
1677 Ok(page_count.div_ceil(SEGMENT_PAGE_COUNT))
1678}
1679
1680fn segment_no(page_no: u64) -> u64 {
1681 page_no / SEGMENT_PAGE_COUNT
1682}
1683
1684fn segment_index(page_no: u64) -> Result<usize, StableMemoryError> {
1685 usize::try_from(page_no % SEGMENT_PAGE_COUNT).map_err(|_| StableMemoryError::OffsetOverflow)
1686}
1687
1688fn segment_page_count_usize() -> usize {
1689 usize::try_from(SEGMENT_PAGE_COUNT).expect("segment page count fits usize")
1690}
1691
1692fn segment_table_len() -> usize {
1693 segment_page_count_usize() * 8
1694}
1695
1696fn segment_table_bytes() -> Result<u64, StableMemoryError> {
1697 u64::try_from(segment_table_len()).map_err(|_| StableMemoryError::OffsetOverflow)
1698}
1699
1700fn root_table_bytes(entry_count: u64) -> Result<u64, StableMemoryError> {
1701 entry_count
1702 .checked_mul(PAGE_TABLE_ENTRY_LEN)
1703 .ok_or(StableMemoryError::OffsetOverflow)
1704}
1705
1706fn entries_len_u64<T>(entries: &[T]) -> Result<u64, StableMemoryError> {
1707 u64::try_from(entries.len()).map_err(|_| StableMemoryError::OffsetOverflow)
1708}
1709
1710fn append_base() -> Result<u64, StableMemoryError> {
1711 memory::size_pages()
1712 .checked_mul(STABLE_PAGE_SIZE)
1713 .ok_or(StableMemoryError::OffsetOverflow)
1714}
1715
1716fn page_size() -> u64 {
1717 u64::from(SQLITE_PAGE_SIZE)
1718}
1719
1720fn page_len() -> usize {
1721 usize::try_from(SQLITE_PAGE_SIZE).expect("SQLite page size fits usize")
1722}
1723
1724fn zero_page() -> Vec<u8> {
1725 vec![0_u8; page_len()]
1726}
1727
1728fn checked_add(left: u64, right: u64) -> Result<u64, StableMemoryError> {
1729 left.checked_add(right)
1730 .ok_or(StableMemoryError::OffsetOverflow)
1731}
1732
/// Continues an FNV-1a 64-bit hash: for each byte, XOR it into the running
/// state, then multiply (wrapping) by the 64-bit FNV prime.
///
/// Feeding a message in several pieces gives the same result as hashing it in
/// one pass, provided `hash` starts from the value for the empty input.
fn fold_fnv1a64(hash: u64, bytes: &[u8]) -> u64 {
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes
        .iter()
        .fold(hash, |state, &byte| (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME))
}
1740
/// Test-build failpoint hook.
///
/// If `failpoint` is the one armed for the active context, it is disarmed and
/// `StableMemoryError::Failpoint` (carrying the failpoint's name) is returned.
/// Returns `Ok(())` when nothing matching is armed or no context is active.
#[cfg(test)]
fn hit_failpoint(failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
    let context = match memory::active_context_id() {
        Ok(context) => context,
        Err(_) => return Ok(()),
    };
    FAILPOINTS.with(|slot| {
        let mut armed = slot.borrow_mut();
        if armed.get(&context) != Some(&failpoint) {
            return Ok(());
        }
        // One-shot: the failpoint fires once and is then cleared.
        armed.remove(&context);
        Err(StableMemoryError::Failpoint(failpoint.name()))
    })
}
1756
1757#[cfg(not(test))]
1758fn hit_failpoint(_failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
1759 Ok(())
1760}
1761
#[cfg(test)]
impl StableBlobFailpoint {
    /// Human-readable label for the failpoint, used as the payload of
    /// `StableMemoryError::Failpoint`.
    fn name(self) -> &'static str {
        let label = match self {
            Self::OverlayWrite => "before overlay write",
            Self::OverlayTruncate => "before overlay truncate",
            Self::CommitCapacity => "before commit capacity",
            Self::CommitChunkWrite => "before commit page write",
            Self::CommitPageTableWrite => "before commit page table write",
            Self::CommitSuperblockStore => "before commit superblock store",
        };
        label
    }
}
1775
1776#[cfg(test)]
1777mod tests {
1778 use super::*;
1779 use proptest::prelude::*;
1780 use proptest::test_runner::{Config, TestRunner};
1781 use std::collections::BTreeSet;
1782
1783 #[test]
1784 fn layout_math_matches_expected_boundaries() {
1785 assert_eq!(page_count_for_size(0).unwrap(), 0);
1786 assert_eq!(page_count_for_size(1).unwrap(), 1);
1787 assert_eq!(page_count_for_size(page_size()).unwrap(), 1);
1788 assert_eq!(page_count_for_size(page_size() + 1).unwrap(), 2);
1789
1790 assert_eq!(segment_count_for_pages(0).unwrap(), 0);
1791 assert_eq!(segment_count_for_pages(1).unwrap(), 1);
1792 assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT).unwrap(), 1);
1793 assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT + 1).unwrap(), 2);
1794
1795 assert_eq!(segment_no(SEGMENT_PAGE_COUNT), 1);
1796 assert_eq!(segment_index(SEGMENT_PAGE_COUNT - 1).unwrap(), 255);
1797 assert_eq!(segment_index(SEGMENT_PAGE_COUNT).unwrap(), 0);
1798 assert_eq!(root_table_bytes(2).unwrap(), 16);
1799 }
1800
1801 #[test]
1802 fn layout_math_rejects_u64_max_overflow_boundaries() {
1803 assert!(matches!(
1804 root_table_bytes(u64::MAX),
1805 Err(StableMemoryError::OffsetOverflow)
1806 ));
1807 assert!(matches!(
1808 checked_add(u64::MAX, 1),
1809 Err(StableMemoryError::OffsetOverflow)
1810 ));
1811
1812 let mut block = Superblock::fresh();
1813 block.import_base_offset = u64::MAX;
1814 assert!(matches!(
1815 import_offset(&block, 1),
1816 Err(StableMemoryError::OffsetOverflow)
1817 ));
1818
1819 block.import_base_offset = u64::MAX - page_size() + 1;
1820 block.import_total_size = page_size() + 1;
1821 assert!(matches!(
1822 imported_page_table(&block),
1823 Err(StableMemoryError::OffsetOverflow)
1824 ));
1825 }
1826
    /// Property test (512 cases) over boundary-biased sizes, page numbers and
    /// entry counts. The layout helpers are checked against the same
    /// relations computed in `u128`, where the arithmetic cannot overflow.
    #[test]
    fn pbt_layout_math_matches_verus_model() {
        let mut runner = TestRunner::new(Config {
            cases: 512,
            ..Config::default()
        });

        runner
            .run(
                &(
                    boundary_size_strategy(),
                    boundary_page_strategy(),
                    boundary_entry_strategy(),
                ),
                |(size, page_no, entries)| {
                    // page_count is the smallest count whose pages cover `size`.
                    let page_count = page_count_for_size(size).unwrap();
                    let page_size = u128::from(page_size());
                    if size == 0 {
                        prop_assert_eq!(page_count, 0);
                    } else {
                        prop_assert!(u128::from(page_count - 1) * page_size < u128::from(size));
                        prop_assert!(u128::from(size) <= u128::from(page_count) * page_size);
                    }

                    // segment_count is the smallest count whose segments cover
                    // `page_count`.
                    let segment_count = segment_count_for_pages(page_count).unwrap();
                    if page_count == 0 {
                        prop_assert_eq!(segment_count, 0);
                    } else {
                        prop_assert!(
                            u128::from(segment_count - 1) * u128::from(SEGMENT_PAGE_COUNT)
                                < u128::from(page_count)
                        );
                        prop_assert!(
                            u128::from(page_count)
                                <= u128::from(segment_count) * u128::from(SEGMENT_PAGE_COUNT)
                        );
                    }

                    // (segment_no, segment_index) must recompose to page_no.
                    let index = segment_index(page_no).unwrap();
                    prop_assert!(index < segment_page_count_usize());
                    prop_assert_eq!(
                        u128::from(segment_no(page_no)) * u128::from(SEGMENT_PAGE_COUNT)
                            + index as u128,
                        u128::from(page_no)
                    );

                    // root_table_bytes either matches the plain product or
                    // reports overflow exactly when the product overflows.
                    match root_table_bytes(entries) {
                        Ok(bytes) => prop_assert_eq!(bytes, entries * PAGE_TABLE_ENTRY_LEN),
                        Err(StableMemoryError::OffsetOverflow) => {
                            prop_assert!(entries.checked_mul(PAGE_TABLE_ENTRY_LEN).is_none());
                        }
                        Err(error) => return Err(TestCaseError::fail(error.to_string())),
                    }
                    Ok(())
                },
            )
            .unwrap();
    }
1885
1886 fn boundary_size_strategy() -> impl Strategy<Value = u64> {
1887 let page = page_size();
1888 let segment_bytes = SEGMENT_PAGE_COUNT * page;
1889 prop_oneof![
1890 any::<u64>(),
1891 prop::sample::select(boundary_values(&[
1892 0,
1893 1,
1894 page - 1,
1895 page,
1896 page + 1,
1897 segment_bytes - 1,
1898 segment_bytes,
1899 segment_bytes + 1,
1900 u64::MAX,
1901 ])),
1902 ]
1903 }
1904
1905 fn boundary_page_strategy() -> impl Strategy<Value = u64> {
1906 prop_oneof![
1907 any::<u64>(),
1908 prop::sample::select(boundary_values(&[
1909 0,
1910 1,
1911 SEGMENT_PAGE_COUNT - 1,
1912 SEGMENT_PAGE_COUNT,
1913 SEGMENT_PAGE_COUNT + 1,
1914 u64::MAX,
1915 ])),
1916 ]
1917 }
1918
1919 fn boundary_entry_strategy() -> impl Strategy<Value = u64> {
1920 let max_without_overflow = u64::MAX / PAGE_TABLE_ENTRY_LEN;
1921 prop_oneof![
1922 any::<u64>(),
1923 prop::sample::select(boundary_values(&[
1924 0,
1925 1,
1926 SEGMENT_PAGE_COUNT - 1,
1927 SEGMENT_PAGE_COUNT,
1928 SEGMENT_PAGE_COUNT + 1,
1929 max_without_overflow - 1,
1930 max_without_overflow,
1931 max_without_overflow + 1,
1932 u64::MAX - 1,
1933 u64::MAX,
1934 ])),
1935 ]
1936 }
1937
1938 fn boundary_values(values: &[u64]) -> Vec<u64> {
1939 values
1940 .iter()
1941 .flat_map(|value| [value.saturating_sub(1), *value, value.saturating_add(1)])
1942 .collect()
1943 }
1944
1945 #[test]
1946 fn fnv_fold_matches_one_pass_for_multiple_partitions() {
1947 let bytes: Vec<u8> = (0..97)
1948 .map(|index| (index as u8).wrapping_mul(37).wrapping_add(11))
1949 .collect();
1950 let expected = fnv1a64(&bytes);
1951
1952 for split in [0_usize, 1, 2, 7, 31, 64, bytes.len()] {
1953 let split = split.min(bytes.len());
1954 let mut hash = fnv1a64(&[]);
1955 hash = fold_fnv1a64(hash, &bytes[..split]);
1956 hash = fold_fnv1a64(hash, &bytes[split..]);
1957 assert_eq!(hash, expected);
1958 }
1959
1960 let mut hash = fnv1a64(&[]);
1961 for chunk in bytes.chunks(13) {
1962 hash = fold_fnv1a64(hash, chunk);
1963 }
1964 assert_eq!(hash, expected);
1965 }
1966
    /// Writes two pages in different segments, then rewrites page 0 and checks
    /// that its page-table entry moved while the new content reads back.
    /// Serialised because it resets the shared test stable memory.
    #[test]
    #[serial_test::serial]
    fn page_map_commit_tracks_dirty_page_offsets() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let page_zero = vec![1_u8; page_len()];
        let page_later = vec![2_u8; page_len()];
        // A page in the second segment, so the root table needs two entries.
        let later_page_no = SEGMENT_PAGE_COUNT + 1;
        write_at(0, &page_zero).unwrap();
        write_at(later_page_no * page_size(), &page_later).unwrap();

        let block = Superblock::load().unwrap();
        let root = read_root_table(&block).unwrap();
        let table = read_page_table(&block).unwrap();
        let expected_pages = active_page_count(&block).unwrap();
        let expected_segments = segment_count_for_pages(expected_pages).unwrap();

        // Root has one entry per segment, flat table one entry per page, and
        // both written pages have backing storage.
        assert_eq!(root.len() as u64, expected_segments);
        assert_eq!(table.len() as u64, expected_pages);
        assert_ne!(table[0], 0);
        assert_ne!(table[later_page_no as usize], 0);

        let old_page_zero_offset = table[0];
        let updated_page_zero = vec![3_u8; page_len()];
        write_at(0, &updated_page_zero).unwrap();
        let updated_table = read_page_table(&Superblock::load().unwrap()).unwrap();
        let mut out = vec![0_u8; page_len()];
        read_base_at(0, &mut out).unwrap();

        // The rewrite must land at a new physical offset and be readable.
        assert_ne!(updated_table[0], old_page_zero_offset);
        assert_eq!(out, updated_page_zero);
    }
2001
    /// Within one `begin_update`/`commit_update` pair, rewrites pages in two
    /// different segments and leaves a third page untouched; the dirty pages
    /// must get new offsets while the clean page keeps its old one.
    /// Serialised because it resets the shared test stable memory.
    #[test]
    #[serial_test::serial]
    fn page_map_commit_tracks_multi_segment_dirty_and_clean_pages() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let clean_page_no = 1;
        let later_page_no = SEGMENT_PAGE_COUNT + 1;
        write_at(0, &vec![1_u8; page_len()]).unwrap();
        write_at(clean_page_no * page_size(), &vec![2_u8; page_len()]).unwrap();
        write_at(later_page_no * page_size(), &vec![3_u8; page_len()]).unwrap();

        // Snapshot of the page map before the update under test.
        let before = Superblock::load().unwrap();
        let before_root = read_root_table(&before).unwrap();
        let before_table = read_page_table(&before).unwrap();

        begin_update().unwrap();
        write_at(0, &vec![4_u8; page_len()]).unwrap();
        write_at(later_page_no * page_size(), &vec![5_u8; page_len()]).unwrap();
        commit_update().unwrap();

        let after = Superblock::load().unwrap();
        let after_root = read_root_table(&after).unwrap();
        let after_table = read_page_table(&after).unwrap();

        // The superblock's `page_count` equals the root length, which is
        // unchanged by the update.
        assert_eq!(after_root.len(), after.page_count as usize);
        assert_eq!(after_root.len(), before_root.len());
        assert_ne!(after_table[0], before_table[0]);
        assert_eq!(
            after_table[clean_page_no as usize],
            before_table[clean_page_no as usize]
        );
        assert_ne!(
            after_table[later_page_no as usize],
            before_table[later_page_no as usize]
        );
    }
2040
/// Writes three pages, truncates the blob back to one page, and checks that
/// the first segment table keeps a non-zero entry for page 0 while the entries
/// of the two truncated pages read back as zero.
#[test]
#[serial_test::serial]
fn page_map_commit_zeroes_truncated_tail_slots() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    write_at(0, &vec![1_u8; page_len()]).unwrap();
    write_at(page_size(), &vec![2_u8; page_len()]).unwrap();
    write_at(2 * page_size(), &vec![3_u8; page_len()]).unwrap();
    truncate(page_size()).unwrap();

    let block = Superblock::load().unwrap();
    let root = read_root_table(&block).unwrap();
    let segment = read_segment_table(&block, &root, 0).unwrap();

    assert_eq!(block.db_size, page_size());
    // `assert_ne!` reports the offending entry on failure, matching the other
    // page-table tests in this module.
    assert_ne!(segment[0], 0);
    assert_eq!(segment[1], 0);
    assert_eq!(segment[2], 0);
}
2062
/// Writes two pages separated by a gap that crosses a segment boundary, runs
/// `compact`, and checks that the gap stays unmapped (zero entry), that the two
/// written pages end up at adjacent offsets, and that their contents survive.
#[test]
#[serial_test::serial]
fn compact_keeps_zero_pages_and_densifies_offsets_across_segments() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    let far_page_no = SEGMENT_PAGE_COUNT + 2;
    let far_offset = far_page_no * page_size();
    let head_fill = vec![7_u8; page_len()];
    let far_fill = vec![9_u8; page_len()];
    write_at(0, &head_fill).unwrap();
    write_at(far_offset, &far_fill).unwrap();

    compact().unwrap();

    let superblock = Superblock::load().unwrap();
    let root_table = read_root_table(&superblock).unwrap();
    let page_table = read_page_table(&superblock).unwrap();

    let mut head_read = vec![0_u8; page_len()];
    let mut far_read = vec![0_u8; page_len()];
    read_base_at(0, &mut head_read).unwrap();
    read_base_at(far_offset, &mut far_read).unwrap();

    assert_eq!(root_table.len() as u64, superblock.page_count);
    assert_eq!(
        page_table.len() as u64,
        active_page_count(&superblock).unwrap()
    );
    assert_ne!(page_table[0], 0);
    assert_eq!(page_table[1], 0);
    // The second written page must sit directly after the first one.
    assert_eq!(
        page_table[far_page_no as usize],
        page_table[0] + page_size()
    );
    assert_eq!(head_read, head_fill);
    assert_eq!(far_read, far_fill);
}
2095
/// Writes a single byte, grows the blob with two `truncate` calls (to four
/// pages, then one byte further), and checks the resulting size, which table
/// entries are non-zero, and that both the original byte and the grown tail
/// read back as zero.
#[test]
#[serial_test::serial]
fn single_segment_fast_path_preserves_table_after_expand_only_commit() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    let four_pages = page_size() * 4;
    write_at(0, &[0]).unwrap();
    truncate(four_pages).unwrap();
    truncate(four_pages + 1).unwrap();

    let superblock = Superblock::load().unwrap();
    let page_table = read_page_table(&superblock).unwrap();

    // Buffers start non-zero so a read that leaves them untouched is detected.
    let mut head_byte = [1_u8; 1];
    let mut tail_byte = [1_u8; 1];
    read_base_at(0, &mut head_byte).unwrap();
    read_base_at(four_pages, &mut tail_byte).unwrap();

    assert_eq!(superblock.db_size, four_pages + 1);
    assert_ne!(page_table[0], 0);
    assert_eq!(page_table[1], 0);
    assert_ne!(page_table[4], 0);
    assert_eq!(head_byte, [0]);
    assert_eq!(tail_byte, [0]);
}
2122
/// Writes a fixed set of `u64` entries with `write_u64_table_at`, reads them
/// back both decoded and as raw bytes, and checks the raw bytes equal the
/// little-endian encoding of each entry. Also checks that writing an empty
/// table returns offset 0 without moving the cursor.
#[test]
#[serial_test::serial]
fn page_table_u64_encoding_is_little_endian_and_round_trips() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    let entries = [
        0_u64,
        1,
        0x0102_0304_0506_0708,
        0xf1f2_f3f4_f5f6_f7f8,
        u64::MAX,
    ];
    let raw_len = entries.len() * 8;
    let expected_len = u64::try_from(raw_len).unwrap();
    let mut cursor = 128_u64;
    crate::stable::memory::ensure_capacity(cursor + expected_len).unwrap();

    let offset = write_u64_table_at(&entries, &mut cursor).unwrap();
    let decoded = read_u64_table_at(offset, entries.len()).unwrap();

    let mut raw = vec![0_u8; raw_len];
    crate::stable::memory::read_preallocated(offset, &mut raw).unwrap();
    let little_endian: Vec<_> = entries
        .iter()
        .flat_map(|value| value.to_le_bytes())
        .collect();

    assert_eq!(offset, 128);
    assert_eq!(cursor, 128 + expected_len);
    assert_eq!(decoded, entries);
    assert_eq!(raw, little_endian);

    // An empty table must not consume any space.
    let mut cursor_after_empty = cursor;
    let empty_offset = write_u64_table_at(&[], &mut cursor_after_empty).unwrap();
    assert_eq!(empty_offset, 0);
    assert_eq!(cursor_after_empty, cursor);
    assert!(read_u64_table_at(cursor, 0).unwrap().is_empty());
}
2160
/// Property test: for arbitrary `u64` vectors of up to 512 entries, a table
/// written with `write_u64_table_at` decodes to the same entries, advances the
/// cursor by eight bytes per entry, and is stored as little-endian bytes.
#[test]
#[serial_test::serial]
fn pbt_page_table_u64_encoding_round_trips() {
    let config = Config {
        cases: 128,
        ..Config::default()
    };
    let strategy = proptest::collection::vec(any::<u64>(), 0..=512);

    TestRunner::new(config)
        .run(&strategy, |entries| {
            // Each case starts from freshly initialised stable memory.
            crate::stable::memory::reset_for_tests();
            crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
            invalidate_read_cache();

            let mut cursor = 128_u64;
            let raw_len = entries.len().checked_mul(8).unwrap();
            let expected_end = cursor + u64::try_from(raw_len).unwrap();
            crate::stable::memory::ensure_capacity(expected_end).unwrap();

            let offset = write_u64_table_at(&entries, &mut cursor).unwrap();
            let decoded = read_u64_table_at(offset, entries.len()).unwrap();
            prop_assert_eq!(&decoded, &entries);
            prop_assert_eq!(cursor, expected_end);

            let mut raw = vec![0_u8; raw_len];
            crate::stable::memory::read_preallocated(offset, &mut raw).unwrap();
            let little_endian: Vec<_> = entries
                .iter()
                .flat_map(|value| value.to_le_bytes())
                .collect();
            prop_assert_eq!(raw, little_endian);
            Ok(())
        })
        .unwrap();
}
2199
/// Property test: builds a sparse blob from a vector of optional fill bytes
/// (`None` pages are never written), runs `compact`, and checks that the size
/// and table length match the model, that written pages occupy consecutive
/// offsets in page order, and that unwritten pages have a zero entry and read
/// back as zeros.
#[test]
#[serial_test::serial]
fn pbt_compact_preserves_sparse_page_model() {
    let config = Config {
        cases: 32,
        ..Config::default()
    };
    let strategy = proptest::collection::vec(prop::option::of(any::<u8>()), 0..=300);

    TestRunner::new(config)
        .run(&strategy, |pages| {
            crate::stable::memory::reset_for_tests();
            crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
            invalidate_read_cache();

            // Trailing `None` pages are never written, so they do not count
            // towards the blob length.
            let active_len = pages
                .iter()
                .rposition(Option::is_some)
                .map_or(0, |last| last + 1);
            let live_pages = &pages[..active_len];

            for (index, fill) in live_pages.iter().enumerate() {
                let Some(fill) = fill else {
                    continue;
                };
                let offset = u64::try_from(index).unwrap() * page_size();
                write_at(offset, &vec![*fill; page_len()]).unwrap();
            }

            compact().unwrap();
            let superblock = Superblock::load().unwrap();
            prop_assert_eq!(
                superblock.db_size,
                u64::try_from(active_len).unwrap() * page_size()
            );
            let table = read_page_table(&superblock).unwrap();
            prop_assert_eq!(table.len(), active_len);

            // Offset of the first written page and the number of written
            // pages seen so far; together they give the expected dense offset.
            let mut dense_base = None;
            let mut dense_rank = 0_u64;
            for (index, fill) in live_pages.iter().enumerate() {
                let slot = table[index];
                let mut content = vec![0_u8; page_len()];
                read_base_at(u64::try_from(index).unwrap() * page_size(), &mut content).unwrap();

                match fill {
                    Some(fill) => {
                        let base = *dense_base.get_or_insert(slot);
                        prop_assert_ne!(slot, 0);
                        prop_assert_eq!(slot, base + dense_rank * page_size());
                        prop_assert_eq!(content, vec![*fill; page_len()]);
                        dense_rank += 1;
                    }
                    None => {
                        prop_assert_eq!(slot, 0);
                        prop_assert_eq!(content, vec![0_u8; page_len()]);
                    }
                }
            }
            Ok(())
        })
        .unwrap();
}
2264
/// One step of a generated operation sequence applied to both the blob store
/// and the in-memory model.
#[derive(Debug, Clone)]
enum BlobOp {
    /// Write `len` copies of `byte` starting at byte `offset`.
    Write {
        offset: u64,
        len: usize,
        byte: u8,
    },
    /// Set the blob length to `size` bytes.
    Truncate {
        size: u64,
    },
    /// Run a compaction pass.
    Compact,
}
2271
/// Property test: applies a generated sequence of writes, truncates and
/// compactions to the blob store and to an in-memory model, checking after the
/// initial state and after every operation that the store matches the model.
#[test]
#[serial_test::serial]
fn pbt_blob_operations_match_logical_model_across_compact() {
    let config = Config {
        cases: 48,
        ..Config::default()
    };

    TestRunner::new(config)
        .run(&blob_operation_sequence(), |operations| {
            crate::stable::memory::reset_for_tests();
            crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
            invalidate_read_cache();

            // `model` holds the expected bytes; `materialized` holds the page
            // numbers expected to have a non-zero table entry.
            let mut model = Vec::new();
            let mut materialized = BTreeSet::new();
            assert_blob_model(&model, &materialized, false)?;

            for operation in operations {
                let just_compacted = apply_blob_op(operation, &mut model, &mut materialized)?;
                assert_blob_model(&model, &materialized, just_compacted)?;
            }
            Ok(())
        })
        .unwrap();
}
2298
2299 fn blob_operation_sequence() -> impl Strategy<Value = Vec<BlobOp>> {
2300 let write = (blob_offset_strategy(), blob_len_strategy(), any::<u8>())
2301 .prop_map(|(offset, len, byte)| BlobOp::Write { offset, len, byte });
2302 let truncate = blob_offset_strategy().prop_map(|size| BlobOp::Truncate { size });
2303 proptest::collection::vec(prop_oneof![write, truncate, Just(BlobOp::Compact)], 0..=48)
2304 }
2305
2306 fn blob_offset_strategy() -> impl Strategy<Value = u64> {
2307 let limit = blob_model_limit();
2308 let page = page_size();
2309 let segment = SEGMENT_PAGE_COUNT * page;
2310 prop_oneof![
2311 0_u64..=limit,
2312 prop::sample::select(boundary_values(&[
2313 0,
2314 1,
2315 page - 1,
2316 page,
2317 page + 1,
2318 segment - 1,
2319 segment,
2320 segment + 1,
2321 limit - 1,
2322 limit,
2323 ]))
2324 .prop_map(move |value| value.min(limit)),
2325 ]
2326 }
2327
2328 fn blob_len_strategy() -> impl Strategy<Value = usize> {
2329 prop_oneof![
2330 0_usize..=(page_len() * 2 + 17),
2331 prop::sample::select(vec![
2332 0,
2333 1,
2334 page_len() - 1,
2335 page_len(),
2336 page_len() + 1,
2337 page_len() * 2 + 1,
2338 ]),
2339 ]
2340 }
2341
2342 fn blob_model_limit() -> u64 {
2343 (SEGMENT_PAGE_COUNT + 3) * page_size()
2344 }
2345
2346 fn apply_blob_op(
2347 operation: BlobOp,
2348 model: &mut Vec<u8>,
2349 materialized: &mut BTreeSet<u64>,
2350 ) -> Result<bool, TestCaseError> {
2351 match operation {
2352 BlobOp::Write { offset, len, byte } => {
2353 let len = len.min(usize::try_from(blob_model_limit() - offset).unwrap());
2354 let bytes = vec![byte; len];
2355 write_at(offset, &bytes).map_err(|error| TestCaseError::fail(error.to_string()))?;
2356 if len == 0 {
2357 return Ok(false);
2358 }
2359
2360 let start = usize::try_from(offset).unwrap();
2361 let end = start + len;
2362 if model.len() < start {
2363 model.resize(start, 0);
2364 }
2365 if model.len() < end {
2366 model.resize(end, 0);
2367 }
2368 model[start..end].copy_from_slice(&bytes);
2369 mark_materialized_range(offset, len, materialized);
2370 Ok(false)
2371 }
2372 BlobOp::Truncate { size } => {
2373 truncate(size).map_err(|error| TestCaseError::fail(error.to_string()))?;
2374 let new_len = usize::try_from(size).unwrap();
2375 model.resize(new_len, 0);
2376 let active_pages = page_count_for_size(size)
2377 .map_err(|error| TestCaseError::fail(error.to_string()))?;
2378 materialized.retain(|page_no| *page_no < active_pages);
2379 if size > 0 && !size.is_multiple_of(page_size()) {
2380 materialized.insert(size / page_size());
2381 }
2382 Ok(false)
2383 }
2384 BlobOp::Compact => {
2385 compact().map_err(|error| TestCaseError::fail(error.to_string()))?;
2386 Ok(true)
2387 }
2388 }
2389 }
2390
2391 fn mark_materialized_range(offset: u64, len: usize, materialized: &mut BTreeSet<u64>) {
2392 let end = offset + u64::try_from(len).unwrap();
2393 let first_page = offset / page_size();
2394 let last_page = (end - 1) / page_size();
2395 for page_no in first_page..=last_page {
2396 materialized.insert(page_no);
2397 }
2398 }
2399
2400 fn assert_blob_model(
2401 model: &[u8],
2402 materialized: &BTreeSet<u64>,
2403 expect_compacted: bool,
2404 ) -> Result<(), TestCaseError> {
2405 let block = Superblock::load().map_err(|error| TestCaseError::fail(error.to_string()))?;
2406 prop_assert_eq!(block.db_size, u64::try_from(model.len()).unwrap());
2407
2408 if !model.is_empty() {
2409 let mut out = vec![0_u8; model.len()];
2410 read_base_at(0, &mut out).map_err(|error| TestCaseError::fail(error.to_string()))?;
2411 prop_assert_eq!(out, model);
2412 }
2413
2414 let mut tail = vec![1_u8; 32];
2415 read_base_at(u64::try_from(model.len()).unwrap(), &mut tail)
2416 .map_err(|error| TestCaseError::fail(error.to_string()))?;
2417 prop_assert_eq!(tail, vec![0_u8; 32]);
2418
2419 let table =
2420 read_page_table(&block).map_err(|error| TestCaseError::fail(error.to_string()))?;
2421 let active_pages = page_count_for_size(u64::try_from(model.len()).unwrap())
2422 .map_err(|error| TestCaseError::fail(error.to_string()))?;
2423 prop_assert_eq!(table.len(), usize::try_from(active_pages).unwrap());
2424
2425 let mut first_compacted_offset = None;
2426 let mut non_zero_seen = 0_u64;
2427 for (index, entry) in table.iter().enumerate() {
2428 let page_no = u64::try_from(index).unwrap();
2429 if materialized.contains(&page_no) {
2430 prop_assert_ne!(*entry, 0);
2431 if expect_compacted {
2432 let base = *first_compacted_offset.get_or_insert(*entry);
2433 prop_assert_eq!(*entry, base + non_zero_seen * page_size());
2434 }
2435 non_zero_seen += 1;
2436 } else {
2437 prop_assert_eq!(*entry, 0);
2438 }
2439 }
2440 Ok(())
2441 }
2442
/// Reads the same page twice after clearing the read cache and resetting the
/// metrics, then checks the lower bounds on the data-read counters and on the
/// page-table root/segment hit and miss counters, plus the feature-dependent
/// bound on superblock loads.
#[test]
#[serial_test::serial]
fn read_metrics_separate_table_cache_from_data_reads() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    let expected_page = vec![7_u8; page_len()];
    write_at(0, &expected_page).unwrap();
    // Drop cached state and zero the counters so only the two reads below are
    // measured.
    invalidate_read_cache();
    crate::read_metrics::reset_read_metrics();

    let first_read = read_base_page(0).unwrap();
    let second_read = read_base_page(0).unwrap();
    let snapshot = crate::read_metrics::read_metrics_snapshot();

    assert_eq!(first_read, expected_page);
    assert_eq!(second_read, expected_page);
    assert!(snapshot.stable_data_read_calls >= 2);
    assert!(snapshot.stable_data_read_bytes >= page_size() * 2);
    assert!(snapshot.page_table_root_misses >= 1);
    assert!(snapshot.page_table_root_hits >= 1);
    assert!(snapshot.page_table_segment_misses >= 1);
    assert!(snapshot.page_table_segment_hits >= 1);
    #[cfg(feature = "bench-profile")]
    assert!(snapshot.superblock_loads <= 1);
    #[cfg(not(feature = "bench-profile"))]
    assert_eq!(snapshot.superblock_loads, 0);
}
2472
/// Performs two 16-byte reads inside the same page through one
/// `PageOffsetCache` and checks that the metrics record exactly one data read
/// of one page in total.
#[test]
#[serial_test::serial]
fn page_offset_cache_reuses_page_data_for_small_reads() {
    crate::stable::memory::reset_for_tests();
    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
    invalidate_read_cache();

    write_at(0, &vec![9_u8; page_len()]).unwrap();
    let superblock = Superblock::load().unwrap();
    let mut offset_cache = PageOffsetCache::new();
    let mut head_bytes = [0_u8; 16];
    let mut shifted_bytes = [0_u8; 16];

    crate::read_metrics::reset_read_metrics();
    read_base_at_with_page_cache(&superblock, 0, &mut head_bytes, &mut offset_cache).unwrap();
    read_base_at_with_page_cache(&superblock, 8, &mut shifted_bytes, &mut offset_cache).unwrap();
    let snapshot = crate::read_metrics::read_metrics_snapshot();

    assert_eq!(head_bytes, [9_u8; 16]);
    assert_eq!(shifted_bytes, [9_u8; 16]);
    assert_eq!(snapshot.stable_data_read_calls, 1);
    assert_eq!(snapshot.stable_data_read_bytes, page_size());
}
2497}