Skip to main content

ic_sqlite_vfs/sqlite_vfs/
stable_blob.rs

1//! Logical `/main.db` access backed by segmented stable-memory page mapping.
2//!
3//! SQLite sees a contiguous file. Internally, the active superblock points to a
4//! root table. Each root entry points to a 256-page segment table.
5
6use crate::config::{SQLITE_PAGE_SIZE, STABLE_PAGE_SIZE, SUPERBLOCK_SIZE};
7use crate::sqlite_vfs::overlay::{self, Overlay};
8use crate::stable::memory::{self, ContextId, StableMemoryError};
9use crate::stable::meta::{
10    fnv1a64, Superblock, FLAG_CHECKSUM_REFRESHING, FLAG_CHECKSUM_STALE, FLAG_IMPORTING,
11    PAGE_MAP_LAYOUT_VERSION,
12};
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::mem::MaybeUninit;
16
/// Largest number of bytes read per loop iteration while hashing the database.
const CHECKSUM_CHUNK_LEN: u64 = 16 << 10;
/// Size in bytes of a single page-table entry.
const PAGE_TABLE_ENTRY_LEN: u64 = 8;
/// Number of logical pages addressed by one segment table.
const SEGMENT_PAGE_COUNT: u64 = 256;
/// Bytes occupied by one full segment table.
const SEGMENT_TABLE_BYTES: u64 = SEGMENT_PAGE_COUNT * PAGE_TABLE_ENTRY_LEN;
/// Bytes needed for one segment table followed by a one-entry root table.
const SINGLE_SEGMENT_PAGE_TABLE_BYTES: u64 = SEGMENT_TABLE_BYTES + PAGE_TABLE_ENTRY_LEN;
/// Maximum number of segment tables retained by `ReadTableCache`.
const READ_SEGMENT_CACHE_CAPACITY: usize = 8;
/// Maximum number of page-number/offset pairs retained by `PageOffsetCache`.
const FILE_PAGE_OFFSET_CACHE_CAPACITY: usize = 64;
/// Maximum number of page payloads retained by `PageOffsetCache`.
const FILE_PAGE_DATA_CACHE_CAPACITY: usize = 8;
/// Minimum orphan estimate (16 MiB) before `storage_stats` recommends compaction.
const COMPACT_MIN_ORPHAN_BYTES: u64 = 16 << 20;
26
/// Progress report returned by one call to `refresh_checksum_chunk`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChecksumRefresh {
    /// `true` once the scan has reached the end of the database.
    pub complete: bool,
    /// Running hash after this step; the final checksum when `complete` is set.
    pub checksum: u64,
    /// Logical offset up to which the database has been hashed so far.
    pub scanned_bytes: u64,
    /// Logical database size the scan is measured against.
    pub db_size: u64,
}
34
/// Snapshot of storage usage produced by `storage_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Layout version recorded in the active superblock.
    pub layout_version: u64,
    /// Number of active logical pages.
    pub page_count: u64,
    /// Bytes taken by the root table plus all segment tables.
    pub page_table_bytes: u64,
    /// Superblock, mapped page data and page tables combined.
    pub active_bytes: u64,
    /// Total stable memory currently allocated, in bytes.
    pub allocated_bytes: u64,
    /// `allocated_bytes` minus `active_bytes`, saturating at zero.
    pub orphan_bytes_estimate: u64,
    /// Orphan estimate relative to active bytes, in basis points (1/10_000).
    pub orphan_ratio_basis_points: u64,
    /// Set when the orphan estimate is at least as large as the active bytes
    /// and at least `COMPACT_MIN_ORPHAN_BYTES`.
    pub compact_recommended: bool,
}
46
/// Named points in the write and commit paths where a failure can be injected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StableBlobFailpoint {
    /// Checked in `write_at` after the overlay has handled the write.
    OverlayWrite,
    /// Checked in `truncate` after the overlay has handled the truncate.
    OverlayTruncate,
    /// Checked at the very start of `commit_overlay`.
    CommitCapacity,
    /// Checked before each dirty page is written during a commit.
    CommitChunkWrite,
    /// Checked before the segment and root tables are written.
    CommitPageTableWrite,
    /// Checked before the superblock is updated to the new page map.
    CommitSuperblockStore,
}
56
57thread_local! {
58    #[cfg(test)]
59    static FAILPOINTS: RefCell<BTreeMap<ContextId, StableBlobFailpoint>> = const { RefCell::new(BTreeMap::new()) };
60    static READ_TABLE_CACHE: RefCell<Vec<(ContextId, ReadTableCache)>> = const { RefCell::new(Vec::new()) };
61    static COMMIT_SEGMENT_CACHE: RefCell<Vec<(ContextId, CommitSegmentCache)>> = const { RefCell::new(Vec::new()) };
62}
63
/// Identity of the page-map snapshot a `ReadTableCache` was filled from.
/// A cache whose key differs from the requested one is discarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ReadCacheKey {
    /// Offset of the root table the cached entries belong to.
    page_table_offset: u64,
    /// Page-count value of the snapshot.
    page_count: u64,
    /// Logical database size of the snapshot.
    db_size: u64,
    /// Last transaction id of the snapshot.
    last_tx_id: u64,
}
71
72#[derive(Debug)]
73struct ReadTableCache {
74    key: Option<ReadCacheKey>,
75    root: Vec<u64>,
76    segments: Vec<CachedSegment>,
77}
78
/// One segment table held by `ReadTableCache`.
#[derive(Debug)]
struct CachedSegment {
    /// Segment number this table belongs to.
    segment_no: u64,
    /// Page-offset entries of the segment, indexed by position within it.
    table: Vec<u64>,
}
84
/// A segment table remembered from a commit, together with where it was written.
#[derive(Debug)]
struct CommitSegmentCache {
    /// Segment number the table belongs to.
    segment_no: u64,
    /// Offset at which the table was written.
    segment_offset: u64,
    /// Entries of the segment table.
    table: Vec<u64>,
}
91
92impl ReadTableCache {
93    fn new() -> Self {
94        Self {
95            key: None,
96            root: Vec::new(),
97            segments: Vec::new(),
98        }
99    }
100
101    fn clear(&mut self) {
102        self.key = None;
103        self.root.clear();
104        self.segments.clear();
105    }
106
107    fn ensure_key(&mut self, key: ReadCacheKey) {
108        if self.key == Some(key) {
109            return;
110        }
111        self.clear();
112        self.key = Some(key);
113    }
114
115    #[inline(always)]
116    fn segment_page_offset(&mut self, segment_no: u64, index: usize) -> Option<u64> {
117        if self.segments.is_empty() {
118            return None;
119        }
120        if self.segments.len() == 1 {
121            let segment = &self.segments[0];
122            if segment.segment_no == segment_no {
123                return Some(segment.table[index]);
124            }
125            return None;
126        }
127        let position = self
128            .segments
129            .iter()
130            .position(|segment| segment.segment_no == segment_no)?;
131        let offset = Some(self.segments[position].table[index]);
132        if position + 1 != self.segments.len() {
133            let segment = self.segments.remove(position);
134            self.segments.push(segment);
135        }
136        offset
137    }
138
139    fn insert_segment(&mut self, segment_no: u64, table: Vec<u64>) {
140        if let Some(position) = self
141            .segments
142            .iter()
143            .position(|segment| segment.segment_no == segment_no)
144        {
145            self.segments.remove(position);
146        }
147        self.segments.push(CachedSegment { segment_no, table });
148        while self.segments.len() > READ_SEGMENT_CACHE_CAPACITY {
149            self.segments.remove(0);
150        }
151    }
152}
153
/// Small caller-owned cache of page lookups used by the page-cached read path.
#[derive(Debug)]
pub(crate) struct PageOffsetCache {
    /// `(page number, physical offset)` pairs, oldest first.
    entries: Vec<(u64, u64)>,
    /// `(page number, page bytes)` pairs, oldest first.
    pages: Vec<(u64, Vec<u8>)>,
}
159
160impl PageOffsetCache {
161    pub(crate) fn new() -> Self {
162        Self {
163            entries: Vec::with_capacity(FILE_PAGE_OFFSET_CACHE_CAPACITY),
164            pages: Vec::new(),
165        }
166    }
167
168    fn get(&self, page_no: u64) -> Option<u64> {
169        match self.entries.as_slice() {
170            [] => None,
171            [(cached_page, physical)] => (*cached_page == page_no).then_some(*physical),
172            entries => {
173                for (cached_page, physical) in entries {
174                    if *cached_page == page_no {
175                        return Some(*physical);
176                    }
177                }
178                None
179            }
180        }
181    }
182
183    fn insert(&mut self, page_no: u64, physical: u64) {
184        if self.entries.len() == FILE_PAGE_OFFSET_CACHE_CAPACITY {
185            self.entries.remove(0);
186        }
187        self.entries.push((page_no, physical));
188    }
189
190    #[inline(always)]
191    fn copy_page_slice(&self, page_no: u64, in_page: usize, dst: &mut [u8]) -> bool {
192        if self.pages.is_empty() {
193            return false;
194        }
195        if self.pages.len() == 1 {
196            let (cached_page, page) = &self.pages[0];
197            if *cached_page == page_no {
198                let end = in_page + dst.len();
199                dst.copy_from_slice(&page[in_page..end]);
200                return true;
201            }
202            return false;
203        }
204        for (cached_page, page) in &self.pages {
205            if *cached_page == page_no {
206                let end = in_page + dst.len();
207                dst.copy_from_slice(&page[in_page..end]);
208                return true;
209            }
210        }
211        false
212    }
213
214    fn insert_page(&mut self, page_no: u64, page: Vec<u8>) {
215        if self.pages.len() == FILE_PAGE_DATA_CACHE_CAPACITY {
216            self.pages.remove(0);
217        }
218        self.pages.push((page_no, page));
219    }
220}
221
/// Arms `failpoint` for the currently active memory context (tests only).
///
/// Does nothing when no active context id can be obtained.
#[cfg(test)]
pub(crate) fn set_failpoint(failpoint: StableBlobFailpoint) {
    let Ok(context) = memory::active_context_id() else {
        return;
    };
    FAILPOINTS.with(|armed| {
        armed.borrow_mut().insert(context, failpoint);
    });
}
230
/// Disarms every failpoint on this thread, for all contexts (tests only).
#[cfg(test)]
pub(crate) fn clear_failpoint() {
    FAILPOINTS.with(|armed| {
        armed.borrow_mut().clear();
    });
}
235
236pub(crate) fn ensure_page_map_layout() -> Result<(), StableMemoryError> {
237    let block = Superblock::load()?;
238    if block.layout_version >= PAGE_MAP_LAYOUT_VERSION {
239        return Ok(());
240    }
241    Err(StableMemoryError::UnsupportedLayoutVersion(
242        block.layout_version,
243    ))
244}
245
246pub(crate) fn begin_update() -> Result<u64, StableMemoryError> {
247    let block = Superblock::load()?;
248    if block.layout_version < PAGE_MAP_LAYOUT_VERSION {
249        return Err(StableMemoryError::UnsupportedLayoutVersion(
250            block.layout_version,
251        ));
252    }
253    if block.is_importing() {
254        return Err(StableMemoryError::ImportAlreadyStarted);
255    }
256    overlay::begin(block.db_size)?;
257    Ok(block.db_size)
258}
259
260pub(crate) fn rollback_update() {
261    overlay::rollback();
262}
263
264#[doc(hidden)]
265pub fn invalidate_read_cache() {
266    READ_TABLE_CACHE.with(|cache| cache.borrow_mut().clear());
267    COMMIT_SEGMENT_CACHE.with(|cache| cache.borrow_mut().clear());
268}
269
270pub(crate) fn commit_update() -> Result<(), StableMemoryError> {
271    let Some(overlay) = overlay::take() else {
272        return Ok(());
273    };
274    if overlay.is_empty() {
275        return Ok(());
276    }
277    commit_overlay(overlay, true)
278}
279
280pub(crate) fn read_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
281    if let Some(result) = overlay::read_at(offset, dst) {
282        return result;
283    }
284    read_base_at(offset, dst)
285}
286
287pub(crate) fn read_base_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
288    if dst.is_empty() {
289        return Ok(true);
290    }
291    let block = Superblock::load()?;
292    read_base_at_with_block(&block, offset, dst)
293}
294
295pub(crate) fn read_base_at_with_block(
296    block: &Superblock,
297    offset: u64,
298    dst: &mut [u8],
299) -> Result<bool, StableMemoryError> {
300    if dst.is_empty() {
301        return Ok(true);
302    }
303    if offset >= block.db_size {
304        dst.fill(0);
305        return Ok(false);
306    }
307    let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
308    if requested <= block.db_size - offset {
309        read_logical_range(block, offset, dst)?;
310        return Ok(true);
311    }
312    let copied = requested.min(block.db_size - offset);
313    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
314    read_logical_range(block, offset, &mut dst[..copied_len])?;
315    dst[copied_len..].fill(0);
316    Ok(copied == requested)
317}
318
319#[inline(always)]
320pub(crate) fn read_base_at_with_page_cache(
321    block: &Superblock,
322    offset: u64,
323    dst: &mut [u8],
324    page_offsets: &mut PageOffsetCache,
325) -> Result<bool, StableMemoryError> {
326    if dst.is_empty() {
327        return Ok(true);
328    }
329    if offset >= block.db_size {
330        dst.fill(0);
331        return Ok(false);
332    }
333    let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
334    if requested <= block.db_size - offset {
335        read_logical_range_with_page_cache(block, offset, dst, page_offsets)?;
336        return Ok(true);
337    }
338    let copied = requested.min(block.db_size - offset);
339    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
340    read_logical_range_with_page_cache(block, offset, &mut dst[..copied_len], page_offsets)?;
341    dst[copied_len..].fill(0);
342    Ok(copied == requested)
343}
344
345pub(crate) fn read_base_page(page_no: u64) -> Result<Vec<u8>, StableMemoryError> {
346    let block = Superblock::load()?;
347    let mut page = zero_page();
348    if page_no >= active_page_count(&block)? {
349        return Ok(page);
350    }
351    let physical = page_offset_for(&block, page_no)?;
352    if physical != 0 {
353        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
354        crate::read_metrics::record_stable_data_read(page.len());
355        memory::read_preallocated(physical, &mut page)?;
356    }
357    Ok(page)
358}
359
360pub(crate) fn write_at(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
361    if let Some(result) = overlay::write_at(offset, bytes) {
362        hit_failpoint(StableBlobFailpoint::OverlayWrite)?;
363        return result;
364    }
365    if bytes.is_empty() {
366        return Ok(());
367    }
368    ensure_page_map_layout()?;
369    let mut direct = Overlay::new(Superblock::load()?.db_size);
370    direct.write_at(offset, bytes)?;
371    commit_overlay(direct, false)
372}
373
374pub(crate) fn truncate(size: u64) -> Result<(), StableMemoryError> {
375    if let Some(result) = overlay::truncate(size) {
376        hit_failpoint(StableBlobFailpoint::OverlayTruncate)?;
377        return result;
378    }
379    ensure_page_map_layout()?;
380    let mut direct = Overlay::new(Superblock::load()?.db_size);
381    direct.truncate(size)?;
382    if direct.is_empty() {
383        return Ok(());
384    }
385    commit_overlay(direct, false)
386}
387
388pub(crate) fn file_size() -> Result<u64, StableMemoryError> {
389    if let Some(size) = overlay::file_size() {
390        return Ok(size);
391    }
392    Ok(Superblock::load()?.db_size)
393}
394
395pub fn export_chunk(offset: u64, len: u64) -> Result<Vec<u8>, StableMemoryError> {
396    reject_during_update()?;
397    let block = Superblock::load()?;
398    if offset >= block.db_size {
399        return Ok(Vec::new());
400    }
401    let copied = len.min(block.db_size - offset);
402    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
403    let mut out = vec![0_u8; copied_len];
404    read_logical_range(&block, offset, &mut out)?;
405    Ok(out)
406}
407
408pub fn import_chunk(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
409    reject_during_update()?;
410    let mut block = Superblock::load()?;
411    if !block.is_importing() {
412        return Err(StableMemoryError::ImportNotStarted);
413    }
414    let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
415    if offset != block.import_written_until {
416        return Err(StableMemoryError::ImportOutOfOrder {
417            offset,
418            expected: block.import_written_until,
419        });
420    }
421    let end = checked_add(offset, len)?;
422    if end > block.import_total_size {
423        return Err(StableMemoryError::ImportOutOfBounds {
424            offset,
425            len,
426            db_size: block.import_total_size,
427        });
428    }
429    memory::write(import_offset(&block, offset)?, bytes)?;
430    block.import_written_until = end;
431    block.store()?;
432    invalidate_read_cache();
433    Ok(())
434}
435
436pub fn begin_import(total_size: u64, expected_checksum: u64) -> Result<(), StableMemoryError> {
437    reject_during_update()?;
438    let mut block = Superblock::load()?;
439    if block.is_importing() {
440        return Err(StableMemoryError::ImportAlreadyStarted);
441    }
442    let import_base_offset = append_base()?;
443    checked_add(import_base_offset, total_size)?;
444    block.flags |= FLAG_IMPORTING;
445    block.clear_checksum_refresh();
446    block.import_expected_checksum = expected_checksum;
447    block.import_written_until = 0;
448    block.import_total_size = total_size;
449    block.import_base_offset = import_base_offset;
450    block.store()?;
451    invalidate_read_cache();
452    Ok(())
453}
454
455pub fn finish_import() -> Result<(), StableMemoryError> {
456    reject_during_update()?;
457    let mut block = Superblock::load()?;
458    if !block.is_importing() {
459        return Err(StableMemoryError::ImportNotStarted);
460    }
461    if block.import_written_until != block.import_total_size {
462        return Err(StableMemoryError::ImportIncomplete {
463            written_until: block.import_written_until,
464            db_size: block.import_total_size,
465        });
466    }
467    let checksum = checksum_physical_range(block.import_base_offset, block.import_total_size)?;
468    if checksum != block.import_expected_checksum {
469        let expected = block.import_expected_checksum;
470        clear_import(&mut block)?;
471        return Err(StableMemoryError::ChecksumMismatch {
472            expected,
473            actual: checksum,
474        });
475    }
476    let entries = imported_page_table(&block)?;
477    let (root_offset, root_len) = write_segmented_tables(&entries)?;
478    block.db_size = block.import_total_size;
479    block.db_base_offset = block.import_base_offset;
480    block.page_table_offset = root_offset;
481    block.page_count = root_len;
482    block.layout_version = PAGE_MAP_LAYOUT_VERSION;
483    block.flags &= !FLAG_IMPORTING;
484    block.flags &= !FLAG_CHECKSUM_STALE;
485    block.clear_checksum_refresh();
486    block.checksum = checksum;
487    block.import_expected_checksum = 0;
488    block.import_written_until = 0;
489    block.import_total_size = 0;
490    block.import_base_offset = 0;
491    block.store()?;
492    invalidate_read_cache();
493    Ok(())
494}
495
496pub fn cancel_import() -> Result<(), StableMemoryError> {
497    reject_during_update()?;
498    let mut block = Superblock::load()?;
499    if !block.is_importing() {
500        return Err(StableMemoryError::ImportNotStarted);
501    }
502    clear_import(&mut block)
503}
504
505pub fn refresh_checksum() -> Result<u64, StableMemoryError> {
506    reject_during_update()?;
507    let checksum = checksum()?;
508    let mut block = Superblock::load()?;
509    block.checksum = checksum;
510    block.flags &= !FLAG_CHECKSUM_STALE;
511    block.clear_checksum_refresh();
512    block.store()?;
513    invalidate_read_cache();
514    Ok(checksum)
515}
516
517pub fn refresh_checksum_chunk(max_bytes: u64) -> Result<ChecksumRefresh, StableMemoryError> {
518    reject_during_update()?;
519    if max_bytes == 0 {
520        return Err(StableMemoryError::ChecksumRefreshChunkEmpty);
521    }
522
523    let mut block = Superblock::load()?;
524    if block.is_importing() {
525        return Err(StableMemoryError::ImportAlreadyStarted);
526    }
527    if !block.is_checksum_refreshing() {
528        block.flags |= FLAG_CHECKSUM_REFRESHING;
529        block.checksum_refresh_offset = 0;
530        block.checksum_refresh_hash = fnv1a64(&[]);
531        block.checksum_refresh_tx_id = block.last_tx_id;
532    }
533    if block.checksum_refresh_tx_id != block.last_tx_id {
534        block.clear_checksum_refresh();
535        block.store()?;
536        invalidate_read_cache();
537        return refresh_checksum_chunk(max_bytes);
538    }
539
540    let start = block.checksum_refresh_offset;
541    let end = block.db_size.min(start.saturating_add(max_bytes));
542    let mut offset = start;
543    let mut hash = block.checksum_refresh_hash;
544    while offset < end {
545        let len = (end - offset).min(CHECKSUM_CHUNK_LEN);
546        let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
547        let mut bytes = vec![0_u8; copied_len];
548        read_logical_range(&block, offset, &mut bytes)?;
549        hash = fold_fnv1a64(hash, &bytes);
550        offset += len;
551    }
552
553    block.checksum_refresh_offset = offset;
554    block.checksum_refresh_hash = hash;
555    if offset == block.db_size {
556        block.checksum = hash;
557        block.flags &= !FLAG_CHECKSUM_STALE;
558        block.clear_checksum_refresh();
559    }
560    let out = ChecksumRefresh {
561        complete: offset == block.db_size,
562        checksum: hash,
563        scanned_bytes: offset,
564        db_size: block.db_size,
565    };
566    block.store()?;
567    invalidate_read_cache();
568    Ok(out)
569}
570
571pub fn checksum() -> Result<u64, StableMemoryError> {
572    reject_during_update()?;
573    let block = Superblock::load()?;
574    checksum_logical_range(&block, block.db_size)
575}
576
577pub fn compact() -> Result<(), StableMemoryError> {
578    reject_during_update()?;
579    ensure_page_map_layout()?;
580    let block = Superblock::load()?;
581    let table = read_page_table(&block)?;
582    let mut compacted = Vec::with_capacity(table.len());
583    let mut cursor = append_base()?;
584    let non_zero_pages = table.iter().filter(|offset| **offset != 0).count();
585    let data_bytes = u64::try_from(non_zero_pages)
586        .map_err(|_| StableMemoryError::OffsetOverflow)?
587        .checked_mul(page_size())
588        .ok_or(StableMemoryError::OffsetOverflow)?;
589    memory::ensure_capacity(checked_add(cursor, data_bytes)?)?;
590
591    for offset in table {
592        if offset == 0 {
593            compacted.push(0);
594            continue;
595        }
596        let mut page = zero_page();
597        memory::read_preallocated(offset, &mut page)?;
598        memory::write_preallocated(cursor, &page)?;
599        compacted.push(cursor);
600        cursor = checked_add(cursor, page_size())?;
601    }
602
603    let (root_offset, root_len) = write_segmented_tables(&compacted)?;
604    Superblock::store_page_map_without_tx(root_offset, root_len, block.db_size)?;
605    invalidate_read_cache();
606    Ok(())
607}
608
609pub fn storage_stats() -> Result<StorageStats, StableMemoryError> {
610    let block = Superblock::load()?;
611    let table = read_page_table(&block)?;
612    let non_zero_pages = u64::try_from(table.iter().filter(|offset| **offset != 0).count())
613        .map_err(|_| StableMemoryError::OffsetOverflow)?;
614    let segment_count = active_segment_count(&block)?;
615    let root_bytes = root_table_bytes(segment_count)?;
616    let segment_bytes = segment_count
617        .checked_mul(segment_table_bytes()?)
618        .ok_or(StableMemoryError::OffsetOverflow)?;
619    let page_table_bytes = checked_add(root_bytes, segment_bytes)?;
620    let active_bytes = SUPERBLOCK_SIZE
621        .checked_add(non_zero_pages.saturating_mul(page_size()))
622        .and_then(|value| value.checked_add(page_table_bytes))
623        .ok_or(StableMemoryError::OffsetOverflow)?;
624    let allocated_bytes = memory::size_pages()
625        .checked_mul(STABLE_PAGE_SIZE)
626        .ok_or(StableMemoryError::OffsetOverflow)?;
627    let orphan_bytes_estimate = allocated_bytes.saturating_sub(active_bytes);
628    let orphan_ratio_basis_points = orphan_bytes_estimate
629        .saturating_mul(10_000)
630        .checked_div(active_bytes)
631        .unwrap_or(0);
632    Ok(StorageStats {
633        layout_version: block.layout_version,
634        page_count: active_page_count(&block)?,
635        page_table_bytes,
636        active_bytes,
637        allocated_bytes,
638        orphan_bytes_estimate,
639        orphan_ratio_basis_points,
640        compact_recommended: orphan_bytes_estimate >= active_bytes
641            && orphan_bytes_estimate >= COMPACT_MIN_ORPHAN_BYTES,
642    })
643}
644
645pub(crate) fn page_count_for_size(size: u64) -> Result<u64, StableMemoryError> {
646    Ok(size.div_ceil(page_size()))
647}
648
/// Test helper: returns the root table of the currently stored superblock.
#[cfg(test)]
pub(crate) fn debug_root_table_for_tests() -> Result<Vec<u64>, StableMemoryError> {
    Superblock::load().and_then(|block| read_root_table(&block))
}
654
655fn commit_overlay(overlay: Overlay, advance_tx: bool) -> Result<(), StableMemoryError> {
656    hit_failpoint(StableBlobFailpoint::CommitCapacity)?;
657    let profile_enabled = commit_profile_enabled();
658    let block = Superblock::load()?;
659    let overlay_size = overlay.size();
660    let final_page_count = page_count_for_size(overlay_size)?;
661    let data_cursor = append_base()?;
662    debug_assert!(overlay
663        .dirty_pages()
664        .iter()
665        .all(|(page_no, _)| *page_no < final_page_count));
666    let dirty_pages = overlay.dirty_pages();
667    if let [(page_no, page)] = dirty_pages {
668        if overlay_size >= block.db_size
669            && *page_no < final_page_count
670            && final_page_count <= SEGMENT_PAGE_COUNT
671        {
672            let build_profile_start = commit_profile_start(profile_enabled);
673            let options = SinglePageCommitOptions {
674                advance_tx,
675                overlay_size,
676                data_cursor,
677                profile_enabled,
678                build_profile_start,
679            };
680            return commit_single_segment_page_overlay(&block, *page_no, page, options);
681        }
682    }
683
684    let final_segment_count = segment_count_for_pages(final_page_count)?;
685    let profile_start = commit_profile_start(profile_enabled);
686    let mut root = read_commit_root_table(&block)?;
687    commit_profile_record_load(profile_start);
688
689    let build_profile_start = commit_profile_start(profile_enabled);
690    let root_len =
691        usize::try_from(final_segment_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
692    if root.len() != root_len {
693        root.resize(root_len, 0);
694    }
695
696    if let [(page_no, page)] = dirty_pages {
697        if overlay_size >= block.db_size && *page_no < final_page_count {
698            let options = SinglePageCommitOptions {
699                advance_tx,
700                overlay_size,
701                data_cursor,
702                profile_enabled,
703                build_profile_start,
704            };
705            return commit_single_page_overlay(
706                &block,
707                final_segment_count,
708                root,
709                *page_no,
710                page,
711                options,
712            );
713        }
714    }
715
716    let mut segment_updates = BTreeMap::<u64, Vec<u64>>::new();
717    let mut page_cursor = data_cursor;
718
719    for (page_no, _) in dirty_pages {
720        if *page_no >= final_page_count {
721            continue;
722        }
723        let segment_no = segment_no(*page_no);
724        let index = segment_index(*page_no)?;
725        let table = load_segment_for_update(&block, &root, &mut segment_updates, segment_no)?;
726        table[index] = page_cursor;
727        page_cursor = checked_add(page_cursor, page_size())?;
728    }
729
730    if overlay_size < block.db_size {
731        clear_truncated_tail(&block, &root, &mut segment_updates, final_page_count)?;
732    }
733    commit_profile_record_build_segments(build_profile_start);
734
735    let mut table_cursor = page_cursor;
736    let root_entries_len = final_segment_count;
737    let segment_table_writes = segment_updates.len();
738    let segment_table_bytes = u64::try_from(segment_table_writes)
739        .map_err(|_| StableMemoryError::OffsetOverflow)?
740        .checked_mul(segment_table_bytes()?)
741        .ok_or(StableMemoryError::OffsetOverflow)?;
742    let page_table_bytes = checked_add(segment_table_bytes, root_table_bytes(root_entries_len)?)?;
743    let profile_start = commit_profile_start(profile_enabled);
744    memory::ensure_capacity(checked_add(table_cursor, page_table_bytes)?)?;
745    commit_profile_record_capacity(profile_start);
746
747    let profile_start = commit_profile_start(profile_enabled);
748    let mut cursor = data_cursor;
749    for (_, page) in dirty_pages {
750        hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
751        write_commit_page(cursor, page, profile_enabled)?;
752        cursor = checked_add(cursor, page_size())?;
753    }
754    commit_profile_record_page_write(profile_start);
755
756    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
757    let profile_start = commit_profile_start(profile_enabled);
758    for (segment_no, table) in segment_updates {
759        let offset = write_commit_segment_table_at(&table, &mut table_cursor, profile_enabled)?;
760        let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
761        root[index] = offset;
762    }
763    let root_offset = write_commit_root_table_at(&root, &mut table_cursor, profile_enabled)?;
764    commit_profile_record_table_write(profile_start);
765
766    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
767    let profile_start = commit_profile_start(profile_enabled);
768    let result = store_commit_page_map(
769        advance_tx,
770        root_offset,
771        root_entries_len,
772        overlay_size,
773        profile_enabled,
774    );
775    commit_profile_record_superblock_store(profile_start);
776    result
777}
778
/// Parameters shared by the two single-page commit fast paths.
#[derive(Copy, Clone)]
struct SinglePageCommitOptions {
    /// Forwarded to `store_commit_page_map`.
    advance_tx: bool,
    /// Logical size of the database after the commit.
    overlay_size: u64,
    /// Physical offset at which the dirty page is written.
    data_cursor: u64,
    /// Whether commit profiling counters are being recorded.
    profile_enabled: bool,
    /// Start marker for the "build segments" profiling phase, if profiling.
    build_profile_start: Option<u64>,
}
787
788fn commit_single_page_overlay(
789    block: &Superblock,
790    final_segment_count: u64,
791    mut root: Vec<u64>,
792    page_no: u64,
793    page: &[u8],
794    options: SinglePageCommitOptions,
795) -> Result<(), StableMemoryError> {
796    let segment_no = segment_no(page_no);
797    let index = segment_index(page_no)?;
798    let mut table = read_commit_segment_table(block, &root, segment_no)?;
799    table[index] = options.data_cursor;
800    let page_cursor = checked_add(options.data_cursor, page_size())?;
801    commit_profile_record_build_segments(options.build_profile_start);
802
803    let root_entries_len = final_segment_count;
804    let page_table_bytes =
805        checked_add(segment_table_bytes()?, root_table_bytes(root_entries_len)?)?;
806    let profile_start = commit_profile_start(options.profile_enabled);
807    memory::ensure_capacity(checked_add(page_cursor, page_table_bytes)?)?;
808    commit_profile_record_capacity(profile_start);
809
810    hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
811    let profile_start = commit_profile_start(options.profile_enabled);
812    write_commit_page(options.data_cursor, page, options.profile_enabled)?;
813    commit_profile_record_page_write(profile_start);
814
815    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
816    let profile_start = commit_profile_start(options.profile_enabled);
817    let mut table_cursor = page_cursor;
818    let offset = write_commit_segment_table_at(&table, &mut table_cursor, options.profile_enabled)?;
819    let root_offset = if final_segment_count == 1 {
820        write_commit_root_table_at(&[offset], &mut table_cursor, options.profile_enabled)?
821    } else {
822        let root_index =
823            usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
824        root[root_index] = offset;
825        write_commit_root_table_at(&root, &mut table_cursor, options.profile_enabled)?
826    };
827    commit_profile_record_table_write(profile_start);
828
829    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
830    let profile_start = commit_profile_start(options.profile_enabled);
831    let result = store_commit_page_map(
832        options.advance_tx,
833        root_offset,
834        root_entries_len,
835        options.overlay_size,
836        options.profile_enabled,
837    );
838    commit_profile_record_superblock_store(profile_start);
839    if result.is_ok() {
840        cache_commit_segment_table(segment_no, offset, table);
841    }
842    result
843}
844
845fn commit_single_segment_page_overlay(
846    block: &Superblock,
847    page_no: u64,
848    page: &[u8],
849    options: SinglePageCommitOptions,
850) -> Result<(), StableMemoryError> {
851    let index = segment_index(page_no)?;
852    let root = read_commit_root_table(block)?;
853    let mut table = read_commit_segment_table(block, &root, 0)?;
854    table[index] = options.data_cursor;
855    let page_cursor = checked_add(options.data_cursor, page_size())?;
856    commit_profile_record_build_segments(options.build_profile_start);
857
858    let profile_start = commit_profile_start(options.profile_enabled);
859    memory::ensure_capacity(checked_add(page_cursor, SINGLE_SEGMENT_PAGE_TABLE_BYTES)?)?;
860    commit_profile_record_capacity(profile_start);
861
862    hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
863    let profile_start = commit_profile_start(options.profile_enabled);
864    memory::write_prechecked(options.data_cursor, page)?;
865    commit_profile_record_page_write(profile_start);
866
867    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
868    let profile_start = commit_profile_start(options.profile_enabled);
869    let mut table_cursor = page_cursor;
870    let offset = write_commit_segment_table_at(&table, &mut table_cursor, options.profile_enabled)?;
871    let root_offset =
872        write_commit_root_table_at(&[offset], &mut table_cursor, options.profile_enabled)?;
873    commit_profile_record_table_write(profile_start);
874
875    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
876    let profile_start = commit_profile_start(options.profile_enabled);
877    let result = store_commit_page_map(
878        options.advance_tx,
879        root_offset,
880        1,
881        options.overlay_size,
882        options.profile_enabled,
883    );
884    commit_profile_record_superblock_store(profile_start);
885    if result.is_ok() {
886        cache_commit_segment_table(0, offset, table);
887    }
888    result
889}
890
/// Reports whether commit profiling is active. In test, debug and
/// `bench-profile` builds this defers to `read_metrics::metrics_enabled`.
#[cfg(any(test, debug_assertions, feature = "bench-profile"))]
#[inline(always)]
fn commit_profile_enabled() -> bool {
    crate::read_metrics::metrics_enabled()
}

/// Reports whether commit profiling is active. In every other build
/// configuration profiling is compiled out, so this is always `false`.
#[cfg(not(any(test, debug_assertions, feature = "bench-profile")))]
#[inline(always)]
fn commit_profile_enabled() -> bool {
    false
}
902
903#[inline(always)]
904fn commit_profile_start(enabled: bool) -> Option<u64> {
905    if enabled {
906        Some(crate::read_metrics::instruction_counter())
907    } else {
908        None
909    }
910}
911
// Generates a profiling hook `fn $name(start: Option<u64>)`.
//
// In test, debug and `bench-profile` builds the hook reports the instruction
// delta since `start` to `read_metrics::$record`; a `None` start is ignored.
// In every other build the hook is an empty function.
macro_rules! commit_profile_recorder {
    ($name:ident, $record:ident) => {
        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
        #[inline(always)]
        fn $name(start: Option<u64>) {
            if let Some(start) = start {
                crate::read_metrics::$record(
                    // Saturate so a counter that reads lower than `start` yields 0.
                    crate::read_metrics::instruction_counter().saturating_sub(start),
                );
            }
        }

        #[cfg(not(any(test, debug_assertions, feature = "bench-profile")))]
        #[inline(always)]
        fn $name(_start: Option<u64>) {}
    };
}

// One hook per commit stage, each forwarding to its `read_metrics` recorder.
commit_profile_recorder!(commit_profile_record_load, record_commit_load);
commit_profile_recorder!(
    commit_profile_record_build_segments,
    record_commit_build_segments
);
commit_profile_recorder!(commit_profile_record_capacity, record_commit_capacity);
commit_profile_recorder!(commit_profile_record_page_write, record_commit_page_write);
commit_profile_recorder!(commit_profile_record_table_write, record_commit_table_write);
commit_profile_recorder!(
    commit_profile_record_superblock_store,
    record_commit_superblock_store
);
942
943#[inline(always)]
944fn write_commit_page(
945    offset: u64,
946    page: &[u8],
947    profile_enabled: bool,
948) -> Result<(), StableMemoryError> {
949    if profile_enabled {
950        memory::write_prechecked(offset, page)
951    } else {
952        memory::write_prechecked_unmetered(offset, page)
953    }
954}
955
956fn store_commit_page_map(
957    advance_tx: bool,
958    root_offset: u64,
959    root_entries_len: u64,
960    overlay_size: u64,
961    profile_enabled: bool,
962) -> Result<(), StableMemoryError> {
963    match (advance_tx, profile_enabled) {
964        (true, true) => Superblock::commit_page_map(root_offset, root_entries_len, overlay_size),
965        (true, false) => {
966            Superblock::commit_page_map_unmetered(root_offset, root_entries_len, overlay_size)
967        }
968        (false, true) => {
969            Superblock::store_page_map_without_tx(root_offset, root_entries_len, overlay_size)
970        }
971        (false, false) => Superblock::store_page_map_without_tx_unmetered(
972            root_offset,
973            root_entries_len,
974            overlay_size,
975        ),
976    }
977}
978
979fn load_segment_for_update<'a>(
980    block: &Superblock,
981    root: &[u64],
982    updates: &'a mut BTreeMap<u64, Vec<u64>>,
983    segment_no: u64,
984) -> Result<&'a mut Vec<u64>, StableMemoryError> {
985    match updates.entry(segment_no) {
986        std::collections::btree_map::Entry::Occupied(entry) => Ok(entry.into_mut()),
987        std::collections::btree_map::Entry::Vacant(entry) => {
988            let table = read_segment_table(block, root, segment_no)?;
989            Ok(entry.insert(table))
990        }
991    }
992}
993
994fn clear_truncated_tail(
995    block: &Superblock,
996    root: &[u64],
997    updates: &mut BTreeMap<u64, Vec<u64>>,
998    final_page_count: u64,
999) -> Result<(), StableMemoryError> {
1000    let old_page_count = active_page_count(block)?;
1001    if final_page_count >= old_page_count || final_page_count == 0 {
1002        return Ok(());
1003    }
1004    let boundary_segment = segment_no(final_page_count);
1005    if boundary_segment >= segment_count_for_pages(final_page_count)? {
1006        return Ok(());
1007    }
1008    let start = segment_index(final_page_count)?;
1009    if start == 0 {
1010        return Ok(());
1011    }
1012    let table = load_segment_for_update(block, root, updates, boundary_segment)?;
1013    table[start..].fill(0);
1014    Ok(())
1015}
1016
1017fn reject_during_update() -> Result<(), StableMemoryError> {
1018    if overlay::is_active() {
1019        Err(StableMemoryError::UpdateInProgress)
1020    } else {
1021        Ok(())
1022    }
1023}
1024
1025fn read_logical_range(
1026    block: &Superblock,
1027    offset: u64,
1028    dst: &mut [u8],
1029) -> Result<(), StableMemoryError> {
1030    if dst.is_empty() {
1031        return Ok(());
1032    }
1033    let in_page =
1034        usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
1035    if dst.len() <= page_len() - in_page {
1036        return read_logical_page_slice(block, offset / page_size(), in_page, dst);
1037    }
1038
1039    let mut copied_total = 0_usize;
1040    while copied_total < dst.len() {
1041        let absolute = checked_add(
1042            offset,
1043            u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
1044        )?;
1045        let page_no = absolute / page_size();
1046        let in_page = usize::try_from(absolute % page_size())
1047            .map_err(|_| StableMemoryError::OffsetOverflow)?;
1048        let copied = (page_len() - in_page).min(dst.len() - copied_total);
1049        read_logical_page_slice(
1050            block,
1051            page_no,
1052            in_page,
1053            &mut dst[copied_total..copied_total + copied],
1054        )?;
1055        copied_total += copied;
1056    }
1057    Ok(())
1058}
1059
1060fn read_logical_range_with_page_cache(
1061    block: &Superblock,
1062    offset: u64,
1063    dst: &mut [u8],
1064    page_offsets: &mut PageOffsetCache,
1065) -> Result<(), StableMemoryError> {
1066    let in_page =
1067        usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
1068    if dst.len() <= page_len() - in_page {
1069        return read_logical_page_slice_with_page_cache(
1070            block,
1071            offset / page_size(),
1072            in_page,
1073            dst,
1074            page_offsets,
1075        );
1076    }
1077
1078    let mut copied_total = 0_usize;
1079    while copied_total < dst.len() {
1080        let absolute = checked_add(
1081            offset,
1082            u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
1083        )?;
1084        let page_no = absolute / page_size();
1085        let in_page = usize::try_from(absolute % page_size())
1086            .map_err(|_| StableMemoryError::OffsetOverflow)?;
1087        let copied = (page_len() - in_page).min(dst.len() - copied_total);
1088        read_logical_page_slice_with_page_cache(
1089            block,
1090            page_no,
1091            in_page,
1092            &mut dst[copied_total..copied_total + copied],
1093            page_offsets,
1094        )?;
1095        copied_total += copied;
1096    }
1097    Ok(())
1098}
1099
1100fn read_logical_page_slice(
1101    block: &Superblock,
1102    page_no: u64,
1103    in_page: usize,
1104    dst: &mut [u8],
1105) -> Result<(), StableMemoryError> {
1106    let physical = page_offset_for(block, page_no)?;
1107    if physical == 0 {
1108        dst.fill(0);
1109        return Ok(());
1110    }
1111    let stable_offset = checked_add(
1112        physical,
1113        u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
1114    )?;
1115    #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1116    crate::read_metrics::record_stable_data_read(dst.len());
1117    memory::read_preallocated(stable_offset, dst)
1118}
1119
/// Reads `dst.len()` bytes of logical page `page_no`, starting `in_page` bytes
/// into the page, using `page_offsets` to avoid repeated lookups.
///
/// The branches are tried in this order:
/// 1. a sub-page read that `copy_page_slice` can satisfy returns immediately;
/// 2. the physical offset comes from the cache, or is resolved and inserted;
/// 3. physical offset 0 means the page is unmapped and reads as zeros;
/// 4. an aligned whole-page read goes directly to stable memory;
/// 5. any other sub-page read loads the entire page, copies the requested
///    window, and hands the page to the cache;
/// 6. anything else is read directly at `physical + in_page`.
#[inline(always)]
fn read_logical_page_slice_with_page_cache(
    block: &Superblock,
    page_no: u64,
    in_page: usize,
    dst: &mut [u8],
    page_offsets: &mut PageOffsetCache,
) -> Result<(), StableMemoryError> {
    // Only sub-page reads consult the cache's page data.
    if dst.len() < page_len() && page_offsets.copy_page_slice(page_no, in_page, dst) {
        return Ok(());
    }
    let physical = match page_offsets.get(page_no) {
        Some(physical) => physical,
        None => {
            // A block without a page table maps every page to 0 (unmapped).
            let physical = if block.page_table_offset == 0 {
                0
            } else {
                cached_page_offset_for(block, page_no)?
            };
            page_offsets.insert(page_no, physical);
            physical
        }
    };
    if physical == 0 {
        dst.fill(0);
        return Ok(());
    }
    if in_page == 0 && dst.len() == page_len() {
        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
        crate::read_metrics::record_stable_data_read(dst.len());
        return memory::read_preallocated(physical, dst);
    }
    if dst.len() < page_len() {
        // Load the full page once so the cache can keep it for later sub-page reads.
        let mut page = zero_page();
        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
        crate::read_metrics::record_stable_data_read(page.len());
        memory::read_preallocated(physical, &mut page)?;
        // NOTE(review): assumes callers keep `in_page + dst.len()` within one page;
        // the slice below panics otherwise.
        let end = in_page + dst.len();
        dst.copy_from_slice(&page[in_page..end]);
        page_offsets.insert_page(page_no, page);
        return Ok(());
    }
    // Remaining case: `dst` is at least a page long but not an aligned single page.
    let stable_offset = checked_add(
        physical,
        u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
    )?;
    #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
    crate::read_metrics::record_stable_data_read(dst.len());
    memory::read_preallocated(stable_offset, dst)
}
1170
1171fn page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
1172    if page_no >= active_page_count(block)? || block.page_table_offset == 0 {
1173        return Ok(0);
1174    }
1175    cached_page_offset_for(block, page_no)
1176}
1177
1178fn read_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1179    let root = read_root_table(block)?;
1180    let count = active_page_count(block)?;
1181    let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1182    let mut entries = Vec::with_capacity(capacity);
1183    for segment_no in 0..segment_count_for_pages(count)? {
1184        let table = read_segment_table(block, &root, segment_no)?;
1185        for entry in table {
1186            if entries.len() == capacity {
1187                break;
1188            }
1189            entries.push(entry);
1190        }
1191    }
1192    Ok(entries)
1193}
1194
1195fn cached_page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
1196    let context = memory::active_context_id()?;
1197    let key = read_cache_key(block);
1198    let segment_no = segment_no(page_no);
1199    let index = segment_index(page_no)?;
1200    READ_TABLE_CACHE.with(|cache| {
1201        let mut caches = cache.borrow_mut();
1202        let cache = match read_table_cache_index(&caches, context) {
1203            Some(index) => &mut caches[index].1,
1204            None => {
1205                caches.push((context, ReadTableCache::new()));
1206                &mut caches
1207                    .last_mut()
1208                    .ok_or(StableMemoryError::OffsetOverflow)?
1209                    .1
1210            }
1211        };
1212        cache.ensure_key(key);
1213        if cache.root.is_empty() {
1214            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1215            crate::read_metrics::record_page_table_root_miss();
1216            cache.root = read_root_table(block)?;
1217        } else {
1218            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1219            crate::read_metrics::record_page_table_root_hit();
1220        }
1221        let root_index =
1222            usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1223        let segment_offset = cache.root[root_index];
1224        if segment_offset == 0 {
1225            return Ok(0);
1226        }
1227        if let Some(offset) = cache.segment_page_offset(segment_no, index) {
1228            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1229            crate::read_metrics::record_page_table_segment_hit();
1230            return Ok(offset);
1231        }
1232        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1233        crate::read_metrics::record_page_table_segment_miss();
1234        let table = read_segment_table_at(segment_offset)?;
1235        let offset = table[index];
1236        cache.insert_segment(segment_no, table);
1237        Ok(offset)
1238    })
1239}
1240
1241fn read_table_cache_index(
1242    caches: &[(ContextId, ReadTableCache)],
1243    context: ContextId,
1244) -> Option<usize> {
1245    caches
1246        .iter()
1247        .position(|(stored_context, _)| *stored_context == context)
1248}
1249
1250fn read_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1251    if block.page_count == 0 {
1252        return Ok(Vec::new());
1253    }
1254    let entries_len =
1255        usize::try_from(block.page_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1256    read_u64_table_at(block.page_table_offset, entries_len)
1257}
1258
/// Loads the root table for a commit; currently a direct alias of
/// `read_root_table`.
fn read_commit_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
    read_root_table(block)
}
1262
1263fn read_segment_table(
1264    _block: &Superblock,
1265    root: &[u64],
1266    segment_no: u64,
1267) -> Result<Vec<u64>, StableMemoryError> {
1268    let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1269    let Some(offset) = root.get(index).copied() else {
1270        return Ok(vec![0_u64; segment_page_count_usize()]);
1271    };
1272    if offset == 0 {
1273        return Ok(vec![0_u64; segment_page_count_usize()]);
1274    }
1275    read_segment_table_at(offset)
1276}
1277
1278fn read_commit_segment_table(
1279    _block: &Superblock,
1280    root: &[u64],
1281    segment_no: u64,
1282) -> Result<Vec<u64>, StableMemoryError> {
1283    let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1284    let Some(offset) = root.get(index).copied() else {
1285        return Ok(vec![0_u64; segment_page_count_usize()]);
1286    };
1287    if offset == 0 {
1288        return Ok(vec![0_u64; segment_page_count_usize()]);
1289    }
1290    read_commit_segment_table_at(segment_no, offset)
1291}
1292
1293fn read_commit_segment_table_at(
1294    segment_no: u64,
1295    offset: u64,
1296) -> Result<Vec<u64>, StableMemoryError> {
1297    if offset == 0 {
1298        return Ok(vec![0_u64; segment_page_count_usize()]);
1299    }
1300    if let Some(table) = take_commit_segment_table(segment_no, offset) {
1301        return Ok(table);
1302    }
1303    read_segment_table_at(offset)
1304}
1305
1306fn take_commit_segment_table(segment_no: u64, segment_offset: u64) -> Option<Vec<u64>> {
1307    let Ok(context) = memory::active_context_id() else {
1308        return None;
1309    };
1310    COMMIT_SEGMENT_CACHE.with(|cache| {
1311        let mut cache = cache.borrow_mut();
1312        if cache.len() == 1 {
1313            let (stored_context, cached) = &cache[0];
1314            if *stored_context == context
1315                && cached.segment_no == segment_no
1316                && cached.segment_offset == segment_offset
1317            {
1318                return cache.pop().map(|(_, cached)| cached.table);
1319            }
1320            return None;
1321        }
1322        cache
1323            .iter()
1324            .position(|(stored_context, cached)| {
1325                *stored_context == context
1326                    && cached.segment_no == segment_no
1327                    && cached.segment_offset == segment_offset
1328            })
1329            .map(|position| cache.remove(position).1.table)
1330    })
1331}
1332
1333fn cache_commit_segment_table(segment_no: u64, segment_offset: u64, table: Vec<u64>) {
1334    let Ok(context) = memory::active_context_id() else {
1335        return;
1336    };
1337    COMMIT_SEGMENT_CACHE.with(|cache| {
1338        let mut cache = cache.borrow_mut();
1339        if cache.is_empty() {
1340            cache.push((
1341                context,
1342                CommitSegmentCache {
1343                    segment_no,
1344                    segment_offset,
1345                    table,
1346                },
1347            ));
1348            return;
1349        }
1350        if cache.len() == 1 {
1351            let (stored_context, cached) = &mut cache[0];
1352            if *stored_context == context {
1353                cached.segment_no = segment_no;
1354                cached.segment_offset = segment_offset;
1355                cached.table = table;
1356                return;
1357            }
1358        } else if let Some((_, cached)) = cache
1359            .iter_mut()
1360            .find(|(stored_context, _)| *stored_context == context)
1361        {
1362            cached.segment_no = segment_no;
1363            cached.segment_offset = segment_offset;
1364            cached.table = table;
1365            return;
1366        }
1367        cache.push((
1368            context,
1369            CommitSegmentCache {
1370                segment_no,
1371                segment_offset,
1372                table,
1373            },
1374        ));
1375    });
1376}
1377
/// Reads one full segment table (`SEGMENT_PAGE_COUNT` entries) stored at `offset`.
fn read_segment_table_at(offset: u64) -> Result<Vec<u64>, StableMemoryError> {
    read_u64_table_at(offset, segment_page_count_usize())
}
1381
1382fn write_segmented_tables(entries: &[u64]) -> Result<(u64, u64), StableMemoryError> {
1383    if entries.is_empty() {
1384        return Ok((0, 0));
1385    }
1386    let root_len = segment_count_for_pages(entries_len_u64(entries)?)?;
1387    let mut cursor = append_base()?;
1388    let segment_bytes = root_len
1389        .checked_mul(segment_table_bytes()?)
1390        .ok_or(StableMemoryError::OffsetOverflow)?;
1391    let page_table_bytes = checked_add(segment_bytes, root_table_bytes(root_len)?)?;
1392    memory::ensure_capacity(checked_add(cursor, page_table_bytes)?)?;
1393    let mut root = Vec::with_capacity(
1394        usize::try_from(root_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1395    );
1396    for segment_no in 0..root_len {
1397        let start = usize::try_from(
1398            segment_no
1399                .checked_mul(SEGMENT_PAGE_COUNT)
1400                .ok_or(StableMemoryError::OffsetOverflow)?,
1401        )
1402        .map_err(|_| StableMemoryError::OffsetOverflow)?;
1403        let mut table = vec![0_u64; segment_page_count_usize()];
1404        for (offset, entry) in entries[start..]
1405            .iter()
1406            .take(segment_page_count_usize())
1407            .enumerate()
1408        {
1409            table[offset] = *entry;
1410        }
1411        root.push(write_segment_table_at(&table, &mut cursor)?);
1412    }
1413    let root_offset = write_root_table_at(&root, &mut cursor)?;
1414    Ok((root_offset, entries_len_u64(&root)?))
1415}
1416
1417#[inline(always)]
1418fn write_segment_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1419    if entries.len() == segment_page_count_usize() {
1420        return write_u64_table_at(entries, cursor);
1421    }
1422
1423    let mut table = vec![0_u64; segment_page_count_usize()];
1424    for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
1425        table[index] = *entry;
1426    }
1427    write_u64_table_at(&table, cursor)
1428}
1429
/// Writes the root table at `cursor` and advances the cursor; a thin wrapper
/// over `write_u64_table_at`.
fn write_root_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
    write_u64_table_at(entries, cursor)
}
1433
1434#[inline(always)]
1435fn write_commit_segment_table_at(
1436    entries: &[u64],
1437    cursor: &mut u64,
1438    profile_enabled: bool,
1439) -> Result<u64, StableMemoryError> {
1440    if profile_enabled {
1441        write_segment_table_at(entries, cursor)
1442    } else {
1443        write_segment_table_at_unmetered(entries, cursor)
1444    }
1445}
1446
1447#[inline(always)]
1448fn write_commit_root_table_at(
1449    entries: &[u64],
1450    cursor: &mut u64,
1451    profile_enabled: bool,
1452) -> Result<u64, StableMemoryError> {
1453    if profile_enabled {
1454        write_root_table_at(entries, cursor)
1455    } else {
1456        write_u64_table_at_unmetered(entries, cursor)
1457    }
1458}
1459
1460fn write_segment_table_at_unmetered(
1461    entries: &[u64],
1462    cursor: &mut u64,
1463) -> Result<u64, StableMemoryError> {
1464    if entries.len() == segment_page_count_usize() {
1465        return write_u64_table_at_unmetered(entries, cursor);
1466    }
1467
1468    let mut table = vec![0_u64; segment_page_count_usize()];
1469    for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
1470        table[index] = *entry;
1471    }
1472    write_u64_table_at_unmetered(&table, cursor)
1473}
1474
1475fn write_u64_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1476    if entries.is_empty() {
1477        return Ok(0);
1478    }
1479    let offset = *cursor;
1480    let byte_len = entries
1481        .len()
1482        .checked_mul(8)
1483        .ok_or(StableMemoryError::OffsetOverflow)?;
1484    #[cfg(target_endian = "little")]
1485    {
1486        // SAFETY: page-table encoding is little-endian u64 and the target is little-endian.
1487        let bytes = unsafe { std::slice::from_raw_parts(entries.as_ptr().cast::<u8>(), byte_len) };
1488        memory::write_prechecked(offset, bytes)?;
1489        *cursor = checked_add(
1490            offset,
1491            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1492        )?;
1493        Ok(offset)
1494    }
1495
1496    #[cfg(not(target_endian = "little"))]
1497    {
1498        let mut bytes = vec![0_u8; byte_len];
1499        for (chunk, entry) in bytes.chunks_exact_mut(8).zip(entries) {
1500            chunk.copy_from_slice(&entry.to_le_bytes());
1501        }
1502        memory::write_prechecked(offset, &bytes)?;
1503        *cursor = checked_add(
1504            offset,
1505            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1506        )?;
1507        Ok(offset)
1508    }
1509}
1510
/// Reads `entries_len` little-endian `u64` values stored at `offset`.
///
/// An `entries_len` of zero returns an empty vector without reading. On
/// little-endian targets the bytes are read straight into the vector's
/// uninitialized storage; other targets read into a byte buffer and decode.
fn read_u64_table_at(offset: u64, entries_len: usize) -> Result<Vec<u64>, StableMemoryError> {
    if entries_len == 0 {
        return Ok(Vec::new());
    }
    let byte_len = entries_len
        .checked_mul(8)
        .ok_or(StableMemoryError::OffsetOverflow)?;
    #[cfg(target_endian = "little")]
    {
        let mut entries = Vec::<MaybeUninit<u64>>::with_capacity(entries_len);
        // SAFETY: the capacity is at least `entries_len`, and `MaybeUninit<u64>`
        // elements do not need to be initialized.
        unsafe {
            entries.set_len(entries_len);
        }
        // SAFETY: the buffer has `entries_len` u64 slots. The stable-memory read
        // fills every byte before conversion to initialized `u64` values.
        // NOTE(review): relies on `read_preallocated` writing all of `bytes` on
        // success and never reading from it — confirm in `stable::memory`.
        let bytes =
            unsafe { std::slice::from_raw_parts_mut(entries.as_mut_ptr().cast::<u8>(), byte_len) };
        memory::read_preallocated(offset, bytes)?;
        let ptr = entries.as_mut_ptr().cast::<u64>();
        let len = entries.len();
        let capacity = entries.capacity();
        // Ownership of the allocation moves to the `Vec<u64>` rebuilt below;
        // forgetting the original avoids freeing it twice.
        std::mem::forget(entries);
        // SAFETY: all bytes were just initialized by `read_preallocated`, and
        // every bit pattern is valid for `u64`.
        unsafe { Ok(Vec::from_raw_parts(ptr, len, capacity)) }
    }

    #[cfg(not(target_endian = "little"))]
    {
        let mut bytes = vec![0_u8; byte_len];
        memory::read_preallocated(offset, &mut bytes)?;
        decode_u64_table(&bytes)
    }
}
1545
1546fn write_u64_table_at_unmetered(
1547    entries: &[u64],
1548    cursor: &mut u64,
1549) -> Result<u64, StableMemoryError> {
1550    if entries.is_empty() {
1551        return Ok(0);
1552    }
1553    let offset = *cursor;
1554    let byte_len = entries
1555        .len()
1556        .checked_mul(8)
1557        .ok_or(StableMemoryError::OffsetOverflow)?;
1558    #[cfg(target_endian = "little")]
1559    {
1560        // SAFETY: page-table encoding is little-endian u64 and the target is little-endian.
1561        let bytes = unsafe { std::slice::from_raw_parts(entries.as_ptr().cast::<u8>(), byte_len) };
1562        memory::write_prechecked_unmetered(offset, bytes)?;
1563        *cursor = checked_add(
1564            offset,
1565            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1566        )?;
1567        Ok(offset)
1568    }
1569
1570    #[cfg(not(target_endian = "little"))]
1571    {
1572        let mut bytes = vec![0_u8; byte_len];
1573        for (chunk, entry) in bytes.chunks_exact_mut(8).zip(entries) {
1574            chunk.copy_from_slice(&entry.to_le_bytes());
1575        }
1576        memory::write_prechecked_unmetered(offset, &bytes)?;
1577        *cursor = checked_add(
1578            offset,
1579            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1580        )?;
1581        Ok(offset)
1582    }
1583}
1584
/// Decodes a byte buffer of little-endian `u64` values (big-endian targets only).
///
/// Fails with `OffsetOverflow` when the buffer length is not a multiple of 8.
#[cfg(not(target_endian = "little"))]
fn decode_u64_table(bytes: &[u8]) -> Result<Vec<u64>, StableMemoryError> {
    let words = bytes.chunks_exact(8);
    if !words.remainder().is_empty() {
        return Err(StableMemoryError::OffsetOverflow);
    }
    let entries = words
        .map(|word| {
            let mut raw = [0_u8; 8];
            raw.copy_from_slice(word);
            u64::from_le_bytes(raw)
        })
        .collect();
    Ok(entries)
}
1598
1599fn imported_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1600    let count = page_count_for_size(block.import_total_size)?;
1601    let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1602    let mut entries = Vec::with_capacity(capacity);
1603    for page_no in 0..count {
1604        entries.push(checked_add(
1605            block.import_base_offset,
1606            page_no
1607                .checked_mul(page_size())
1608                .ok_or(StableMemoryError::OffsetOverflow)?,
1609        )?);
1610    }
1611    Ok(entries)
1612}
1613
1614fn checksum_logical_range(block: &Superblock, len: u64) -> Result<u64, StableMemoryError> {
1615    let mut offset = 0_u64;
1616    let mut hash = fnv1a64(&[]);
1617    while offset < len {
1618        let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
1619        let copied_len =
1620            usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
1621        let mut bytes = vec![0_u8; copied_len];
1622        read_logical_range(block, offset, &mut bytes)?;
1623        hash = fold_fnv1a64(hash, &bytes);
1624        offset += chunk_len;
1625    }
1626    Ok(hash)
1627}
1628
1629fn checksum_physical_range(base_offset: u64, len: u64) -> Result<u64, StableMemoryError> {
1630    let mut offset = 0_u64;
1631    let mut hash = fnv1a64(&[]);
1632    while offset < len {
1633        let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
1634        let copied_len =
1635            usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
1636        let mut bytes = vec![0_u8; copied_len];
1637        memory::read_preallocated(checked_add(base_offset, offset)?, &mut bytes)?;
1638        hash = fold_fnv1a64(hash, &bytes);
1639        offset += chunk_len;
1640    }
1641    Ok(hash)
1642}
1643
1644fn clear_import(block: &mut Superblock) -> Result<(), StableMemoryError> {
1645    block.flags &= !FLAG_IMPORTING;
1646    block.import_expected_checksum = 0;
1647    block.import_written_until = 0;
1648    block.import_total_size = 0;
1649    block.import_base_offset = 0;
1650    block.store()?;
1651    invalidate_read_cache();
1652    Ok(())
1653}
1654
/// Translates an offset relative to the import area into a stable-memory
/// offset, failing with `OffsetOverflow` when the sum overflows.
fn import_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
    checked_add(block.import_base_offset, offset)
}
1658
/// Number of pages needed to hold `block.db_size` bytes.
fn active_page_count(block: &Superblock) -> Result<u64, StableMemoryError> {
    page_count_for_size(block.db_size)
}
1662
/// Returns `block.page_count` as the active segment count.
// NOTE(review): despite its name, `page_count` is treated as the number of
// root (segment) entries here and in `read_root_table` — confirm against the
// `Superblock` definition.
fn active_segment_count(block: &Superblock) -> Result<u64, StableMemoryError> {
    Ok(block.page_count)
}
1666
1667fn read_cache_key(block: &Superblock) -> ReadCacheKey {
1668    ReadCacheKey {
1669        page_table_offset: block.page_table_offset,
1670        page_count: block.page_count,
1671        db_size: block.db_size,
1672        last_tx_id: block.last_tx_id,
1673    }
1674}
1675
1676fn segment_count_for_pages(page_count: u64) -> Result<u64, StableMemoryError> {
1677    Ok(page_count.div_ceil(SEGMENT_PAGE_COUNT))
1678}
1679
/// Index of the segment that contains logical page `page_no`.
fn segment_no(page_no: u64) -> u64 {
    page_no / SEGMENT_PAGE_COUNT
}
1683
1684fn segment_index(page_no: u64) -> Result<usize, StableMemoryError> {
1685    usize::try_from(page_no % SEGMENT_PAGE_COUNT).map_err(|_| StableMemoryError::OffsetOverflow)
1686}
1687
/// `SEGMENT_PAGE_COUNT` as a `usize`; panics only if the constant does not
/// fit the target's `usize`.
fn segment_page_count_usize() -> usize {
    usize::try_from(SEGMENT_PAGE_COUNT).expect("segment page count fits usize")
}
1691
/// Size in bytes of one segment table: 8 bytes for each of its `u64` entries.
fn segment_table_len() -> usize {
    segment_page_count_usize() * 8
}
1695
/// `segment_table_len()` as a `u64`, for stable-memory offset arithmetic.
fn segment_table_bytes() -> Result<u64, StableMemoryError> {
    u64::try_from(segment_table_len()).map_err(|_| StableMemoryError::OffsetOverflow)
}
1699
1700fn root_table_bytes(entry_count: u64) -> Result<u64, StableMemoryError> {
1701    entry_count
1702        .checked_mul(PAGE_TABLE_ENTRY_LEN)
1703        .ok_or(StableMemoryError::OffsetOverflow)
1704}
1705
1706fn entries_len_u64<T>(entries: &[T]) -> Result<u64, StableMemoryError> {
1707    u64::try_from(entries.len()).map_err(|_| StableMemoryError::OffsetOverflow)
1708}
1709
1710fn append_base() -> Result<u64, StableMemoryError> {
1711    memory::size_pages()
1712        .checked_mul(STABLE_PAGE_SIZE)
1713        .ok_or(StableMemoryError::OffsetOverflow)
1714}
1715
/// `SQLITE_PAGE_SIZE` widened to `u64` for offset arithmetic.
fn page_size() -> u64 {
    u64::from(SQLITE_PAGE_SIZE)
}
1719
/// `SQLITE_PAGE_SIZE` as a `usize` for buffer sizing; panics only if the
/// constant does not fit the target's `usize`.
fn page_len() -> usize {
    usize::try_from(SQLITE_PAGE_SIZE).expect("SQLite page size fits usize")
}
1723
/// Allocates a zero-filled buffer exactly one SQLite page long.
fn zero_page() -> Vec<u8> {
    vec![0_u8; page_len()]
}
1727
1728fn checked_add(left: u64, right: u64) -> Result<u64, StableMemoryError> {
1729    left.checked_add(right)
1730        .ok_or(StableMemoryError::OffsetOverflow)
1731}
1732
/// Continues an FNV-1a 64 hash: each byte is XORed into the state, which is
/// then multiplied (wrapping) by the 64-bit FNV prime.
fn fold_fnv1a64(hash: u64, bytes: &[u8]) -> u64 {
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.iter().fold(hash, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
1740
/// Test build: fails once when `failpoint` is armed for the active context.
///
/// A matching failpoint is removed from the registry before the error is
/// returned, so it fires a single time. Without an active context this is a
/// no-op.
#[cfg(test)]
fn hit_failpoint(failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
    let Ok(context) = memory::active_context_id() else {
        return Ok(());
    };
    let armed = FAILPOINTS.with(|registry| {
        let mut registry = registry.borrow_mut();
        let armed = registry.get(&context) == Some(&failpoint);
        if armed {
            registry.remove(&context);
        }
        armed
    });
    if armed {
        Err(StableMemoryError::Failpoint(failpoint.name()))
    } else {
        Ok(())
    }
}
1756
1757#[cfg(not(test))]
1758fn hit_failpoint(_failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
1759    Ok(())
1760}
1761
#[cfg(test)]
impl StableBlobFailpoint {
    /// Static label for this failpoint; used as the payload of the failpoint error.
    fn name(self) -> &'static str {
        match self {
            StableBlobFailpoint::OverlayWrite => "before overlay write",
            StableBlobFailpoint::OverlayTruncate => "before overlay truncate",
            StableBlobFailpoint::CommitCapacity => "before commit capacity",
            StableBlobFailpoint::CommitChunkWrite => "before commit page write",
            StableBlobFailpoint::CommitPageTableWrite => "before commit page table write",
            StableBlobFailpoint::CommitSuperblockStore => "before commit superblock store",
        }
    }
}
1775
1776#[cfg(test)]
1777mod tests {
1778    use super::*;
1779    use proptest::prelude::*;
1780    use proptest::test_runner::{Config, TestRunner};
1781    use std::collections::BTreeSet;
1782
1783    #[test]
1784    fn layout_math_matches_expected_boundaries() {
1785        assert_eq!(page_count_for_size(0).unwrap(), 0);
1786        assert_eq!(page_count_for_size(1).unwrap(), 1);
1787        assert_eq!(page_count_for_size(page_size()).unwrap(), 1);
1788        assert_eq!(page_count_for_size(page_size() + 1).unwrap(), 2);
1789
1790        assert_eq!(segment_count_for_pages(0).unwrap(), 0);
1791        assert_eq!(segment_count_for_pages(1).unwrap(), 1);
1792        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT).unwrap(), 1);
1793        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT + 1).unwrap(), 2);
1794
1795        assert_eq!(segment_no(SEGMENT_PAGE_COUNT), 1);
1796        assert_eq!(segment_index(SEGMENT_PAGE_COUNT - 1).unwrap(), 255);
1797        assert_eq!(segment_index(SEGMENT_PAGE_COUNT).unwrap(), 0);
1798        assert_eq!(root_table_bytes(2).unwrap(), 16);
1799    }
1800
1801    #[test]
1802    fn layout_math_rejects_u64_max_overflow_boundaries() {
1803        assert!(matches!(
1804            root_table_bytes(u64::MAX),
1805            Err(StableMemoryError::OffsetOverflow)
1806        ));
1807        assert!(matches!(
1808            checked_add(u64::MAX, 1),
1809            Err(StableMemoryError::OffsetOverflow)
1810        ));
1811
1812        let mut block = Superblock::fresh();
1813        block.import_base_offset = u64::MAX;
1814        assert!(matches!(
1815            import_offset(&block, 1),
1816            Err(StableMemoryError::OffsetOverflow)
1817        ));
1818
1819        block.import_base_offset = u64::MAX - page_size() + 1;
1820        block.import_total_size = page_size() + 1;
1821        assert!(matches!(
1822            imported_page_table(&block),
1823            Err(StableMemoryError::OffsetOverflow)
1824        ));
1825    }
1826
1827    #[test]
1828    fn pbt_layout_math_matches_verus_model() {
1829        let mut runner = TestRunner::new(Config {
1830            cases: 512,
1831            ..Config::default()
1832        });
1833
1834        runner
1835            .run(
1836                &(
1837                    boundary_size_strategy(),
1838                    boundary_page_strategy(),
1839                    boundary_entry_strategy(),
1840                ),
1841                |(size, page_no, entries)| {
1842                    let page_count = page_count_for_size(size).unwrap();
1843                    let page_size = u128::from(page_size());
1844                    if size == 0 {
1845                        prop_assert_eq!(page_count, 0);
1846                    } else {
1847                        prop_assert!(u128::from(page_count - 1) * page_size < u128::from(size));
1848                        prop_assert!(u128::from(size) <= u128::from(page_count) * page_size);
1849                    }
1850
1851                    let segment_count = segment_count_for_pages(page_count).unwrap();
1852                    if page_count == 0 {
1853                        prop_assert_eq!(segment_count, 0);
1854                    } else {
1855                        prop_assert!(
1856                            u128::from(segment_count - 1) * u128::from(SEGMENT_PAGE_COUNT)
1857                                < u128::from(page_count)
1858                        );
1859                        prop_assert!(
1860                            u128::from(page_count)
1861                                <= u128::from(segment_count) * u128::from(SEGMENT_PAGE_COUNT)
1862                        );
1863                    }
1864
1865                    let index = segment_index(page_no).unwrap();
1866                    prop_assert!(index < segment_page_count_usize());
1867                    prop_assert_eq!(
1868                        u128::from(segment_no(page_no)) * u128::from(SEGMENT_PAGE_COUNT)
1869                            + index as u128,
1870                        u128::from(page_no)
1871                    );
1872
1873                    match root_table_bytes(entries) {
1874                        Ok(bytes) => prop_assert_eq!(bytes, entries * PAGE_TABLE_ENTRY_LEN),
1875                        Err(StableMemoryError::OffsetOverflow) => {
1876                            prop_assert!(entries.checked_mul(PAGE_TABLE_ENTRY_LEN).is_none());
1877                        }
1878                        Err(error) => return Err(TestCaseError::fail(error.to_string())),
1879                    }
1880                    Ok(())
1881                },
1882            )
1883            .unwrap();
1884    }
1885
1886    fn boundary_size_strategy() -> impl Strategy<Value = u64> {
1887        let page = page_size();
1888        let segment_bytes = SEGMENT_PAGE_COUNT * page;
1889        prop_oneof![
1890            any::<u64>(),
1891            prop::sample::select(boundary_values(&[
1892                0,
1893                1,
1894                page - 1,
1895                page,
1896                page + 1,
1897                segment_bytes - 1,
1898                segment_bytes,
1899                segment_bytes + 1,
1900                u64::MAX,
1901            ])),
1902        ]
1903    }
1904
1905    fn boundary_page_strategy() -> impl Strategy<Value = u64> {
1906        prop_oneof![
1907            any::<u64>(),
1908            prop::sample::select(boundary_values(&[
1909                0,
1910                1,
1911                SEGMENT_PAGE_COUNT - 1,
1912                SEGMENT_PAGE_COUNT,
1913                SEGMENT_PAGE_COUNT + 1,
1914                u64::MAX,
1915            ])),
1916        ]
1917    }
1918
1919    fn boundary_entry_strategy() -> impl Strategy<Value = u64> {
1920        let max_without_overflow = u64::MAX / PAGE_TABLE_ENTRY_LEN;
1921        prop_oneof![
1922            any::<u64>(),
1923            prop::sample::select(boundary_values(&[
1924                0,
1925                1,
1926                SEGMENT_PAGE_COUNT - 1,
1927                SEGMENT_PAGE_COUNT,
1928                SEGMENT_PAGE_COUNT + 1,
1929                max_without_overflow - 1,
1930                max_without_overflow,
1931                max_without_overflow + 1,
1932                u64::MAX - 1,
1933                u64::MAX,
1934            ])),
1935        ]
1936    }
1937
1938    fn boundary_values(values: &[u64]) -> Vec<u64> {
1939        values
1940            .iter()
1941            .flat_map(|value| [value.saturating_sub(1), *value, value.saturating_add(1)])
1942            .collect()
1943    }
1944
1945    #[test]
1946    fn fnv_fold_matches_one_pass_for_multiple_partitions() {
1947        let bytes: Vec<u8> = (0..97)
1948            .map(|index| (index as u8).wrapping_mul(37).wrapping_add(11))
1949            .collect();
1950        let expected = fnv1a64(&bytes);
1951
1952        for split in [0_usize, 1, 2, 7, 31, 64, bytes.len()] {
1953            let split = split.min(bytes.len());
1954            let mut hash = fnv1a64(&[]);
1955            hash = fold_fnv1a64(hash, &bytes[..split]);
1956            hash = fold_fnv1a64(hash, &bytes[split..]);
1957            assert_eq!(hash, expected);
1958        }
1959
1960        let mut hash = fnv1a64(&[]);
1961        for chunk in bytes.chunks(13) {
1962            hash = fold_fnv1a64(hash, chunk);
1963        }
1964        assert_eq!(hash, expected);
1965    }
1966
1967    #[test]
1968    #[serial_test::serial]
1969    fn page_map_commit_tracks_dirty_page_offsets() {
1970        crate::stable::memory::reset_for_tests();
1971        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
1972        invalidate_read_cache();
1973
1974        let page_zero = vec![1_u8; page_len()];
1975        let page_later = vec![2_u8; page_len()];
1976        let later_page_no = SEGMENT_PAGE_COUNT + 1;
1977        write_at(0, &page_zero).unwrap();
1978        write_at(later_page_no * page_size(), &page_later).unwrap();
1979
1980        let block = Superblock::load().unwrap();
1981        let root = read_root_table(&block).unwrap();
1982        let table = read_page_table(&block).unwrap();
1983        let expected_pages = active_page_count(&block).unwrap();
1984        let expected_segments = segment_count_for_pages(expected_pages).unwrap();
1985
1986        assert_eq!(root.len() as u64, expected_segments);
1987        assert_eq!(table.len() as u64, expected_pages);
1988        assert_ne!(table[0], 0);
1989        assert_ne!(table[later_page_no as usize], 0);
1990
1991        let old_page_zero_offset = table[0];
1992        let updated_page_zero = vec![3_u8; page_len()];
1993        write_at(0, &updated_page_zero).unwrap();
1994        let updated_table = read_page_table(&Superblock::load().unwrap()).unwrap();
1995        let mut out = vec![0_u8; page_len()];
1996        read_base_at(0, &mut out).unwrap();
1997
1998        assert_ne!(updated_table[0], old_page_zero_offset);
1999        assert_eq!(out, updated_page_zero);
2000    }
2001
2002    #[test]
2003    #[serial_test::serial]
2004    fn page_map_commit_tracks_multi_segment_dirty_and_clean_pages() {
2005        crate::stable::memory::reset_for_tests();
2006        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2007        invalidate_read_cache();
2008
2009        let clean_page_no = 1;
2010        let later_page_no = SEGMENT_PAGE_COUNT + 1;
2011        write_at(0, &vec![1_u8; page_len()]).unwrap();
2012        write_at(clean_page_no * page_size(), &vec![2_u8; page_len()]).unwrap();
2013        write_at(later_page_no * page_size(), &vec![3_u8; page_len()]).unwrap();
2014
2015        let before = Superblock::load().unwrap();
2016        let before_root = read_root_table(&before).unwrap();
2017        let before_table = read_page_table(&before).unwrap();
2018
2019        begin_update().unwrap();
2020        write_at(0, &vec![4_u8; page_len()]).unwrap();
2021        write_at(later_page_no * page_size(), &vec![5_u8; page_len()]).unwrap();
2022        commit_update().unwrap();
2023
2024        let after = Superblock::load().unwrap();
2025        let after_root = read_root_table(&after).unwrap();
2026        let after_table = read_page_table(&after).unwrap();
2027
2028        assert_eq!(after_root.len(), after.page_count as usize);
2029        assert_eq!(after_root.len(), before_root.len());
2030        assert_ne!(after_table[0], before_table[0]);
2031        assert_eq!(
2032            after_table[clean_page_no as usize],
2033            before_table[clean_page_no as usize]
2034        );
2035        assert_ne!(
2036            after_table[later_page_no as usize],
2037            before_table[later_page_no as usize]
2038        );
2039    }
2040
2041    #[test]
2042    #[serial_test::serial]
2043    fn page_map_commit_zeroes_truncated_tail_slots() {
2044        crate::stable::memory::reset_for_tests();
2045        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2046        invalidate_read_cache();
2047
2048        write_at(0, &vec![1_u8; page_len()]).unwrap();
2049        write_at(page_size(), &vec![2_u8; page_len()]).unwrap();
2050        write_at(2 * page_size(), &vec![3_u8; page_len()]).unwrap();
2051        truncate(page_size()).unwrap();
2052
2053        let block = Superblock::load().unwrap();
2054        let root = read_root_table(&block).unwrap();
2055        let segment = read_segment_table(&block, &root, 0).unwrap();
2056
2057        assert_eq!(block.db_size, page_size());
2058        assert!(segment[0] != 0);
2059        assert_eq!(segment[1], 0);
2060        assert_eq!(segment[2], 0);
2061    }
2062
2063    #[test]
2064    #[serial_test::serial]
2065    fn compact_keeps_zero_pages_and_densifies_offsets_across_segments() {
2066        crate::stable::memory::reset_for_tests();
2067        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2068        invalidate_read_cache();
2069
2070        let later_page_no = SEGMENT_PAGE_COUNT + 2;
2071        let first_page = vec![7_u8; page_len()];
2072        let later_page = vec![9_u8; page_len()];
2073        write_at(0, &first_page).unwrap();
2074        write_at(later_page_no * page_size(), &later_page).unwrap();
2075
2076        compact().unwrap();
2077
2078        let block = Superblock::load().unwrap();
2079        let root = read_root_table(&block).unwrap();
2080        let table = read_page_table(&block).unwrap();
2081        let mut first_out = vec![0_u8; page_len()];
2082        let mut later_out = vec![0_u8; page_len()];
2083
2084        read_base_at(0, &mut first_out).unwrap();
2085        read_base_at(later_page_no * page_size(), &mut later_out).unwrap();
2086
2087        assert_eq!(root.len() as u64, block.page_count);
2088        assert_eq!(table.len() as u64, active_page_count(&block).unwrap());
2089        assert_ne!(table[0], 0);
2090        assert_eq!(table[1], 0);
2091        assert_eq!(table[later_page_no as usize], table[0] + page_size());
2092        assert_eq!(first_out, first_page);
2093        assert_eq!(later_out, later_page);
2094    }
2095
2096    #[test]
2097    #[serial_test::serial]
2098    fn single_segment_fast_path_preserves_table_after_expand_only_commit() {
2099        crate::stable::memory::reset_for_tests();
2100        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2101        invalidate_read_cache();
2102
2103        write_at(0, &[0]).unwrap();
2104        truncate(page_size() * 4).unwrap();
2105        truncate(page_size() * 4 + 1).unwrap();
2106
2107        let block = Superblock::load().unwrap();
2108        let table = read_page_table(&block).unwrap();
2109        let mut first = [1_u8; 1];
2110        let mut expanded_tail = [1_u8; 1];
2111
2112        read_base_at(0, &mut first).unwrap();
2113        read_base_at(page_size() * 4, &mut expanded_tail).unwrap();
2114
2115        assert_eq!(block.db_size, page_size() * 4 + 1);
2116        assert_ne!(table[0], 0);
2117        assert_eq!(table[1], 0);
2118        assert_ne!(table[4], 0);
2119        assert_eq!(first, [0]);
2120        assert_eq!(expanded_tail, [0]);
2121    }
2122
2123    #[test]
2124    #[serial_test::serial]
2125    fn page_table_u64_encoding_is_little_endian_and_round_trips() {
2126        crate::stable::memory::reset_for_tests();
2127        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2128        invalidate_read_cache();
2129
2130        let entries = [
2131            0_u64,
2132            1,
2133            0x0102_0304_0506_0708,
2134            0xf1f2_f3f4_f5f6_f7f8,
2135            u64::MAX,
2136        ];
2137        let mut cursor = 128_u64;
2138        let expected_len = u64::try_from(entries.len() * 8).unwrap();
2139        crate::stable::memory::ensure_capacity(cursor + expected_len).unwrap();
2140
2141        let offset = write_u64_table_at(&entries, &mut cursor).unwrap();
2142        let decoded = read_u64_table_at(offset, entries.len()).unwrap();
2143        let mut encoded = vec![0_u8; entries.len() * 8];
2144        crate::stable::memory::read_preallocated(offset, &mut encoded).unwrap();
2145        let expected = entries
2146            .iter()
2147            .flat_map(|entry| entry.to_le_bytes())
2148            .collect::<Vec<_>>();
2149
2150        assert_eq!(offset, 128);
2151        assert_eq!(cursor, 128 + expected_len);
2152        assert_eq!(decoded, entries);
2153        assert_eq!(encoded, expected);
2154
2155        let mut empty_cursor = cursor;
2156        assert_eq!(write_u64_table_at(&[], &mut empty_cursor).unwrap(), 0);
2157        assert_eq!(empty_cursor, cursor);
2158        assert!(read_u64_table_at(cursor, 0).unwrap().is_empty());
2159    }
2160
2161    #[test]
2162    #[serial_test::serial]
2163    fn pbt_page_table_u64_encoding_round_trips() {
2164        let mut runner = TestRunner::new(Config {
2165            cases: 128,
2166            ..Config::default()
2167        });
2168
2169        runner
2170            .run(
2171                &proptest::collection::vec(any::<u64>(), 0..=512),
2172                |entries| {
2173                    crate::stable::memory::reset_for_tests();
2174                    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2175                    invalidate_read_cache();
2176
2177                    let mut cursor = 128_u64;
2178                    let byte_len = entries.len().checked_mul(8).unwrap();
2179                    let end = cursor + u64::try_from(byte_len).unwrap();
2180                    crate::stable::memory::ensure_capacity(end).unwrap();
2181
2182                    let offset = write_u64_table_at(&entries, &mut cursor).unwrap();
2183                    let decoded = read_u64_table_at(offset, entries.len()).unwrap();
2184                    prop_assert_eq!(decoded, entries.clone());
2185                    prop_assert_eq!(cursor, end);
2186
2187                    let mut encoded = vec![0_u8; byte_len];
2188                    crate::stable::memory::read_preallocated(offset, &mut encoded).unwrap();
2189                    let expected = entries
2190                        .iter()
2191                        .flat_map(|entry| entry.to_le_bytes())
2192                        .collect::<Vec<_>>();
2193                    prop_assert_eq!(encoded, expected);
2194                    Ok(())
2195                },
2196            )
2197            .unwrap();
2198    }
2199
2200    #[test]
2201    #[serial_test::serial]
2202    fn pbt_compact_preserves_sparse_page_model() {
2203        let mut runner = TestRunner::new(Config {
2204            cases: 32,
2205            ..Config::default()
2206        });
2207
2208        runner
2209            .run(
2210                &proptest::collection::vec(prop::option::of(any::<u8>()), 0..=300),
2211                |pages| {
2212                    crate::stable::memory::reset_for_tests();
2213                    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2214                    invalidate_read_cache();
2215
2216                    let active_len = pages
2217                        .iter()
2218                        .rposition(Option::is_some)
2219                        .map(|index| index + 1)
2220                        .unwrap_or(0);
2221                    for (page_no, byte) in pages.iter().take(active_len).enumerate() {
2222                        if let Some(byte) = byte {
2223                            write_at(
2224                                u64::try_from(page_no).unwrap() * page_size(),
2225                                &vec![*byte; page_len()],
2226                            )
2227                            .unwrap();
2228                        }
2229                    }
2230
2231                    compact().unwrap();
2232                    let block = Superblock::load().unwrap();
2233                    prop_assert_eq!(
2234                        block.db_size,
2235                        u64::try_from(active_len).unwrap() * page_size()
2236                    );
2237                    let table = read_page_table(&block).unwrap();
2238                    prop_assert_eq!(table.len(), active_len);
2239
2240                    let mut first_compacted_offset = None;
2241                    let mut non_zero_seen = 0_u64;
2242                    for (page_no, byte) in pages.iter().take(active_len).enumerate() {
2243                        let entry = table[page_no];
2244                        let mut page = vec![0_u8; page_len()];
2245                        read_base_at(u64::try_from(page_no).unwrap() * page_size(), &mut page)
2246                            .unwrap();
2247
2248                        if let Some(byte) = byte {
2249                            let base = *first_compacted_offset.get_or_insert(entry);
2250                            prop_assert_ne!(entry, 0);
2251                            prop_assert_eq!(entry, base + non_zero_seen * page_size());
2252                            prop_assert_eq!(page, vec![*byte; page_len()]);
2253                            non_zero_seen += 1;
2254                        } else {
2255                            prop_assert_eq!(entry, 0);
2256                            prop_assert_eq!(page, vec![0_u8; page_len()]);
2257                        }
2258                    }
2259                    Ok(())
2260                },
2261            )
2262            .unwrap();
2263    }
2264
2265    #[derive(Clone, Debug)]
2266    enum BlobOp {
2267        Write { offset: u64, len: usize, byte: u8 },
2268        Truncate { size: u64 },
2269        Compact,
2270    }
2271
2272    #[test]
2273    #[serial_test::serial]
2274    fn pbt_blob_operations_match_logical_model_across_compact() {
2275        let mut runner = TestRunner::new(Config {
2276            cases: 48,
2277            ..Config::default()
2278        });
2279
2280        runner
2281            .run(&blob_operation_sequence(), |operations| {
2282                crate::stable::memory::reset_for_tests();
2283                crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2284                invalidate_read_cache();
2285
2286                let mut model = Vec::new();
2287                let mut materialized = BTreeSet::new();
2288                assert_blob_model(&model, &materialized, false)?;
2289
2290                for operation in operations {
2291                    let compacted = apply_blob_op(operation, &mut model, &mut materialized)?;
2292                    assert_blob_model(&model, &materialized, compacted)?;
2293                }
2294                Ok(())
2295            })
2296            .unwrap();
2297    }
2298
2299    fn blob_operation_sequence() -> impl Strategy<Value = Vec<BlobOp>> {
2300        let write = (blob_offset_strategy(), blob_len_strategy(), any::<u8>())
2301            .prop_map(|(offset, len, byte)| BlobOp::Write { offset, len, byte });
2302        let truncate = blob_offset_strategy().prop_map(|size| BlobOp::Truncate { size });
2303        proptest::collection::vec(prop_oneof![write, truncate, Just(BlobOp::Compact)], 0..=48)
2304    }
2305
2306    fn blob_offset_strategy() -> impl Strategy<Value = u64> {
2307        let limit = blob_model_limit();
2308        let page = page_size();
2309        let segment = SEGMENT_PAGE_COUNT * page;
2310        prop_oneof![
2311            0_u64..=limit,
2312            prop::sample::select(boundary_values(&[
2313                0,
2314                1,
2315                page - 1,
2316                page,
2317                page + 1,
2318                segment - 1,
2319                segment,
2320                segment + 1,
2321                limit - 1,
2322                limit,
2323            ]))
2324            .prop_map(move |value| value.min(limit)),
2325        ]
2326    }
2327
2328    fn blob_len_strategy() -> impl Strategy<Value = usize> {
2329        prop_oneof![
2330            0_usize..=(page_len() * 2 + 17),
2331            prop::sample::select(vec![
2332                0,
2333                1,
2334                page_len() - 1,
2335                page_len(),
2336                page_len() + 1,
2337                page_len() * 2 + 1,
2338            ]),
2339        ]
2340    }
2341
2342    fn blob_model_limit() -> u64 {
2343        (SEGMENT_PAGE_COUNT + 3) * page_size()
2344    }
2345
2346    fn apply_blob_op(
2347        operation: BlobOp,
2348        model: &mut Vec<u8>,
2349        materialized: &mut BTreeSet<u64>,
2350    ) -> Result<bool, TestCaseError> {
2351        match operation {
2352            BlobOp::Write { offset, len, byte } => {
2353                let len = len.min(usize::try_from(blob_model_limit() - offset).unwrap());
2354                let bytes = vec![byte; len];
2355                write_at(offset, &bytes).map_err(|error| TestCaseError::fail(error.to_string()))?;
2356                if len == 0 {
2357                    return Ok(false);
2358                }
2359
2360                let start = usize::try_from(offset).unwrap();
2361                let end = start + len;
2362                if model.len() < start {
2363                    model.resize(start, 0);
2364                }
2365                if model.len() < end {
2366                    model.resize(end, 0);
2367                }
2368                model[start..end].copy_from_slice(&bytes);
2369                mark_materialized_range(offset, len, materialized);
2370                Ok(false)
2371            }
2372            BlobOp::Truncate { size } => {
2373                truncate(size).map_err(|error| TestCaseError::fail(error.to_string()))?;
2374                let new_len = usize::try_from(size).unwrap();
2375                model.resize(new_len, 0);
2376                let active_pages = page_count_for_size(size)
2377                    .map_err(|error| TestCaseError::fail(error.to_string()))?;
2378                materialized.retain(|page_no| *page_no < active_pages);
2379                if size > 0 && !size.is_multiple_of(page_size()) {
2380                    materialized.insert(size / page_size());
2381                }
2382                Ok(false)
2383            }
2384            BlobOp::Compact => {
2385                compact().map_err(|error| TestCaseError::fail(error.to_string()))?;
2386                Ok(true)
2387            }
2388        }
2389    }
2390
2391    fn mark_materialized_range(offset: u64, len: usize, materialized: &mut BTreeSet<u64>) {
2392        let end = offset + u64::try_from(len).unwrap();
2393        let first_page = offset / page_size();
2394        let last_page = (end - 1) / page_size();
2395        for page_no in first_page..=last_page {
2396            materialized.insert(page_no);
2397        }
2398    }
2399
2400    fn assert_blob_model(
2401        model: &[u8],
2402        materialized: &BTreeSet<u64>,
2403        expect_compacted: bool,
2404    ) -> Result<(), TestCaseError> {
2405        let block = Superblock::load().map_err(|error| TestCaseError::fail(error.to_string()))?;
2406        prop_assert_eq!(block.db_size, u64::try_from(model.len()).unwrap());
2407
2408        if !model.is_empty() {
2409            let mut out = vec![0_u8; model.len()];
2410            read_base_at(0, &mut out).map_err(|error| TestCaseError::fail(error.to_string()))?;
2411            prop_assert_eq!(out, model);
2412        }
2413
2414        let mut tail = vec![1_u8; 32];
2415        read_base_at(u64::try_from(model.len()).unwrap(), &mut tail)
2416            .map_err(|error| TestCaseError::fail(error.to_string()))?;
2417        prop_assert_eq!(tail, vec![0_u8; 32]);
2418
2419        let table =
2420            read_page_table(&block).map_err(|error| TestCaseError::fail(error.to_string()))?;
2421        let active_pages = page_count_for_size(u64::try_from(model.len()).unwrap())
2422            .map_err(|error| TestCaseError::fail(error.to_string()))?;
2423        prop_assert_eq!(table.len(), usize::try_from(active_pages).unwrap());
2424
2425        let mut first_compacted_offset = None;
2426        let mut non_zero_seen = 0_u64;
2427        for (index, entry) in table.iter().enumerate() {
2428            let page_no = u64::try_from(index).unwrap();
2429            if materialized.contains(&page_no) {
2430                prop_assert_ne!(*entry, 0);
2431                if expect_compacted {
2432                    let base = *first_compacted_offset.get_or_insert(*entry);
2433                    prop_assert_eq!(*entry, base + non_zero_seen * page_size());
2434                }
2435                non_zero_seen += 1;
2436            } else {
2437                prop_assert_eq!(*entry, 0);
2438            }
2439        }
2440        Ok(())
2441    }
2442
2443    #[test]
2444    #[serial_test::serial]
2445    fn read_metrics_separate_table_cache_from_data_reads() {
2446        crate::stable::memory::reset_for_tests();
2447        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2448        invalidate_read_cache();
2449
2450        let page = vec![7_u8; page_len()];
2451        write_at(0, &page).unwrap();
2452        invalidate_read_cache();
2453        crate::read_metrics::reset_read_metrics();
2454
2455        let first = read_base_page(0).unwrap();
2456        let second = read_base_page(0).unwrap();
2457        let metrics = crate::read_metrics::read_metrics_snapshot();
2458
2459        assert_eq!(first, page);
2460        assert_eq!(second, page);
2461        assert!(metrics.stable_data_read_calls >= 2);
2462        assert!(metrics.stable_data_read_bytes >= page_size() * 2);
2463        assert!(metrics.page_table_root_misses >= 1);
2464        assert!(metrics.page_table_root_hits >= 1);
2465        assert!(metrics.page_table_segment_misses >= 1);
2466        assert!(metrics.page_table_segment_hits >= 1);
2467        #[cfg(feature = "bench-profile")]
2468        assert!(metrics.superblock_loads <= 1);
2469        #[cfg(not(feature = "bench-profile"))]
2470        assert_eq!(metrics.superblock_loads, 0);
2471    }
2472
2473    #[test]
2474    #[serial_test::serial]
2475    fn page_offset_cache_reuses_page_data_for_small_reads() {
2476        crate::stable::memory::reset_for_tests();
2477        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2478        invalidate_read_cache();
2479
2480        let page = vec![9_u8; page_len()];
2481        write_at(0, &page).unwrap();
2482        let block = Superblock::load().unwrap();
2483        let mut cache = PageOffsetCache::new();
2484        let mut first = [0_u8; 16];
2485        let mut second = [0_u8; 16];
2486
2487        crate::read_metrics::reset_read_metrics();
2488        read_base_at_with_page_cache(&block, 0, &mut first, &mut cache).unwrap();
2489        read_base_at_with_page_cache(&block, 8, &mut second, &mut cache).unwrap();
2490        let metrics = crate::read_metrics::read_metrics_snapshot();
2491
2492        assert_eq!(first, [9_u8; 16]);
2493        assert_eq!(second, [9_u8; 16]);
2494        assert_eq!(metrics.stable_data_read_calls, 1);
2495        assert_eq!(metrics.stable_data_read_bytes, page_size());
2496    }
2497}