Skip to main content

ic_sqlite_vfs/sqlite_vfs/
stable_blob.rs

1//! Logical `/main.db` access backed by segmented stable-memory page mapping.
2//!
3//! SQLite sees a contiguous file. Internally, the active superblock points to a
4//! root table. Each root entry points to a 256-page segment table.
5
6use crate::config::{SQLITE_PAGE_SIZE, STABLE_PAGE_SIZE, SUPERBLOCK_SIZE};
7use crate::sqlite_vfs::overlay::{self, Overlay};
8use crate::stable::memory::{self, ContextId, StableMemoryError};
9use crate::stable::meta::{
10    fnv1a64, Superblock, FLAG_CHECKSUM_REFRESHING, FLAG_CHECKSUM_STALE, FLAG_IMPORTING,
11    PAGE_MAP_LAYOUT_VERSION,
12};
13use std::cell::RefCell;
14use std::collections::BTreeMap;
15use std::mem::MaybeUninit;
16
/// Largest slice, in bytes, read per loop iteration while a checksum is folded (16 KiB).
const CHECKSUM_CHUNK_LEN: u64 = 16 << 10;
/// Byte width of one page-table entry; tables are handled as `u64` offsets.
const PAGE_TABLE_ENTRY_LEN: u64 = 8;
/// Number of pages mapped by a single segment table.
const SEGMENT_PAGE_COUNT: u64 = 256;
/// Byte size of one full segment table.
const SEGMENT_TABLE_BYTES: u64 = PAGE_TABLE_ENTRY_LEN * SEGMENT_PAGE_COUNT;
/// Byte size of one segment table followed by a root table holding a single entry.
const SINGLE_SEGMENT_PAGE_TABLE_BYTES: u64 = PAGE_TABLE_ENTRY_LEN + SEGMENT_TABLE_BYTES;
/// Most segment tables a `ReadTableCache` retains before the oldest is evicted.
const READ_SEGMENT_CACHE_CAPACITY: usize = 8;
/// Most page-number/offset pairs a `PageOffsetCache` retains.
const FILE_PAGE_OFFSET_CACHE_CAPACITY: usize = 64;
/// Most page images a `PageOffsetCache` retains.
const FILE_PAGE_DATA_CACHE_CAPACITY: usize = 8;
/// Smallest orphan estimate (16 MiB) for which `storage_stats` recommends compaction.
const COMPACT_MIN_ORPHAN_BYTES: u64 = 16 << 20;
26
/// Progress report returned by one `refresh_checksum_chunk` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChecksumRefresh {
    /// `true` once the scan position has reached `db_size`.
    pub complete: bool,
    /// Running hash after this call (the final checksum when `complete`).
    pub checksum: u64,
    /// Logical offset up to which the database has been hashed so far.
    pub scanned_bytes: u64,
    /// Logical database size the scan is measured against.
    pub db_size: u64,
}
34
/// Space-usage snapshot produced by `storage_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Layout version recorded in the active superblock.
    pub layout_version: u64,
    /// Number of active logical pages.
    pub page_count: u64,
    /// Bytes taken by the root table plus all segment tables.
    pub page_table_bytes: u64,
    /// Superblock bytes + mapped (non-zero) page bytes + page-table bytes.
    pub active_bytes: u64,
    /// Total bytes of stable memory currently allocated.
    pub allocated_bytes: u64,
    /// `allocated_bytes - active_bytes`, saturating at zero.
    pub orphan_bytes_estimate: u64,
    /// Orphan estimate relative to `active_bytes`, in basis points (1/10 000).
    pub orphan_ratio_basis_points: u64,
    /// Set when the orphan estimate is at least `active_bytes` and at least
    /// the minimum compaction threshold.
    pub compact_recommended: bool,
}
46
/// Named injection points at which this module asks `hit_failpoint` whether
/// to fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StableBlobFailpoint {
    /// Checked in `write_at` after the overlay has accepted the write.
    OverlayWrite,
    /// Checked in `truncate` after the overlay has accepted the truncation.
    OverlayTruncate,
    /// Checked at the very start of an overlay commit.
    CommitCapacity,
    /// Checked before each dirty page is written during a commit.
    CommitChunkWrite,
    /// Checked before the segment/root tables are written during a commit.
    CommitPageTableWrite,
    /// Checked before the new page map is stored in the superblock.
    CommitSuperblockStore,
}
56
57thread_local! {
58    #[cfg(test)]
59    static FAILPOINTS: RefCell<BTreeMap<ContextId, StableBlobFailpoint>> = const { RefCell::new(BTreeMap::new()) };
60    static READ_TABLE_CACHE: RefCell<Vec<(ContextId, ReadTableCache)>> = const { RefCell::new(Vec::new()) };
61    static COMMIT_SEGMENT_CACHE: RefCell<Vec<(ContextId, CommitSegmentCache)>> = const { RefCell::new(Vec::new()) };
62}
63
/// Identifies the page-map state a `ReadTableCache` was filled from; a cache
/// whose key differs from the requested one is discarded by `ensure_key`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ReadCacheKey {
    /// Offset of the root page table.
    page_table_offset: u64,
    /// Page-count value that accompanies the root table.
    page_count: u64,
    /// Logical database size.
    db_size: u64,
    /// Identifier of the last transaction.
    last_tx_id: u64,
}
71
72#[derive(Debug)]
73struct ReadTableCache {
74    key: Option<ReadCacheKey>,
75    root: Vec<u64>,
76    segments: Vec<CachedSegment>,
77}
78
/// One segment table held by a `ReadTableCache`.
#[derive(Debug)]
struct CachedSegment {
    /// Index of the segment this table describes.
    segment_no: u64,
    /// Per-page offsets of the segment, indexed by position within the segment.
    table: Vec<u64>,
}
84
/// Segment table remembered after a commit, together with where it was written.
#[derive(Debug)]
struct CommitSegmentCache {
    /// Index of the segment this table describes.
    segment_no: u64,
    /// Offset at which the table was written.
    segment_offset: u64,
    /// Per-page offsets of the segment.
    table: Vec<u64>,
}
91
92impl ReadTableCache {
93    fn new() -> Self {
94        Self {
95            key: None,
96            root: Vec::new(),
97            segments: Vec::new(),
98        }
99    }
100
101    fn clear(&mut self) {
102        self.key = None;
103        self.root.clear();
104        self.segments.clear();
105    }
106
107    fn ensure_key(&mut self, key: ReadCacheKey) {
108        if self.key == Some(key) {
109            return;
110        }
111        self.clear();
112        self.key = Some(key);
113    }
114
115    #[inline(always)]
116    fn segment_page_offset(&mut self, segment_no: u64, index: usize) -> Option<u64> {
117        if self.segments.is_empty() {
118            return None;
119        }
120        if self.segments.len() == 1 {
121            let segment = &self.segments[0];
122            if segment.segment_no == segment_no {
123                return Some(segment.table[index]);
124            }
125            return None;
126        }
127        let position = self
128            .segments
129            .iter()
130            .position(|segment| segment.segment_no == segment_no)?;
131        let offset = Some(self.segments[position].table[index]);
132        if position + 1 != self.segments.len() {
133            let segment = self.segments.remove(position);
134            self.segments.push(segment);
135        }
136        offset
137    }
138
139    fn insert_segment(&mut self, segment_no: u64, table: Vec<u64>) {
140        if let Some(position) = self
141            .segments
142            .iter()
143            .position(|segment| segment.segment_no == segment_no)
144        {
145            self.segments.remove(position);
146        }
147        self.segments.push(CachedSegment { segment_no, table });
148        while self.segments.len() > READ_SEGMENT_CACHE_CAPACITY {
149            self.segments.remove(0);
150        }
151    }
152}
153
/// Small caller-owned cache used by `read_base_at_with_page_cache`.
#[derive(Debug)]
pub(crate) struct PageOffsetCache {
    /// `(page number, physical offset)` pairs, oldest first.
    entries: Vec<(u64, u64)>,
    /// `(page number, page bytes)` pairs, oldest first.
    pages: Vec<(u64, Vec<u8>)>,
}
159
160impl PageOffsetCache {
161    pub(crate) fn new() -> Self {
162        Self {
163            entries: Vec::with_capacity(FILE_PAGE_OFFSET_CACHE_CAPACITY),
164            pages: Vec::new(),
165        }
166    }
167
168    fn get(&self, page_no: u64) -> Option<u64> {
169        match self.entries.as_slice() {
170            [] => None,
171            [(cached_page, physical)] => (*cached_page == page_no).then_some(*physical),
172            entries => {
173                for (cached_page, physical) in entries {
174                    if *cached_page == page_no {
175                        return Some(*physical);
176                    }
177                }
178                None
179            }
180        }
181    }
182
183    fn insert(&mut self, page_no: u64, physical: u64) {
184        if self.entries.len() == FILE_PAGE_OFFSET_CACHE_CAPACITY {
185            self.entries.remove(0);
186        }
187        self.entries.push((page_no, physical));
188    }
189
190    #[inline(always)]
191    fn copy_page_slice(&self, page_no: u64, in_page: usize, dst: &mut [u8]) -> bool {
192        if self.pages.is_empty() {
193            return false;
194        }
195        if self.pages.len() == 1 {
196            let (cached_page, page) = &self.pages[0];
197            if *cached_page == page_no {
198                let end = in_page + dst.len();
199                dst.copy_from_slice(&page[in_page..end]);
200                return true;
201            }
202            return false;
203        }
204        for (cached_page, page) in &self.pages {
205            if *cached_page == page_no {
206                let end = in_page + dst.len();
207                dst.copy_from_slice(&page[in_page..end]);
208                return true;
209            }
210        }
211        false
212    }
213
214    fn insert_page(&mut self, page_no: u64, page: Vec<u8>) {
215        if self.pages.len() == FILE_PAGE_DATA_CACHE_CAPACITY {
216            self.pages.remove(0);
217        }
218        self.pages.push((page_no, page));
219    }
220}
221
/// Arms `failpoint` for the currently active memory context (test builds
/// only). Does nothing when no active context id can be obtained.
#[cfg(test)]
pub(crate) fn set_failpoint(failpoint: StableBlobFailpoint) {
    let Ok(context) = memory::active_context_id() else {
        return;
    };
    FAILPOINTS.with(|slot| {
        let mut armed = slot.borrow_mut();
        armed.insert(context, failpoint);
    });
}
230
/// Disarms the failpoints of every context on this thread (test builds only).
#[cfg(test)]
pub(crate) fn clear_failpoint() {
    FAILPOINTS.with(|slot| {
        let mut armed = slot.borrow_mut();
        armed.clear();
    });
}
235
236pub(crate) fn ensure_page_map_layout() -> Result<(), StableMemoryError> {
237    let block = Superblock::load()?;
238    if block.layout_version >= PAGE_MAP_LAYOUT_VERSION {
239        return Ok(());
240    }
241    Err(StableMemoryError::UnsupportedLayoutVersion(
242        block.layout_version,
243    ))
244}
245
246pub(crate) fn begin_update() -> Result<u64, StableMemoryError> {
247    let block = Superblock::load()?;
248    if block.layout_version < PAGE_MAP_LAYOUT_VERSION {
249        return Err(StableMemoryError::UnsupportedLayoutVersion(
250            block.layout_version,
251        ));
252    }
253    if block.is_importing() {
254        return Err(StableMemoryError::ImportAlreadyStarted);
255    }
256    overlay::begin(block.db_size)?;
257    Ok(block.db_size)
258}
259
260pub(crate) fn rollback_update() {
261    overlay::rollback();
262}
263
264#[doc(hidden)]
265pub fn invalidate_read_cache() {
266    READ_TABLE_CACHE.with(|cache| cache.borrow_mut().clear());
267    COMMIT_SEGMENT_CACHE.with(|cache| cache.borrow_mut().clear());
268}
269
270pub(crate) fn commit_update() -> Result<(), StableMemoryError> {
271    let Some(overlay) = overlay::take() else {
272        return Ok(());
273    };
274    if overlay.is_empty() {
275        return Ok(());
276    }
277    commit_overlay(overlay, true)
278}
279
280pub(crate) fn read_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
281    if let Some(result) = overlay::read_at(offset, dst) {
282        return result;
283    }
284    read_base_at(offset, dst)
285}
286
287pub(crate) fn read_base_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
288    if dst.is_empty() {
289        return Ok(true);
290    }
291    let block = Superblock::load()?;
292    read_base_at_with_block(&block, offset, dst)
293}
294
295pub(crate) fn read_base_at_with_block(
296    block: &Superblock,
297    offset: u64,
298    dst: &mut [u8],
299) -> Result<bool, StableMemoryError> {
300    if dst.is_empty() {
301        return Ok(true);
302    }
303    if offset >= block.db_size {
304        dst.fill(0);
305        return Ok(false);
306    }
307    let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
308    if requested <= block.db_size - offset {
309        read_logical_range(block, offset, dst)?;
310        return Ok(true);
311    }
312    let copied = requested.min(block.db_size - offset);
313    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
314    read_logical_range(block, offset, &mut dst[..copied_len])?;
315    dst[copied_len..].fill(0);
316    Ok(copied == requested)
317}
318
319#[inline(always)]
320pub(crate) fn read_base_at_with_page_cache(
321    block: &Superblock,
322    offset: u64,
323    dst: &mut [u8],
324    page_offsets: &mut PageOffsetCache,
325) -> Result<bool, StableMemoryError> {
326    if dst.is_empty() {
327        return Ok(true);
328    }
329    if offset >= block.db_size {
330        dst.fill(0);
331        return Ok(false);
332    }
333    let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
334    if requested <= block.db_size - offset {
335        read_logical_range_with_page_cache(block, offset, dst, page_offsets)?;
336        return Ok(true);
337    }
338    let copied = requested.min(block.db_size - offset);
339    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
340    read_logical_range_with_page_cache(block, offset, &mut dst[..copied_len], page_offsets)?;
341    dst[copied_len..].fill(0);
342    Ok(copied == requested)
343}
344
345pub(crate) fn read_base_page(page_no: u64) -> Result<Vec<u8>, StableMemoryError> {
346    let block = Superblock::load()?;
347    let mut page = zero_page();
348    if page_no >= active_page_count(&block)? {
349        return Ok(page);
350    }
351    let physical = page_offset_for(&block, page_no)?;
352    if physical != 0 {
353        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
354        crate::read_metrics::record_stable_data_read(page.len());
355        memory::read_preallocated(physical, &mut page)?;
356    }
357    Ok(page)
358}
359
360pub(crate) fn write_at(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
361    if let Some(result) = overlay::write_at(offset, bytes) {
362        hit_failpoint(StableBlobFailpoint::OverlayWrite)?;
363        return result;
364    }
365    if bytes.is_empty() {
366        return Ok(());
367    }
368    ensure_page_map_layout()?;
369    let mut direct = Overlay::new(Superblock::load()?.db_size);
370    direct.write_at(offset, bytes)?;
371    commit_overlay(direct, false)
372}
373
374pub(crate) fn truncate(size: u64) -> Result<(), StableMemoryError> {
375    if let Some(result) = overlay::truncate(size) {
376        hit_failpoint(StableBlobFailpoint::OverlayTruncate)?;
377        return result;
378    }
379    ensure_page_map_layout()?;
380    let mut direct = Overlay::new(Superblock::load()?.db_size);
381    direct.truncate(size)?;
382    if direct.is_empty() {
383        return Ok(());
384    }
385    commit_overlay(direct, false)
386}
387
388pub(crate) fn file_size() -> Result<u64, StableMemoryError> {
389    if let Some(size) = overlay::file_size() {
390        return Ok(size);
391    }
392    Ok(Superblock::load()?.db_size)
393}
394
395pub fn export_chunk(offset: u64, len: u64) -> Result<Vec<u8>, StableMemoryError> {
396    reject_during_update()?;
397    let block = Superblock::load()?;
398    if offset >= block.db_size {
399        return Ok(Vec::new());
400    }
401    let copied = len.min(block.db_size - offset);
402    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
403    let mut out = vec![0_u8; copied_len];
404    read_logical_range(&block, offset, &mut out)?;
405    Ok(out)
406}
407
408pub fn import_chunk(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
409    reject_during_update()?;
410    let mut block = Superblock::load()?;
411    if !block.is_importing() {
412        return Err(StableMemoryError::ImportNotStarted);
413    }
414    let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
415    if offset != block.import_written_until {
416        return Err(StableMemoryError::ImportOutOfOrder {
417            offset,
418            expected: block.import_written_until,
419        });
420    }
421    let end = checked_add(offset, len)?;
422    if end > block.import_total_size {
423        return Err(StableMemoryError::ImportOutOfBounds {
424            offset,
425            len,
426            db_size: block.import_total_size,
427        });
428    }
429    memory::write(import_offset(&block, offset)?, bytes)?;
430    block.import_written_until = end;
431    block.store()?;
432    invalidate_read_cache();
433    Ok(())
434}
435
436pub fn begin_import(total_size: u64, expected_checksum: u64) -> Result<(), StableMemoryError> {
437    reject_during_update()?;
438    let mut block = Superblock::load()?;
439    if block.is_importing() {
440        return Err(StableMemoryError::ImportAlreadyStarted);
441    }
442    let import_base_offset = append_base()?;
443    checked_add(import_base_offset, total_size)?;
444    block.flags |= FLAG_IMPORTING;
445    block.clear_checksum_refresh();
446    block.import_expected_checksum = expected_checksum;
447    block.import_written_until = 0;
448    block.import_total_size = total_size;
449    block.import_base_offset = import_base_offset;
450    block.store()?;
451    invalidate_read_cache();
452    Ok(())
453}
454
455pub fn finish_import() -> Result<(), StableMemoryError> {
456    reject_during_update()?;
457    let mut block = Superblock::load()?;
458    if !block.is_importing() {
459        return Err(StableMemoryError::ImportNotStarted);
460    }
461    if block.import_written_until != block.import_total_size {
462        return Err(StableMemoryError::ImportIncomplete {
463            written_until: block.import_written_until,
464            db_size: block.import_total_size,
465        });
466    }
467    let checksum = checksum_physical_range(block.import_base_offset, block.import_total_size)?;
468    if checksum != block.import_expected_checksum {
469        let expected = block.import_expected_checksum;
470        clear_import(&mut block)?;
471        return Err(StableMemoryError::ChecksumMismatch {
472            expected,
473            actual: checksum,
474        });
475    }
476    let entries = imported_page_table(&block)?;
477    let (root_offset, root_len) = write_segmented_tables(&entries)?;
478    block.db_size = block.import_total_size;
479    block.db_base_offset = block.import_base_offset;
480    block.page_table_offset = root_offset;
481    block.page_count = root_len;
482    block.layout_version = PAGE_MAP_LAYOUT_VERSION;
483    block.flags &= !FLAG_IMPORTING;
484    block.flags &= !FLAG_CHECKSUM_STALE;
485    block.clear_checksum_refresh();
486    block.checksum = checksum;
487    block.import_expected_checksum = 0;
488    block.import_written_until = 0;
489    block.import_total_size = 0;
490    block.import_base_offset = 0;
491    block.store()?;
492    invalidate_read_cache();
493    Ok(())
494}
495
496pub fn cancel_import() -> Result<(), StableMemoryError> {
497    reject_during_update()?;
498    let mut block = Superblock::load()?;
499    if !block.is_importing() {
500        return Err(StableMemoryError::ImportNotStarted);
501    }
502    clear_import(&mut block)
503}
504
505pub fn refresh_checksum() -> Result<u64, StableMemoryError> {
506    reject_during_update()?;
507    let checksum = checksum()?;
508    let mut block = Superblock::load()?;
509    block.checksum = checksum;
510    block.flags &= !FLAG_CHECKSUM_STALE;
511    block.clear_checksum_refresh();
512    block.store()?;
513    invalidate_read_cache();
514    Ok(checksum)
515}
516
517pub fn refresh_checksum_chunk(max_bytes: u64) -> Result<ChecksumRefresh, StableMemoryError> {
518    reject_during_update()?;
519    if max_bytes == 0 {
520        return Err(StableMemoryError::ChecksumRefreshChunkEmpty);
521    }
522
523    let mut block = Superblock::load()?;
524    if block.is_importing() {
525        return Err(StableMemoryError::ImportAlreadyStarted);
526    }
527    if !block.is_checksum_refreshing() {
528        block.flags |= FLAG_CHECKSUM_REFRESHING;
529        block.checksum_refresh_offset = 0;
530        block.checksum_refresh_hash = fnv1a64(&[]);
531        block.checksum_refresh_tx_id = block.last_tx_id;
532    }
533    if block.checksum_refresh_tx_id != block.last_tx_id {
534        block.clear_checksum_refresh();
535        block.store()?;
536        invalidate_read_cache();
537        return refresh_checksum_chunk(max_bytes);
538    }
539
540    let start = block.checksum_refresh_offset;
541    let end = block.db_size.min(start.saturating_add(max_bytes));
542    let mut offset = start;
543    let mut hash = block.checksum_refresh_hash;
544    while offset < end {
545        let len = (end - offset).min(CHECKSUM_CHUNK_LEN);
546        let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
547        let mut bytes = vec![0_u8; copied_len];
548        read_logical_range(&block, offset, &mut bytes)?;
549        hash = fold_fnv1a64(hash, &bytes);
550        offset += len;
551    }
552
553    block.checksum_refresh_offset = offset;
554    block.checksum_refresh_hash = hash;
555    if offset == block.db_size {
556        block.checksum = hash;
557        block.flags &= !FLAG_CHECKSUM_STALE;
558        block.clear_checksum_refresh();
559    }
560    let out = ChecksumRefresh {
561        complete: offset == block.db_size,
562        checksum: hash,
563        scanned_bytes: offset,
564        db_size: block.db_size,
565    };
566    block.store()?;
567    invalidate_read_cache();
568    Ok(out)
569}
570
571pub fn checksum() -> Result<u64, StableMemoryError> {
572    reject_during_update()?;
573    let block = Superblock::load()?;
574    checksum_logical_range(&block, block.db_size)
575}
576
577pub fn compact() -> Result<(), StableMemoryError> {
578    reject_during_update()?;
579    ensure_page_map_layout()?;
580    let block = Superblock::load()?;
581    let table = read_page_table(&block)?;
582    let mut compacted = Vec::with_capacity(table.len());
583    let mut cursor = append_base()?;
584    let non_zero_pages = table.iter().filter(|offset| **offset != 0).count();
585    let data_bytes = u64::try_from(non_zero_pages)
586        .map_err(|_| StableMemoryError::OffsetOverflow)?
587        .checked_mul(page_size())
588        .ok_or(StableMemoryError::OffsetOverflow)?;
589    memory::ensure_capacity(checked_add(cursor, data_bytes)?)?;
590
591    for offset in table {
592        if offset == 0 {
593            compacted.push(0);
594            continue;
595        }
596        let mut page = zero_page();
597        memory::read_preallocated(offset, &mut page)?;
598        memory::write_preallocated(cursor, &page)?;
599        compacted.push(cursor);
600        cursor = checked_add(cursor, page_size())?;
601    }
602
603    let (root_offset, root_len) = write_segmented_tables(&compacted)?;
604    Superblock::store_page_map_without_tx(root_offset, root_len, block.db_size)?;
605    invalidate_read_cache();
606    Ok(())
607}
608
609pub fn storage_stats() -> Result<StorageStats, StableMemoryError> {
610    let block = Superblock::load()?;
611    let table = read_page_table(&block)?;
612    let non_zero_pages = u64::try_from(table.iter().filter(|offset| **offset != 0).count())
613        .map_err(|_| StableMemoryError::OffsetOverflow)?;
614    let segment_count = active_segment_count(&block)?;
615    let root_bytes = root_table_bytes(segment_count)?;
616    let segment_bytes = segment_count
617        .checked_mul(segment_table_bytes()?)
618        .ok_or(StableMemoryError::OffsetOverflow)?;
619    let page_table_bytes = checked_add(root_bytes, segment_bytes)?;
620    let active_bytes = SUPERBLOCK_SIZE
621        .checked_add(non_zero_pages.saturating_mul(page_size()))
622        .and_then(|value| value.checked_add(page_table_bytes))
623        .ok_or(StableMemoryError::OffsetOverflow)?;
624    let allocated_bytes = memory::size_pages()
625        .checked_mul(STABLE_PAGE_SIZE)
626        .ok_or(StableMemoryError::OffsetOverflow)?;
627    let orphan_bytes_estimate = allocated_bytes.saturating_sub(active_bytes);
628    let orphan_ratio_basis_points = if active_bytes == 0 {
629        0
630    } else {
631        orphan_bytes_estimate.saturating_mul(10_000) / active_bytes
632    };
633    Ok(StorageStats {
634        layout_version: block.layout_version,
635        page_count: active_page_count(&block)?,
636        page_table_bytes,
637        active_bytes,
638        allocated_bytes,
639        orphan_bytes_estimate,
640        orphan_ratio_basis_points,
641        compact_recommended: orphan_bytes_estimate >= active_bytes
642            && orphan_bytes_estimate >= COMPACT_MIN_ORPHAN_BYTES,
643    })
644}
645
646pub(crate) fn page_count_for_size(size: u64) -> Result<u64, StableMemoryError> {
647    Ok(size.div_ceil(page_size()))
648}
649
/// Test helper: returns the root table of the currently stored superblock.
#[cfg(test)]
pub(crate) fn debug_root_table_for_tests() -> Result<Vec<u64>, StableMemoryError> {
    read_root_table(&Superblock::load()?)
}
655
656fn commit_overlay(overlay: Overlay, advance_tx: bool) -> Result<(), StableMemoryError> {
657    hit_failpoint(StableBlobFailpoint::CommitCapacity)?;
658    let profile_enabled = commit_profile_enabled();
659    let block = Superblock::load()?;
660    let overlay_size = overlay.size();
661    let final_page_count = page_count_for_size(overlay_size)?;
662    let data_cursor = append_base()?;
663    debug_assert!(overlay
664        .dirty_pages()
665        .iter()
666        .all(|(page_no, _)| *page_no < final_page_count));
667    let dirty_pages = overlay.dirty_pages();
668    if let [(page_no, page)] = dirty_pages {
669        if overlay_size >= block.db_size
670            && *page_no < final_page_count
671            && final_page_count <= SEGMENT_PAGE_COUNT
672        {
673            let build_profile_start = commit_profile_start(profile_enabled);
674            let options = SinglePageCommitOptions {
675                advance_tx,
676                overlay_size,
677                data_cursor,
678                profile_enabled,
679                build_profile_start,
680            };
681            return commit_single_segment_page_overlay(&block, *page_no, page, options);
682        }
683    }
684
685    let final_segment_count = segment_count_for_pages(final_page_count)?;
686    let profile_start = commit_profile_start(profile_enabled);
687    let mut root = read_commit_root_table(&block)?;
688    commit_profile_record_load(profile_start);
689
690    let build_profile_start = commit_profile_start(profile_enabled);
691    let root_len =
692        usize::try_from(final_segment_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
693    if root.len() != root_len {
694        root.resize(root_len, 0);
695    }
696
697    if let [(page_no, page)] = dirty_pages {
698        if overlay_size >= block.db_size && *page_no < final_page_count {
699            let options = SinglePageCommitOptions {
700                advance_tx,
701                overlay_size,
702                data_cursor,
703                profile_enabled,
704                build_profile_start,
705            };
706            return commit_single_page_overlay(
707                &block,
708                final_segment_count,
709                root,
710                *page_no,
711                page,
712                options,
713            );
714        }
715    }
716
717    let mut segment_updates = BTreeMap::<u64, Vec<u64>>::new();
718    let mut page_cursor = data_cursor;
719
720    for (page_no, _) in dirty_pages {
721        if *page_no >= final_page_count {
722            continue;
723        }
724        let segment_no = segment_no(*page_no);
725        let index = segment_index(*page_no)?;
726        let table = load_segment_for_update(&block, &root, &mut segment_updates, segment_no)?;
727        table[index] = page_cursor;
728        page_cursor = checked_add(page_cursor, page_size())?;
729    }
730
731    if overlay_size < block.db_size {
732        clear_truncated_tail(&block, &root, &mut segment_updates, final_page_count)?;
733    }
734    commit_profile_record_build_segments(build_profile_start);
735
736    let mut table_cursor = page_cursor;
737    let root_entries_len = final_segment_count;
738    let segment_table_writes = segment_updates.len();
739    let segment_table_bytes = u64::try_from(segment_table_writes)
740        .map_err(|_| StableMemoryError::OffsetOverflow)?
741        .checked_mul(segment_table_bytes()?)
742        .ok_or(StableMemoryError::OffsetOverflow)?;
743    let page_table_bytes = checked_add(segment_table_bytes, root_table_bytes(root_entries_len)?)?;
744    let profile_start = commit_profile_start(profile_enabled);
745    memory::ensure_capacity(checked_add(table_cursor, page_table_bytes)?)?;
746    commit_profile_record_capacity(profile_start);
747
748    let profile_start = commit_profile_start(profile_enabled);
749    let mut cursor = data_cursor;
750    for (_, page) in dirty_pages {
751        hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
752        write_commit_page(cursor, page, profile_enabled)?;
753        cursor = checked_add(cursor, page_size())?;
754    }
755    commit_profile_record_page_write(profile_start);
756
757    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
758    let profile_start = commit_profile_start(profile_enabled);
759    for (segment_no, table) in segment_updates {
760        let offset = write_commit_segment_table_at(&table, &mut table_cursor, profile_enabled)?;
761        let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
762        root[index] = offset;
763    }
764    let root_offset = write_commit_root_table_at(&root, &mut table_cursor, profile_enabled)?;
765    commit_profile_record_table_write(profile_start);
766
767    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
768    let profile_start = commit_profile_start(profile_enabled);
769    let result = store_commit_page_map(
770        advance_tx,
771        root_offset,
772        root_entries_len,
773        overlay_size,
774        profile_enabled,
775    );
776    commit_profile_record_superblock_store(profile_start);
777    result
778}
779
/// Values shared by the two single-page commit fast paths.
#[derive(Copy, Clone)]
struct SinglePageCommitOptions {
    /// Forwarded to `store_commit_page_map`.
    advance_tx: bool,
    /// Logical file size after the commit.
    overlay_size: u64,
    /// Physical offset at which the dirty page is written.
    data_cursor: u64,
    /// Whether commit profiling is active for this commit.
    profile_enabled: bool,
    /// Start marker of the "build segments" profiling span, if profiling.
    build_profile_start: Option<u64>,
}
788
789fn commit_single_page_overlay(
790    block: &Superblock,
791    final_segment_count: u64,
792    mut root: Vec<u64>,
793    page_no: u64,
794    page: &[u8],
795    options: SinglePageCommitOptions,
796) -> Result<(), StableMemoryError> {
797    let segment_no = segment_no(page_no);
798    let index = segment_index(page_no)?;
799    let mut table = read_commit_segment_table(block, &root, segment_no)?;
800    table[index] = options.data_cursor;
801    let page_cursor = checked_add(options.data_cursor, page_size())?;
802    commit_profile_record_build_segments(options.build_profile_start);
803
804    let root_entries_len = final_segment_count;
805    let page_table_bytes =
806        checked_add(segment_table_bytes()?, root_table_bytes(root_entries_len)?)?;
807    let profile_start = commit_profile_start(options.profile_enabled);
808    memory::ensure_capacity(checked_add(page_cursor, page_table_bytes)?)?;
809    commit_profile_record_capacity(profile_start);
810
811    hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
812    let profile_start = commit_profile_start(options.profile_enabled);
813    write_commit_page(options.data_cursor, page, options.profile_enabled)?;
814    commit_profile_record_page_write(profile_start);
815
816    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
817    let profile_start = commit_profile_start(options.profile_enabled);
818    let mut table_cursor = page_cursor;
819    let offset = write_commit_segment_table_at(&table, &mut table_cursor, options.profile_enabled)?;
820    let root_offset = if final_segment_count == 1 {
821        write_commit_root_table_at(&[offset], &mut table_cursor, options.profile_enabled)?
822    } else {
823        let root_index =
824            usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
825        root[root_index] = offset;
826        write_commit_root_table_at(&root, &mut table_cursor, options.profile_enabled)?
827    };
828    commit_profile_record_table_write(profile_start);
829
830    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
831    let profile_start = commit_profile_start(options.profile_enabled);
832    let result = store_commit_page_map(
833        options.advance_tx,
834        root_offset,
835        root_entries_len,
836        options.overlay_size,
837        options.profile_enabled,
838    );
839    commit_profile_record_superblock_store(profile_start);
840    if result.is_ok() {
841        cache_commit_segment_table(segment_no, offset, table);
842    }
843    result
844}
845
846fn commit_single_segment_page_overlay(
847    block: &Superblock,
848    page_no: u64,
849    page: &[u8],
850    options: SinglePageCommitOptions,
851) -> Result<(), StableMemoryError> {
852    let index = segment_index(page_no)?;
853    let root = read_commit_root_table(block)?;
854    let mut table = read_commit_segment_table(block, &root, 0)?;
855    table[index] = options.data_cursor;
856    let page_cursor = checked_add(options.data_cursor, page_size())?;
857    commit_profile_record_build_segments(options.build_profile_start);
858
859    let profile_start = commit_profile_start(options.profile_enabled);
860    memory::ensure_capacity(checked_add(page_cursor, SINGLE_SEGMENT_PAGE_TABLE_BYTES)?)?;
861    commit_profile_record_capacity(profile_start);
862
863    hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
864    let profile_start = commit_profile_start(options.profile_enabled);
865    memory::write_prechecked(options.data_cursor, page)?;
866    commit_profile_record_page_write(profile_start);
867
868    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
869    let profile_start = commit_profile_start(options.profile_enabled);
870    let mut table_cursor = page_cursor;
871    let offset = write_commit_segment_table_at(&table, &mut table_cursor, options.profile_enabled)?;
872    let root_offset =
873        write_commit_root_table_at(&[offset], &mut table_cursor, options.profile_enabled)?;
874    commit_profile_record_table_write(profile_start);
875
876    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
877    let profile_start = commit_profile_start(options.profile_enabled);
878    let result = store_commit_page_map(
879        options.advance_tx,
880        root_offset,
881        1,
882        options.overlay_size,
883        options.profile_enabled,
884    );
885    commit_profile_record_superblock_store(profile_start);
886    if result.is_ok() {
887        cache_commit_segment_table(0, offset, table);
888    }
889    result
890}
891
892#[cfg(any(test, debug_assertions, feature = "bench-profile"))]
893#[inline(always)]
894fn commit_profile_enabled() -> bool {
895    crate::read_metrics::metrics_enabled()
896}
897
898#[cfg(not(any(test, debug_assertions, feature = "bench-profile")))]
899#[inline(always)]
900fn commit_profile_enabled() -> bool {
901    false
902}
903
904#[inline(always)]
905fn commit_profile_start(enabled: bool) -> Option<u64> {
906    if enabled {
907        Some(crate::read_metrics::instruction_counter())
908    } else {
909        None
910    }
911}
912
913macro_rules! commit_profile_recorder {
914    ($name:ident, $record:ident) => {
915        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
916        #[inline(always)]
917        fn $name(start: Option<u64>) {
918            if let Some(start) = start {
919                crate::read_metrics::$record(
920                    crate::read_metrics::instruction_counter().saturating_sub(start),
921                );
922            }
923        }
924
925        #[cfg(not(any(test, debug_assertions, feature = "bench-profile")))]
926        #[inline(always)]
927        fn $name(_start: Option<u64>) {}
928    };
929}
930
931commit_profile_recorder!(commit_profile_record_load, record_commit_load);
932commit_profile_recorder!(
933    commit_profile_record_build_segments,
934    record_commit_build_segments
935);
936commit_profile_recorder!(commit_profile_record_capacity, record_commit_capacity);
937commit_profile_recorder!(commit_profile_record_page_write, record_commit_page_write);
938commit_profile_recorder!(commit_profile_record_table_write, record_commit_table_write);
939commit_profile_recorder!(
940    commit_profile_record_superblock_store,
941    record_commit_superblock_store
942);
943
944#[inline(always)]
945fn write_commit_page(
946    offset: u64,
947    page: &[u8],
948    profile_enabled: bool,
949) -> Result<(), StableMemoryError> {
950    if profile_enabled {
951        memory::write_prechecked(offset, page)
952    } else {
953        memory::write_prechecked_unmetered(offset, page)
954    }
955}
956
957fn store_commit_page_map(
958    advance_tx: bool,
959    root_offset: u64,
960    root_entries_len: u64,
961    overlay_size: u64,
962    profile_enabled: bool,
963) -> Result<(), StableMemoryError> {
964    match (advance_tx, profile_enabled) {
965        (true, true) => Superblock::commit_page_map(root_offset, root_entries_len, overlay_size),
966        (true, false) => {
967            Superblock::commit_page_map_unmetered(root_offset, root_entries_len, overlay_size)
968        }
969        (false, true) => {
970            Superblock::store_page_map_without_tx(root_offset, root_entries_len, overlay_size)
971        }
972        (false, false) => Superblock::store_page_map_without_tx_unmetered(
973            root_offset,
974            root_entries_len,
975            overlay_size,
976        ),
977    }
978}
979
980fn load_segment_for_update<'a>(
981    block: &Superblock,
982    root: &[u64],
983    updates: &'a mut BTreeMap<u64, Vec<u64>>,
984    segment_no: u64,
985) -> Result<&'a mut Vec<u64>, StableMemoryError> {
986    match updates.entry(segment_no) {
987        std::collections::btree_map::Entry::Occupied(entry) => Ok(entry.into_mut()),
988        std::collections::btree_map::Entry::Vacant(entry) => {
989            let table = read_segment_table(block, root, segment_no)?;
990            Ok(entry.insert(table))
991        }
992    }
993}
994
995fn clear_truncated_tail(
996    block: &Superblock,
997    root: &[u64],
998    updates: &mut BTreeMap<u64, Vec<u64>>,
999    final_page_count: u64,
1000) -> Result<(), StableMemoryError> {
1001    let old_page_count = active_page_count(block)?;
1002    if final_page_count >= old_page_count || final_page_count == 0 {
1003        return Ok(());
1004    }
1005    let boundary_segment = segment_no(final_page_count);
1006    if boundary_segment >= segment_count_for_pages(final_page_count)? {
1007        return Ok(());
1008    }
1009    let start = segment_index(final_page_count)?;
1010    if start == 0 {
1011        return Ok(());
1012    }
1013    let table = load_segment_for_update(block, root, updates, boundary_segment)?;
1014    table[start..].fill(0);
1015    Ok(())
1016}
1017
1018fn reject_during_update() -> Result<(), StableMemoryError> {
1019    if overlay::is_active() {
1020        Err(StableMemoryError::UpdateInProgress)
1021    } else {
1022        Ok(())
1023    }
1024}
1025
1026fn read_logical_range(
1027    block: &Superblock,
1028    offset: u64,
1029    dst: &mut [u8],
1030) -> Result<(), StableMemoryError> {
1031    if dst.is_empty() {
1032        return Ok(());
1033    }
1034    let in_page =
1035        usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
1036    if dst.len() <= page_len() - in_page {
1037        return read_logical_page_slice(block, offset / page_size(), in_page, dst);
1038    }
1039
1040    let mut copied_total = 0_usize;
1041    while copied_total < dst.len() {
1042        let absolute = checked_add(
1043            offset,
1044            u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
1045        )?;
1046        let page_no = absolute / page_size();
1047        let in_page = usize::try_from(absolute % page_size())
1048            .map_err(|_| StableMemoryError::OffsetOverflow)?;
1049        let copied = (page_len() - in_page).min(dst.len() - copied_total);
1050        read_logical_page_slice(
1051            block,
1052            page_no,
1053            in_page,
1054            &mut dst[copied_total..copied_total + copied],
1055        )?;
1056        copied_total += copied;
1057    }
1058    Ok(())
1059}
1060
1061fn read_logical_range_with_page_cache(
1062    block: &Superblock,
1063    offset: u64,
1064    dst: &mut [u8],
1065    page_offsets: &mut PageOffsetCache,
1066) -> Result<(), StableMemoryError> {
1067    let in_page =
1068        usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
1069    if dst.len() <= page_len() - in_page {
1070        return read_logical_page_slice_with_page_cache(
1071            block,
1072            offset / page_size(),
1073            in_page,
1074            dst,
1075            page_offsets,
1076        );
1077    }
1078
1079    let mut copied_total = 0_usize;
1080    while copied_total < dst.len() {
1081        let absolute = checked_add(
1082            offset,
1083            u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
1084        )?;
1085        let page_no = absolute / page_size();
1086        let in_page = usize::try_from(absolute % page_size())
1087            .map_err(|_| StableMemoryError::OffsetOverflow)?;
1088        let copied = (page_len() - in_page).min(dst.len() - copied_total);
1089        read_logical_page_slice_with_page_cache(
1090            block,
1091            page_no,
1092            in_page,
1093            &mut dst[copied_total..copied_total + copied],
1094            page_offsets,
1095        )?;
1096        copied_total += copied;
1097    }
1098    Ok(())
1099}
1100
1101fn read_logical_page_slice(
1102    block: &Superblock,
1103    page_no: u64,
1104    in_page: usize,
1105    dst: &mut [u8],
1106) -> Result<(), StableMemoryError> {
1107    let physical = page_offset_for(block, page_no)?;
1108    if physical == 0 {
1109        dst.fill(0);
1110        return Ok(());
1111    }
1112    let stable_offset = checked_add(
1113        physical,
1114        u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
1115    )?;
1116    #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1117    crate::read_metrics::record_stable_data_read(dst.len());
1118    memory::read_preallocated(stable_offset, dst)
1119}
1120
1121#[inline(always)]
1122fn read_logical_page_slice_with_page_cache(
1123    block: &Superblock,
1124    page_no: u64,
1125    in_page: usize,
1126    dst: &mut [u8],
1127    page_offsets: &mut PageOffsetCache,
1128) -> Result<(), StableMemoryError> {
1129    if dst.len() < page_len() && page_offsets.copy_page_slice(page_no, in_page, dst) {
1130        return Ok(());
1131    }
1132    let physical = match page_offsets.get(page_no) {
1133        Some(physical) => physical,
1134        None => {
1135            let physical = if block.page_table_offset == 0 {
1136                0
1137            } else {
1138                cached_page_offset_for(block, page_no)?
1139            };
1140            page_offsets.insert(page_no, physical);
1141            physical
1142        }
1143    };
1144    if physical == 0 {
1145        dst.fill(0);
1146        return Ok(());
1147    }
1148    if in_page == 0 && dst.len() == page_len() {
1149        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1150        crate::read_metrics::record_stable_data_read(dst.len());
1151        return memory::read_preallocated(physical, dst);
1152    }
1153    if dst.len() < page_len() {
1154        let mut page = zero_page();
1155        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1156        crate::read_metrics::record_stable_data_read(page.len());
1157        memory::read_preallocated(physical, &mut page)?;
1158        let end = in_page + dst.len();
1159        dst.copy_from_slice(&page[in_page..end]);
1160        page_offsets.insert_page(page_no, page);
1161        return Ok(());
1162    }
1163    let stable_offset = checked_add(
1164        physical,
1165        u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
1166    )?;
1167    #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1168    crate::read_metrics::record_stable_data_read(dst.len());
1169    memory::read_preallocated(stable_offset, dst)
1170}
1171
1172fn page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
1173    if page_no >= active_page_count(block)? || block.page_table_offset == 0 {
1174        return Ok(0);
1175    }
1176    cached_page_offset_for(block, page_no)
1177}
1178
1179fn read_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1180    let root = read_root_table(block)?;
1181    let count = active_page_count(block)?;
1182    let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1183    let mut entries = Vec::with_capacity(capacity);
1184    for segment_no in 0..segment_count_for_pages(count)? {
1185        let table = read_segment_table(block, &root, segment_no)?;
1186        for entry in table {
1187            if entries.len() == capacity {
1188                break;
1189            }
1190            entries.push(entry);
1191        }
1192    }
1193    Ok(entries)
1194}
1195
1196fn cached_page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
1197    let context = memory::active_context_id()?;
1198    let key = read_cache_key(block);
1199    let segment_no = segment_no(page_no);
1200    let index = segment_index(page_no)?;
1201    READ_TABLE_CACHE.with(|cache| {
1202        let mut caches = cache.borrow_mut();
1203        let cache = match read_table_cache_index(&caches, context) {
1204            Some(index) => &mut caches[index].1,
1205            None => {
1206                caches.push((context, ReadTableCache::new()));
1207                &mut caches
1208                    .last_mut()
1209                    .ok_or(StableMemoryError::OffsetOverflow)?
1210                    .1
1211            }
1212        };
1213        cache.ensure_key(key);
1214        if cache.root.is_empty() {
1215            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1216            crate::read_metrics::record_page_table_root_miss();
1217            cache.root = read_root_table(block)?;
1218        } else {
1219            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1220            crate::read_metrics::record_page_table_root_hit();
1221        }
1222        let root_index =
1223            usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1224        let segment_offset = cache.root[root_index];
1225        if segment_offset == 0 {
1226            return Ok(0);
1227        }
1228        if let Some(offset) = cache.segment_page_offset(segment_no, index) {
1229            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1230            crate::read_metrics::record_page_table_segment_hit();
1231            return Ok(offset);
1232        }
1233        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
1234        crate::read_metrics::record_page_table_segment_miss();
1235        let table = read_segment_table_at(segment_offset)?;
1236        let offset = table[index];
1237        cache.insert_segment(segment_no, table);
1238        Ok(offset)
1239    })
1240}
1241
1242fn read_table_cache_index(
1243    caches: &[(ContextId, ReadTableCache)],
1244    context: ContextId,
1245) -> Option<usize> {
1246    caches
1247        .iter()
1248        .position(|(stored_context, _)| *stored_context == context)
1249}
1250
1251fn read_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1252    if block.page_count == 0 {
1253        return Ok(Vec::new());
1254    }
1255    let entries_len =
1256        usize::try_from(block.page_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1257    read_u64_table_at(block.page_table_offset, entries_len)
1258}
1259
1260fn read_commit_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1261    read_root_table(block)
1262}
1263
1264fn read_segment_table(
1265    _block: &Superblock,
1266    root: &[u64],
1267    segment_no: u64,
1268) -> Result<Vec<u64>, StableMemoryError> {
1269    let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1270    let Some(offset) = root.get(index).copied() else {
1271        return Ok(vec![0_u64; segment_page_count_usize()]);
1272    };
1273    if offset == 0 {
1274        return Ok(vec![0_u64; segment_page_count_usize()]);
1275    }
1276    read_segment_table_at(offset)
1277}
1278
1279fn read_commit_segment_table(
1280    _block: &Superblock,
1281    root: &[u64],
1282    segment_no: u64,
1283) -> Result<Vec<u64>, StableMemoryError> {
1284    let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
1285    let Some(offset) = root.get(index).copied() else {
1286        return Ok(vec![0_u64; segment_page_count_usize()]);
1287    };
1288    if offset == 0 {
1289        return Ok(vec![0_u64; segment_page_count_usize()]);
1290    }
1291    read_commit_segment_table_at(segment_no, offset)
1292}
1293
1294fn read_commit_segment_table_at(
1295    segment_no: u64,
1296    offset: u64,
1297) -> Result<Vec<u64>, StableMemoryError> {
1298    if offset == 0 {
1299        return Ok(vec![0_u64; segment_page_count_usize()]);
1300    }
1301    if let Some(table) = take_commit_segment_table(segment_no, offset) {
1302        return Ok(table);
1303    }
1304    read_segment_table_at(offset)
1305}
1306
1307fn take_commit_segment_table(segment_no: u64, segment_offset: u64) -> Option<Vec<u64>> {
1308    let Ok(context) = memory::active_context_id() else {
1309        return None;
1310    };
1311    COMMIT_SEGMENT_CACHE.with(|cache| {
1312        let mut cache = cache.borrow_mut();
1313        if cache.len() == 1 {
1314            let (stored_context, cached) = &cache[0];
1315            if *stored_context == context
1316                && cached.segment_no == segment_no
1317                && cached.segment_offset == segment_offset
1318            {
1319                return cache.pop().map(|(_, cached)| cached.table);
1320            }
1321            return None;
1322        }
1323        cache
1324            .iter()
1325            .position(|(stored_context, cached)| {
1326                *stored_context == context
1327                    && cached.segment_no == segment_no
1328                    && cached.segment_offset == segment_offset
1329            })
1330            .map(|position| cache.remove(position).1.table)
1331    })
1332}
1333
1334fn cache_commit_segment_table(segment_no: u64, segment_offset: u64, table: Vec<u64>) {
1335    let Ok(context) = memory::active_context_id() else {
1336        return;
1337    };
1338    COMMIT_SEGMENT_CACHE.with(|cache| {
1339        let mut cache = cache.borrow_mut();
1340        if cache.is_empty() {
1341            cache.push((
1342                context,
1343                CommitSegmentCache {
1344                    segment_no,
1345                    segment_offset,
1346                    table,
1347                },
1348            ));
1349            return;
1350        }
1351        if cache.len() == 1 {
1352            let (stored_context, cached) = &mut cache[0];
1353            if *stored_context == context {
1354                cached.segment_no = segment_no;
1355                cached.segment_offset = segment_offset;
1356                cached.table = table;
1357                return;
1358            }
1359        } else if let Some((_, cached)) = cache
1360            .iter_mut()
1361            .find(|(stored_context, _)| *stored_context == context)
1362        {
1363            cached.segment_no = segment_no;
1364            cached.segment_offset = segment_offset;
1365            cached.table = table;
1366            return;
1367        }
1368        cache.push((
1369            context,
1370            CommitSegmentCache {
1371                segment_no,
1372                segment_offset,
1373                table,
1374            },
1375        ));
1376    });
1377}
1378
1379fn read_segment_table_at(offset: u64) -> Result<Vec<u64>, StableMemoryError> {
1380    read_u64_table_at(offset, segment_page_count_usize())
1381}
1382
1383fn write_segmented_tables(entries: &[u64]) -> Result<(u64, u64), StableMemoryError> {
1384    if entries.is_empty() {
1385        return Ok((0, 0));
1386    }
1387    let root_len = segment_count_for_pages(entries_len_u64(entries)?)?;
1388    let mut cursor = append_base()?;
1389    let segment_bytes = root_len
1390        .checked_mul(segment_table_bytes()?)
1391        .ok_or(StableMemoryError::OffsetOverflow)?;
1392    let page_table_bytes = checked_add(segment_bytes, root_table_bytes(root_len)?)?;
1393    memory::ensure_capacity(checked_add(cursor, page_table_bytes)?)?;
1394    let mut root = Vec::with_capacity(
1395        usize::try_from(root_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1396    );
1397    for segment_no in 0..root_len {
1398        let start = usize::try_from(
1399            segment_no
1400                .checked_mul(SEGMENT_PAGE_COUNT)
1401                .ok_or(StableMemoryError::OffsetOverflow)?,
1402        )
1403        .map_err(|_| StableMemoryError::OffsetOverflow)?;
1404        let mut table = vec![0_u64; segment_page_count_usize()];
1405        for (offset, entry) in entries[start..]
1406            .iter()
1407            .take(segment_page_count_usize())
1408            .enumerate()
1409        {
1410            table[offset] = *entry;
1411        }
1412        root.push(write_segment_table_at(&table, &mut cursor)?);
1413    }
1414    let root_offset = write_root_table_at(&root, &mut cursor)?;
1415    Ok((root_offset, entries_len_u64(&root)?))
1416}
1417
1418#[inline(always)]
1419fn write_segment_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1420    if entries.len() == segment_page_count_usize() {
1421        return write_u64_table_at(entries, cursor);
1422    }
1423
1424    let mut table = vec![0_u64; segment_page_count_usize()];
1425    for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
1426        table[index] = *entry;
1427    }
1428    write_u64_table_at(&table, cursor)
1429}
1430
1431fn write_root_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1432    write_u64_table_at(entries, cursor)
1433}
1434
1435#[inline(always)]
1436fn write_commit_segment_table_at(
1437    entries: &[u64],
1438    cursor: &mut u64,
1439    profile_enabled: bool,
1440) -> Result<u64, StableMemoryError> {
1441    if profile_enabled {
1442        write_segment_table_at(entries, cursor)
1443    } else {
1444        write_segment_table_at_unmetered(entries, cursor)
1445    }
1446}
1447
1448#[inline(always)]
1449fn write_commit_root_table_at(
1450    entries: &[u64],
1451    cursor: &mut u64,
1452    profile_enabled: bool,
1453) -> Result<u64, StableMemoryError> {
1454    if profile_enabled {
1455        write_root_table_at(entries, cursor)
1456    } else {
1457        write_u64_table_at_unmetered(entries, cursor)
1458    }
1459}
1460
1461fn write_segment_table_at_unmetered(
1462    entries: &[u64],
1463    cursor: &mut u64,
1464) -> Result<u64, StableMemoryError> {
1465    if entries.len() == segment_page_count_usize() {
1466        return write_u64_table_at_unmetered(entries, cursor);
1467    }
1468
1469    let mut table = vec![0_u64; segment_page_count_usize()];
1470    for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
1471        table[index] = *entry;
1472    }
1473    write_u64_table_at_unmetered(&table, cursor)
1474}
1475
1476fn write_u64_table_at(entries: &[u64], cursor: &mut u64) -> Result<u64, StableMemoryError> {
1477    if entries.is_empty() {
1478        return Ok(0);
1479    }
1480    let offset = *cursor;
1481    let byte_len = entries
1482        .len()
1483        .checked_mul(8)
1484        .ok_or(StableMemoryError::OffsetOverflow)?;
1485    #[cfg(target_endian = "little")]
1486    {
1487        // SAFETY: page-table encoding is little-endian u64 and the target is little-endian.
1488        let bytes = unsafe { std::slice::from_raw_parts(entries.as_ptr().cast::<u8>(), byte_len) };
1489        memory::write_prechecked(offset, bytes)?;
1490        *cursor = checked_add(
1491            offset,
1492            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1493        )?;
1494        Ok(offset)
1495    }
1496
1497    #[cfg(not(target_endian = "little"))]
1498    {
1499        let mut bytes = vec![0_u8; byte_len];
1500        for (chunk, entry) in bytes.chunks_exact_mut(8).zip(entries) {
1501            chunk.copy_from_slice(&entry.to_le_bytes());
1502        }
1503        memory::write_prechecked(offset, &bytes)?;
1504        *cursor = checked_add(
1505            offset,
1506            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1507        )?;
1508        Ok(offset)
1509    }
1510}
1511
1512fn read_u64_table_at(offset: u64, entries_len: usize) -> Result<Vec<u64>, StableMemoryError> {
1513    if entries_len == 0 {
1514        return Ok(Vec::new());
1515    }
1516    let byte_len = entries_len
1517        .checked_mul(8)
1518        .ok_or(StableMemoryError::OffsetOverflow)?;
1519    #[cfg(target_endian = "little")]
1520    {
1521        let mut entries = Vec::<MaybeUninit<u64>>::with_capacity(entries_len);
1522        unsafe {
1523            entries.set_len(entries_len);
1524        }
1525        // SAFETY: the buffer has `entries_len` u64 slots. The stable-memory read
1526        // fills every byte before conversion to initialized `u64` values.
1527        let bytes =
1528            unsafe { std::slice::from_raw_parts_mut(entries.as_mut_ptr().cast::<u8>(), byte_len) };
1529        memory::read_preallocated(offset, bytes)?;
1530        let ptr = entries.as_mut_ptr().cast::<u64>();
1531        let len = entries.len();
1532        let capacity = entries.capacity();
1533        std::mem::forget(entries);
1534        // SAFETY: all bytes were just initialized by `read_preallocated`, and
1535        // every bit pattern is valid for `u64`.
1536        unsafe { Ok(Vec::from_raw_parts(ptr, len, capacity)) }
1537    }
1538
1539    #[cfg(not(target_endian = "little"))]
1540    {
1541        let mut bytes = vec![0_u8; byte_len];
1542        memory::read_preallocated(offset, &mut bytes)?;
1543        decode_u64_table(&bytes)
1544    }
1545}
1546
1547fn write_u64_table_at_unmetered(
1548    entries: &[u64],
1549    cursor: &mut u64,
1550) -> Result<u64, StableMemoryError> {
1551    if entries.is_empty() {
1552        return Ok(0);
1553    }
1554    let offset = *cursor;
1555    let byte_len = entries
1556        .len()
1557        .checked_mul(8)
1558        .ok_or(StableMemoryError::OffsetOverflow)?;
1559    #[cfg(target_endian = "little")]
1560    {
1561        // SAFETY: page-table encoding is little-endian u64 and the target is little-endian.
1562        let bytes = unsafe { std::slice::from_raw_parts(entries.as_ptr().cast::<u8>(), byte_len) };
1563        memory::write_prechecked_unmetered(offset, bytes)?;
1564        *cursor = checked_add(
1565            offset,
1566            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1567        )?;
1568        Ok(offset)
1569    }
1570
1571    #[cfg(not(target_endian = "little"))]
1572    {
1573        let mut bytes = vec![0_u8; byte_len];
1574        for (chunk, entry) in bytes.chunks_exact_mut(8).zip(entries) {
1575            chunk.copy_from_slice(&entry.to_le_bytes());
1576        }
1577        memory::write_prechecked_unmetered(offset, &bytes)?;
1578        *cursor = checked_add(
1579            offset,
1580            u64::try_from(byte_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
1581        )?;
1582        Ok(offset)
1583    }
1584}
1585
/// Decodes a byte buffer of little-endian `u64`s (non-little-endian targets only).
///
/// Fails with `StableMemoryError::OffsetOverflow` when the buffer length is
/// not a multiple of 8.
#[cfg(not(target_endian = "little"))]
fn decode_u64_table(bytes: &[u8]) -> Result<Vec<u64>, StableMemoryError> {
    let words = bytes.chunks_exact(8);
    if !words.remainder().is_empty() {
        return Err(StableMemoryError::OffsetOverflow);
    }
    let entries = words
        .map(|chunk| {
            let mut word = [0_u8; 8];
            word.copy_from_slice(chunk);
            u64::from_le_bytes(word)
        })
        .collect();
    Ok(entries)
}
1599
1600fn imported_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
1601    let count = page_count_for_size(block.import_total_size)?;
1602    let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
1603    let mut entries = Vec::with_capacity(capacity);
1604    for page_no in 0..count {
1605        entries.push(checked_add(
1606            block.import_base_offset,
1607            page_no
1608                .checked_mul(page_size())
1609                .ok_or(StableMemoryError::OffsetOverflow)?,
1610        )?);
1611    }
1612    Ok(entries)
1613}
1614
1615fn checksum_logical_range(block: &Superblock, len: u64) -> Result<u64, StableMemoryError> {
1616    let mut offset = 0_u64;
1617    let mut hash = fnv1a64(&[]);
1618    while offset < len {
1619        let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
1620        let copied_len =
1621            usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
1622        let mut bytes = vec![0_u8; copied_len];
1623        read_logical_range(block, offset, &mut bytes)?;
1624        hash = fold_fnv1a64(hash, &bytes);
1625        offset += chunk_len;
1626    }
1627    Ok(hash)
1628}
1629
1630fn checksum_physical_range(base_offset: u64, len: u64) -> Result<u64, StableMemoryError> {
1631    let mut offset = 0_u64;
1632    let mut hash = fnv1a64(&[]);
1633    while offset < len {
1634        let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
1635        let copied_len =
1636            usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
1637        let mut bytes = vec![0_u8; copied_len];
1638        memory::read_preallocated(checked_add(base_offset, offset)?, &mut bytes)?;
1639        hash = fold_fnv1a64(hash, &bytes);
1640        offset += chunk_len;
1641    }
1642    Ok(hash)
1643}
1644
1645fn clear_import(block: &mut Superblock) -> Result<(), StableMemoryError> {
1646    block.flags &= !FLAG_IMPORTING;
1647    block.import_expected_checksum = 0;
1648    block.import_written_until = 0;
1649    block.import_total_size = 0;
1650    block.import_base_offset = 0;
1651    block.store()?;
1652    invalidate_read_cache();
1653    Ok(())
1654}
1655
1656fn import_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
1657    checked_add(block.import_base_offset, offset)
1658}
1659
1660fn active_page_count(block: &Superblock) -> Result<u64, StableMemoryError> {
1661    page_count_for_size(block.db_size)
1662}
1663
1664fn active_segment_count(block: &Superblock) -> Result<u64, StableMemoryError> {
1665    Ok(block.page_count)
1666}
1667
1668fn read_cache_key(block: &Superblock) -> ReadCacheKey {
1669    ReadCacheKey {
1670        page_table_offset: block.page_table_offset,
1671        page_count: block.page_count,
1672        db_size: block.db_size,
1673        last_tx_id: block.last_tx_id,
1674    }
1675}
1676
1677fn segment_count_for_pages(page_count: u64) -> Result<u64, StableMemoryError> {
1678    Ok(page_count.div_ceil(SEGMENT_PAGE_COUNT))
1679}
1680
1681fn segment_no(page_no: u64) -> u64 {
1682    page_no / SEGMENT_PAGE_COUNT
1683}
1684
1685fn segment_index(page_no: u64) -> Result<usize, StableMemoryError> {
1686    usize::try_from(page_no % SEGMENT_PAGE_COUNT).map_err(|_| StableMemoryError::OffsetOverflow)
1687}
1688
1689fn segment_page_count_usize() -> usize {
1690    usize::try_from(SEGMENT_PAGE_COUNT).expect("segment page count fits usize")
1691}
1692
1693fn segment_table_len() -> usize {
1694    segment_page_count_usize() * 8
1695}
1696
1697fn segment_table_bytes() -> Result<u64, StableMemoryError> {
1698    u64::try_from(segment_table_len()).map_err(|_| StableMemoryError::OffsetOverflow)
1699}
1700
1701fn root_table_bytes(entry_count: u64) -> Result<u64, StableMemoryError> {
1702    entry_count
1703        .checked_mul(PAGE_TABLE_ENTRY_LEN)
1704        .ok_or(StableMemoryError::OffsetOverflow)
1705}
1706
1707fn entries_len_u64<T>(entries: &[T]) -> Result<u64, StableMemoryError> {
1708    u64::try_from(entries.len()).map_err(|_| StableMemoryError::OffsetOverflow)
1709}
1710
1711fn append_base() -> Result<u64, StableMemoryError> {
1712    memory::size_pages()
1713        .checked_mul(STABLE_PAGE_SIZE)
1714        .ok_or(StableMemoryError::OffsetOverflow)
1715}
1716
1717fn page_size() -> u64 {
1718    u64::from(SQLITE_PAGE_SIZE)
1719}
1720
1721fn page_len() -> usize {
1722    usize::try_from(SQLITE_PAGE_SIZE).expect("SQLite page size fits usize")
1723}
1724
1725fn zero_page() -> Vec<u8> {
1726    vec![0_u8; page_len()]
1727}
1728
1729fn checked_add(left: u64, right: u64) -> Result<u64, StableMemoryError> {
1730    left.checked_add(right)
1731        .ok_or(StableMemoryError::OffsetOverflow)
1732}
1733
/// Continues a 64-bit FNV-1a hash: for every byte, XOR it into the running
/// hash and multiply by the FNV prime (wrapping).
///
/// Passing the result of one call as `hash` to the next hashes the
/// concatenation of the inputs.
fn fold_fnv1a64(hash: u64, bytes: &[u8]) -> u64 {
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.iter().fold(hash, |running, &byte| {
        (running ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
1741
/// Test-build failpoint hook: fails once if `failpoint` is set for the active context.
///
/// Returns `Ok(())` when there is no active memory context, or when the
/// failpoint registered for that context is not `failpoint`. When it matches,
/// the registration is removed before the error is returned, so the same
/// registration does not fire again.
///
/// # Errors
/// `StableMemoryError::Failpoint` carrying the failpoint's name when it fires.
#[cfg(test)]
fn hit_failpoint(failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
    let context = match memory::active_context_id() {
        Ok(context) => context,
        Err(_) => return Ok(()),
    };
    FAILPOINTS.with(|slot| {
        let mut armed = slot.borrow_mut();
        if armed.get(&context).copied() != Some(failpoint) {
            return Ok(());
        }
        // Remove the registration first so it fires only once.
        armed.remove(&context);
        Err(StableMemoryError::Failpoint(failpoint.name()))
    })
}
1757
/// Non-test builds: failpoints are compiled out, so this hook always succeeds.
#[cfg(not(test))]
fn hit_failpoint(_failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
    Ok(())
}
1762
#[cfg(test)]
impl StableBlobFailpoint {
    /// Human-readable label for this failpoint, describing the step it precedes.
    fn name(self) -> &'static str {
        let label = match self {
            StableBlobFailpoint::OverlayWrite => "before overlay write",
            StableBlobFailpoint::OverlayTruncate => "before overlay truncate",
            StableBlobFailpoint::CommitCapacity => "before commit capacity",
            StableBlobFailpoint::CommitChunkWrite => "before commit page write",
            StableBlobFailpoint::CommitPageTableWrite => "before commit page table write",
            StableBlobFailpoint::CommitSuperblockStore => "before commit superblock store",
        };
        label
    }
}
1776
1777#[cfg(test)]
1778mod tests {
1779    use super::*;
1780    use proptest::prelude::*;
1781    use proptest::test_runner::{Config, TestRunner};
1782    use std::collections::BTreeSet;
1783
1784    #[test]
1785    fn layout_math_matches_expected_boundaries() {
1786        assert_eq!(page_count_for_size(0).unwrap(), 0);
1787        assert_eq!(page_count_for_size(1).unwrap(), 1);
1788        assert_eq!(page_count_for_size(page_size()).unwrap(), 1);
1789        assert_eq!(page_count_for_size(page_size() + 1).unwrap(), 2);
1790
1791        assert_eq!(segment_count_for_pages(0).unwrap(), 0);
1792        assert_eq!(segment_count_for_pages(1).unwrap(), 1);
1793        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT).unwrap(), 1);
1794        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT + 1).unwrap(), 2);
1795
1796        assert_eq!(segment_no(SEGMENT_PAGE_COUNT), 1);
1797        assert_eq!(segment_index(SEGMENT_PAGE_COUNT - 1).unwrap(), 255);
1798        assert_eq!(segment_index(SEGMENT_PAGE_COUNT).unwrap(), 0);
1799        assert_eq!(root_table_bytes(2).unwrap(), 16);
1800    }
1801
1802    #[test]
1803    fn layout_math_rejects_u64_max_overflow_boundaries() {
1804        assert!(matches!(
1805            root_table_bytes(u64::MAX),
1806            Err(StableMemoryError::OffsetOverflow)
1807        ));
1808        assert!(matches!(
1809            checked_add(u64::MAX, 1),
1810            Err(StableMemoryError::OffsetOverflow)
1811        ));
1812
1813        let mut block = Superblock::fresh();
1814        block.import_base_offset = u64::MAX;
1815        assert!(matches!(
1816            import_offset(&block, 1),
1817            Err(StableMemoryError::OffsetOverflow)
1818        ));
1819
1820        block.import_base_offset = u64::MAX - page_size() + 1;
1821        block.import_total_size = page_size() + 1;
1822        assert!(matches!(
1823            imported_page_table(&block),
1824            Err(StableMemoryError::OffsetOverflow)
1825        ));
1826    }
1827
    /// Property test: the layout helpers satisfy their arithmetic contracts for
    /// arbitrary and boundary-biased sizes, page numbers and entry counts.
    ///
    /// Expected values are computed in `u128` so the checks themselves cannot
    /// overflow.
    #[test]
    fn pbt_layout_math_matches_verus_model() {
        let mut runner = TestRunner::new(Config {
            cases: 512,
            ..Config::default()
        });

        runner
            .run(
                &(
                    boundary_size_strategy(),
                    boundary_page_strategy(),
                    boundary_entry_strategy(),
                ),
                |(size, page_no, entries)| {
                    // page_count is the ceiling of size / page_size:
                    // (count - 1) * page < size <= count * page.
                    let page_count = page_count_for_size(size).unwrap();
                    let page_size = u128::from(page_size());
                    if size == 0 {
                        prop_assert_eq!(page_count, 0);
                    } else {
                        prop_assert!(u128::from(page_count - 1) * page_size < u128::from(size));
                        prop_assert!(u128::from(size) <= u128::from(page_count) * page_size);
                    }

                    // segment_count is the ceiling of page_count / SEGMENT_PAGE_COUNT.
                    let segment_count = segment_count_for_pages(page_count).unwrap();
                    if page_count == 0 {
                        prop_assert_eq!(segment_count, 0);
                    } else {
                        prop_assert!(
                            u128::from(segment_count - 1) * u128::from(SEGMENT_PAGE_COUNT)
                                < u128::from(page_count)
                        );
                        prop_assert!(
                            u128::from(page_count)
                                <= u128::from(segment_count) * u128::from(SEGMENT_PAGE_COUNT)
                        );
                    }

                    // (segment_no, segment_index) is the quotient/remainder split of page_no.
                    let index = segment_index(page_no).unwrap();
                    prop_assert!(index < segment_page_count_usize());
                    prop_assert_eq!(
                        u128::from(segment_no(page_no)) * u128::from(SEGMENT_PAGE_COUNT)
                            + index as u128,
                        u128::from(page_no)
                    );

                    // root_table_bytes either returns the exact product or reports
                    // overflow exactly when the checked product overflows.
                    match root_table_bytes(entries) {
                        Ok(bytes) => prop_assert_eq!(bytes, entries * PAGE_TABLE_ENTRY_LEN),
                        Err(StableMemoryError::OffsetOverflow) => {
                            prop_assert!(entries.checked_mul(PAGE_TABLE_ENTRY_LEN).is_none());
                        }
                        Err(error) => return Err(TestCaseError::fail(error.to_string())),
                    }
                    Ok(())
                },
            )
            .unwrap();
    }
1886
1887    fn boundary_size_strategy() -> impl Strategy<Value = u64> {
1888        let page = page_size();
1889        let segment_bytes = SEGMENT_PAGE_COUNT * page;
1890        prop_oneof![
1891            any::<u64>(),
1892            prop::sample::select(boundary_values(&[
1893                0,
1894                1,
1895                page - 1,
1896                page,
1897                page + 1,
1898                segment_bytes - 1,
1899                segment_bytes,
1900                segment_bytes + 1,
1901                u64::MAX,
1902            ])),
1903        ]
1904    }
1905
1906    fn boundary_page_strategy() -> impl Strategy<Value = u64> {
1907        prop_oneof![
1908            any::<u64>(),
1909            prop::sample::select(boundary_values(&[
1910                0,
1911                1,
1912                SEGMENT_PAGE_COUNT - 1,
1913                SEGMENT_PAGE_COUNT,
1914                SEGMENT_PAGE_COUNT + 1,
1915                u64::MAX,
1916            ])),
1917        ]
1918    }
1919
1920    fn boundary_entry_strategy() -> impl Strategy<Value = u64> {
1921        let max_without_overflow = u64::MAX / PAGE_TABLE_ENTRY_LEN;
1922        prop_oneof![
1923            any::<u64>(),
1924            prop::sample::select(boundary_values(&[
1925                0,
1926                1,
1927                SEGMENT_PAGE_COUNT - 1,
1928                SEGMENT_PAGE_COUNT,
1929                SEGMENT_PAGE_COUNT + 1,
1930                max_without_overflow - 1,
1931                max_without_overflow,
1932                max_without_overflow + 1,
1933                u64::MAX - 1,
1934                u64::MAX,
1935            ])),
1936        ]
1937    }
1938
1939    fn boundary_values(values: &[u64]) -> Vec<u64> {
1940        values
1941            .iter()
1942            .flat_map(|value| [value.saturating_sub(1), *value, value.saturating_add(1)])
1943            .collect()
1944    }
1945
1946    #[test]
1947    fn fnv_fold_matches_one_pass_for_multiple_partitions() {
1948        let bytes: Vec<u8> = (0..97)
1949            .map(|index| (index as u8).wrapping_mul(37).wrapping_add(11))
1950            .collect();
1951        let expected = fnv1a64(&bytes);
1952
1953        for split in [0_usize, 1, 2, 7, 31, 64, bytes.len()] {
1954            let split = split.min(bytes.len());
1955            let mut hash = fnv1a64(&[]);
1956            hash = fold_fnv1a64(hash, &bytes[..split]);
1957            hash = fold_fnv1a64(hash, &bytes[split..]);
1958            assert_eq!(hash, expected);
1959        }
1960
1961        let mut hash = fnv1a64(&[]);
1962        for chunk in bytes.chunks(13) {
1963            hash = fold_fnv1a64(hash, chunk);
1964        }
1965        assert_eq!(hash, expected);
1966    }
1967
    /// Writes pages in two different segments, then rewrites page 0, and checks
    /// that the page map gives the rewritten page a new stable-memory offset.
    #[test]
    #[serial_test::serial]
    fn page_map_commit_tracks_dirty_page_offsets() {
        // Start from freshly initialised test memory with no cached reads.
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let page_zero = vec![1_u8; page_len()];
        let page_later = vec![2_u8; page_len()];
        // Second page of the second segment, so the map needs two segment tables.
        let later_page_no = SEGMENT_PAGE_COUNT + 1;
        write_at(0, &page_zero).unwrap();
        write_at(later_page_no * page_size(), &page_later).unwrap();

        let block = Superblock::load().unwrap();
        let root = read_root_table(&block).unwrap();
        let table = read_page_table(&block).unwrap();
        let expected_pages = active_page_count(&block).unwrap();
        let expected_segments = segment_count_for_pages(expected_pages).unwrap();

        // One root entry per segment, one table entry per active page; both
        // written pages have a non-zero entry.
        assert_eq!(root.len() as u64, expected_segments);
        assert_eq!(table.len() as u64, expected_pages);
        assert_ne!(table[0], 0);
        assert_ne!(table[later_page_no as usize], 0);

        // Rewrite page 0 and re-read both the map and the data.
        let old_page_zero_offset = table[0];
        let updated_page_zero = vec![3_u8; page_len()];
        write_at(0, &updated_page_zero).unwrap();
        let updated_table = read_page_table(&Superblock::load().unwrap()).unwrap();
        let mut out = vec![0_u8; page_len()];
        read_base_at(0, &mut out).unwrap();

        // The rewritten page moved to a different offset and reads back updated.
        assert_ne!(updated_table[0], old_page_zero_offset);
        assert_eq!(out, updated_page_zero);
    }
2002
    /// Within one `begin_update`/`commit_update` pair, rewrites pages in two
    /// segments and checks that only those pages change offset while an
    /// untouched page keeps its entry.
    #[test]
    #[serial_test::serial]
    fn page_map_commit_tracks_multi_segment_dirty_and_clean_pages() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        // Seed three pages: two in the first segment, one in the second.
        let clean_page_no = 1;
        let later_page_no = SEGMENT_PAGE_COUNT + 1;
        write_at(0, &vec![1_u8; page_len()]).unwrap();
        write_at(clean_page_no * page_size(), &vec![2_u8; page_len()]).unwrap();
        write_at(later_page_no * page_size(), &vec![3_u8; page_len()]).unwrap();

        // Snapshot the page map before the update.
        let before = Superblock::load().unwrap();
        let before_root = read_root_table(&before).unwrap();
        let before_table = read_page_table(&before).unwrap();

        // Dirty page 0 and the later page; leave `clean_page_no` untouched.
        begin_update().unwrap();
        write_at(0, &vec![4_u8; page_len()]).unwrap();
        write_at(later_page_no * page_size(), &vec![5_u8; page_len()]).unwrap();
        commit_update().unwrap();

        let after = Superblock::load().unwrap();
        let after_root = read_root_table(&after).unwrap();
        let after_table = read_page_table(&after).unwrap();

        // The root table length matches the superblock's `page_count` field and
        // did not change across the commit.
        assert_eq!(after_root.len(), after.page_count as usize);
        assert_eq!(after_root.len(), before_root.len());
        // Dirty pages moved, the clean page did not.
        assert_ne!(after_table[0], before_table[0]);
        assert_eq!(
            after_table[clean_page_no as usize],
            before_table[clean_page_no as usize]
        );
        assert_ne!(
            after_table[later_page_no as usize],
            before_table[later_page_no as usize]
        );
    }
2041
2042    #[test]
2043    #[serial_test::serial]
2044    fn page_map_commit_zeroes_truncated_tail_slots() {
2045        crate::stable::memory::reset_for_tests();
2046        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
2047        invalidate_read_cache();
2048
2049        write_at(0, &vec![1_u8; page_len()]).unwrap();
2050        write_at(page_size(), &vec![2_u8; page_len()]).unwrap();
2051        write_at(2 * page_size(), &vec![3_u8; page_len()]).unwrap();
2052        truncate(page_size()).unwrap();
2053
2054        let block = Superblock::load().unwrap();
2055        let root = read_root_table(&block).unwrap();
2056        let segment = read_segment_table(&block, &root, 0).unwrap();
2057
2058        assert_eq!(block.db_size, page_size());
2059        assert_eq!(segment[0] != 0, true);
2060        assert_eq!(segment[1], 0);
2061        assert_eq!(segment[2], 0);
2062    }
2063
    /// After `compact`, unwritten pages keep a zero entry and written pages are
    /// laid out back-to-back, even when they belong to different segments.
    #[test]
    #[serial_test::serial]
    fn compact_keeps_zero_pages_and_densifies_offsets_across_segments() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        // Two written pages separated by a hole that spans a segment boundary.
        let later_page_no = SEGMENT_PAGE_COUNT + 2;
        let first_page = vec![7_u8; page_len()];
        let later_page = vec![9_u8; page_len()];
        write_at(0, &first_page).unwrap();
        write_at(later_page_no * page_size(), &later_page).unwrap();

        compact().unwrap();

        let block = Superblock::load().unwrap();
        let root = read_root_table(&block).unwrap();
        let table = read_page_table(&block).unwrap();
        let mut first_out = vec![0_u8; page_len()];
        let mut later_out = vec![0_u8; page_len()];

        read_base_at(0, &mut first_out).unwrap();
        read_base_at(later_page_no * page_size(), &mut later_out).unwrap();

        assert_eq!(root.len() as u64, block.page_count);
        assert_eq!(table.len() as u64, active_page_count(&block).unwrap());
        // Page 0 is stored, page 1 is a hole, and the later page sits directly
        // after page 0 in stable memory.
        assert_ne!(table[0], 0);
        assert_eq!(table[1], 0);
        assert_eq!(table[later_page_no as usize], table[0] + page_size());
        // Contents survive compaction.
        assert_eq!(first_out, first_page);
        assert_eq!(later_out, later_page);
    }
2096
    /// Growing the file through `truncate` alone must keep the existing page
    /// entry, leave fully skipped pages unmapped, and make the new partial tail
    /// page read as zero.
    #[test]
    #[serial_test::serial]
    fn single_segment_fast_path_preserves_table_after_expand_only_commit() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        // One byte in page 0, then grow to four pages, then one byte into a fifth.
        write_at(0, &[0]).unwrap();
        truncate(page_size() * 4).unwrap();
        truncate(page_size() * 4 + 1).unwrap();

        let block = Superblock::load().unwrap();
        let table = read_page_table(&block).unwrap();
        // Buffers start non-zero so a read that leaves them untouched is detected.
        let mut first = [1_u8; 1];
        let mut expanded_tail = [1_u8; 1];

        read_base_at(0, &mut first).unwrap();
        read_base_at(page_size() * 4, &mut expanded_tail).unwrap();

        assert_eq!(block.db_size, page_size() * 4 + 1);
        // Page 0 stays mapped, page 1 stays a hole, and the partial tail page
        // (index 4) has an entry.
        assert_ne!(table[0], 0);
        assert_eq!(table[1], 0);
        assert_ne!(table[4], 0);
        assert_eq!(first, [0]);
        assert_eq!(expanded_tail, [0]);
    }
2123
    /// `write_u64_table_at` must store entries as consecutive little-endian
    /// `u64`s, advance the cursor by the bytes written, and round-trip through
    /// `read_u64_table_at`. An empty table writes nothing.
    #[test]
    #[serial_test::serial]
    fn page_table_u64_encoding_is_little_endian_and_round_trips() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        // Byte patterns chosen so any endianness mistake changes the encoding.
        let entries = [
            0_u64,
            1,
            0x0102_0304_0506_0708,
            0xf1f2_f3f4_f5f6_f7f8,
            u64::MAX,
        ];
        let mut cursor = 128_u64;
        let expected_len = u64::try_from(entries.len() * 8).unwrap();
        // Reserve the target range before writing.
        crate::stable::memory::ensure_capacity(cursor + expected_len).unwrap();

        let offset = write_u64_table_at(&entries, &mut cursor).unwrap();
        let decoded = read_u64_table_at(offset, entries.len()).unwrap();
        // Read the raw bytes back to check the on-disk encoding directly.
        let mut encoded = vec![0_u8; entries.len() * 8];
        crate::stable::memory::read_preallocated(offset, &mut encoded).unwrap();
        let expected = entries
            .iter()
            .flat_map(|entry| entry.to_le_bytes())
            .collect::<Vec<_>>();

        // The table is written at the starting cursor, which then advances past it.
        assert_eq!(offset, 128);
        assert_eq!(cursor, 128 + expected_len);
        assert_eq!(decoded, entries);
        assert_eq!(encoded, expected);

        // Empty table: returned offset is 0, the cursor is unchanged, and
        // reading zero entries yields an empty table.
        let mut empty_cursor = cursor;
        assert_eq!(write_u64_table_at(&[], &mut empty_cursor).unwrap(), 0);
        assert_eq!(empty_cursor, cursor);
        assert!(read_u64_table_at(cursor, 0).unwrap().is_empty());
    }
2161
    /// Property test: arbitrary `u64` tables of 0 to 512 entries round-trip
    /// through `write_u64_table_at` / `read_u64_table_at` and are stored as
    /// little-endian bytes.
    #[test]
    #[serial_test::serial]
    fn pbt_page_table_u64_encoding_round_trips() {
        let mut runner = TestRunner::new(Config {
            cases: 128,
            ..Config::default()
        });

        runner
            .run(
                &proptest::collection::vec(any::<u64>(), 0..=512),
                |entries| {
                    // Each case starts from freshly initialised test memory.
                    crate::stable::memory::reset_for_tests();
                    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
                    invalidate_read_cache();

                    let mut cursor = 128_u64;
                    let byte_len = entries.len().checked_mul(8).unwrap();
                    let end = cursor + u64::try_from(byte_len).unwrap();
                    crate::stable::memory::ensure_capacity(end).unwrap();

                    let offset = write_u64_table_at(&entries, &mut cursor).unwrap();
                    let decoded = read_u64_table_at(offset, entries.len()).unwrap();
                    prop_assert_eq!(decoded, entries.clone());
                    // The cursor ends exactly past the written bytes (unchanged when empty).
                    prop_assert_eq!(cursor, end);

                    // Compare the raw stored bytes with the little-endian encoding.
                    let mut encoded = vec![0_u8; byte_len];
                    crate::stable::memory::read_preallocated(offset, &mut encoded).unwrap();
                    let expected = entries
                        .iter()
                        .flat_map(|entry| entry.to_le_bytes())
                        .collect::<Vec<_>>();
                    prop_assert_eq!(encoded, expected);
                    Ok(())
                },
            )
            .unwrap();
    }
2200
    /// Property test: for a sparse file described by `Vec<Option<u8>>` (`Some`
    /// is a page filled with that byte, `None` a never-written page),
    /// `compact` keeps holes as zero entries, stores written pages
    /// contiguously in page order, and preserves every page's contents.
    #[test]
    #[serial_test::serial]
    fn pbt_compact_preserves_sparse_page_model() {
        let mut runner = TestRunner::new(Config {
            cases: 32,
            ..Config::default()
        });

        runner
            .run(
                &proptest::collection::vec(prop::option::of(any::<u8>()), 0..=300),
                |pages| {
                    crate::stable::memory::reset_for_tests();
                    crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
                    invalidate_read_cache();

                    // The file ends after the last written page; trailing `None`s
                    // never become part of it.
                    let active_len = pages
                        .iter()
                        .rposition(Option::is_some)
                        .map(|index| index + 1)
                        .unwrap_or(0);
                    for (page_no, byte) in pages.iter().take(active_len).enumerate() {
                        if let Some(byte) = byte {
                            write_at(
                                u64::try_from(page_no).unwrap() * page_size(),
                                &vec![*byte; page_len()],
                            )
                            .unwrap();
                        }
                    }

                    compact().unwrap();
                    let block = Superblock::load().unwrap();
                    prop_assert_eq!(
                        block.db_size,
                        u64::try_from(active_len).unwrap() * page_size()
                    );
                    let table = read_page_table(&block).unwrap();
                    prop_assert_eq!(table.len(), active_len);

                    // Offset of the first written page after compaction; every later
                    // written page must follow it at page-size strides.
                    let mut first_compacted_offset = None;
                    let mut non_zero_seen = 0_u64;
                    for (page_no, byte) in pages.iter().take(active_len).enumerate() {
                        let entry = table[page_no];
                        let mut page = vec![0_u8; page_len()];
                        read_base_at(u64::try_from(page_no).unwrap() * page_size(), &mut page)
                            .unwrap();

                        if let Some(byte) = byte {
                            let base = *first_compacted_offset.get_or_insert(entry);
                            prop_assert_ne!(entry, 0);
                            prop_assert_eq!(entry, base + non_zero_seen * page_size());
                            prop_assert_eq!(page, vec![*byte; page_len()]);
                            non_zero_seen += 1;
                        } else {
                            // Holes stay unmapped and read back as zeros.
                            prop_assert_eq!(entry, 0);
                            prop_assert_eq!(page, vec![0_u8; page_len()]);
                        }
                    }
                    Ok(())
                },
            )
            .unwrap();
    }
2265
    /// One step of the model-based blob test; interpreted by `apply_blob_op`.
    #[derive(Clone, Debug)]
    enum BlobOp {
        /// Write `len` copies of `byte` starting at byte `offset`.
        Write { offset: u64, len: usize, byte: u8 },
        /// Set the file size to `size` bytes.
        Truncate { size: u64 },
        /// Run `compact()`.
        Compact,
    }
2272
    /// Model-based property test: applies random write/truncate/compact
    /// sequences to the store and to an in-memory byte-vector model, checking
    /// after every step that the two agree.
    #[test]
    #[serial_test::serial]
    fn pbt_blob_operations_match_logical_model_across_compact() {
        let mut runner = TestRunner::new(Config {
            cases: 48,
            ..Config::default()
        });

        runner
            .run(&blob_operation_sequence(), |operations| {
                crate::stable::memory::reset_for_tests();
                crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
                invalidate_read_cache();

                // `model` holds the expected file bytes; `materialized` holds the page
                // numbers expected to have a non-zero page-table entry.
                let mut model = Vec::new();
                let mut materialized = BTreeSet::new();
                // The empty store must already match the empty model.
                assert_blob_model(&model, &materialized, false)?;

                for operation in operations {
                    // `compacted` is true only right after a Compact step, which is when
                    // the dense-layout check applies.
                    let compacted = apply_blob_op(operation, &mut model, &mut materialized)?;
                    assert_blob_model(&model, &materialized, compacted)?;
                }
                Ok(())
            })
            .unwrap();
    }
2299
2300    fn blob_operation_sequence() -> impl Strategy<Value = Vec<BlobOp>> {
2301        let write = (blob_offset_strategy(), blob_len_strategy(), any::<u8>())
2302            .prop_map(|(offset, len, byte)| BlobOp::Write { offset, len, byte });
2303        let truncate = blob_offset_strategy().prop_map(|size| BlobOp::Truncate { size });
2304        proptest::collection::vec(prop_oneof![write, truncate, Just(BlobOp::Compact)], 0..=48)
2305    }
2306
2307    fn blob_offset_strategy() -> impl Strategy<Value = u64> {
2308        let limit = blob_model_limit();
2309        let page = page_size();
2310        let segment = SEGMENT_PAGE_COUNT * page;
2311        prop_oneof![
2312            0_u64..=limit,
2313            prop::sample::select(boundary_values(&[
2314                0,
2315                1,
2316                page - 1,
2317                page,
2318                page + 1,
2319                segment - 1,
2320                segment,
2321                segment + 1,
2322                limit - 1,
2323                limit,
2324            ]))
2325            .prop_map(move |value| value.min(limit)),
2326        ]
2327    }
2328
2329    fn blob_len_strategy() -> impl Strategy<Value = usize> {
2330        prop_oneof![
2331            0_usize..=(page_len() * 2 + 17),
2332            prop::sample::select(vec![
2333                0,
2334                1,
2335                page_len() - 1,
2336                page_len(),
2337                page_len() + 1,
2338                page_len() * 2 + 1,
2339            ]),
2340        ]
2341    }
2342
2343    fn blob_model_limit() -> u64 {
2344        (SEGMENT_PAGE_COUNT + 3) * page_size()
2345    }
2346
    /// Applies `operation` to the real store and mirrors it in the test model.
    ///
    /// `model` is the expected file content; `materialized` is the set of page
    /// numbers expected to have a non-zero page-table entry. Returns
    /// `Ok(true)` only for `BlobOp::Compact`. Store errors are converted into
    /// `TestCaseError` failures.
    fn apply_blob_op(
        operation: BlobOp,
        model: &mut Vec<u8>,
        materialized: &mut BTreeSet<u64>,
    ) -> Result<bool, TestCaseError> {
        match operation {
            BlobOp::Write { offset, len, byte } => {
                // Clamp so the write never extends past the model limit.
                let len = len.min(usize::try_from(blob_model_limit() - offset).unwrap());
                let bytes = vec![byte; len];
                write_at(offset, &bytes).map_err(|error| TestCaseError::fail(error.to_string()))?;
                // An empty write is still issued to the store, but the model is
                // left unchanged.
                if len == 0 {
                    return Ok(false);
                }

                let start = usize::try_from(offset).unwrap();
                let end = start + len;
                // Writing past the current end zero-fills the gap in the model.
                if model.len() < start {
                    model.resize(start, 0);
                }
                if model.len() < end {
                    model.resize(end, 0);
                }
                model[start..end].copy_from_slice(&bytes);
                mark_materialized_range(offset, len, materialized);
                Ok(false)
            }
            BlobOp::Truncate { size } => {
                truncate(size).map_err(|error| TestCaseError::fail(error.to_string()))?;
                let new_len = usize::try_from(size).unwrap();
                // Shrinks drop the tail; grows append zeros.
                model.resize(new_len, 0);
                let active_pages = page_count_for_size(size)
                    .map_err(|error| TestCaseError::fail(error.to_string()))?;
                // Pages beyond the new end are no longer expected to be mapped.
                materialized.retain(|page_no| *page_no < active_pages);
                // The model expects a partial final page to be materialized
                // after any truncate to a non-page-aligned size.
                if size > 0 && !size.is_multiple_of(page_size()) {
                    materialized.insert(size / page_size());
                }
                Ok(false)
            }
            BlobOp::Compact => {
                // Compaction changes layout only; the model is left untouched.
                compact().map_err(|error| TestCaseError::fail(error.to_string()))?;
                Ok(true)
            }
        }
    }
2391
2392    fn mark_materialized_range(offset: u64, len: usize, materialized: &mut BTreeSet<u64>) {
2393        let end = offset + u64::try_from(len).unwrap();
2394        let first_page = offset / page_size();
2395        let last_page = (end - 1) / page_size();
2396        for page_no in first_page..=last_page {
2397            materialized.insert(page_no);
2398        }
2399    }
2400
    /// Checks that the store agrees with the model: size, contents, zero reads
    /// past the end, and which pages have page-table entries.
    ///
    /// When `expect_compacted` is true, it also requires materialized pages to
    /// sit at consecutive page-sized offsets in page order.
    fn assert_blob_model(
        model: &[u8],
        materialized: &BTreeSet<u64>,
        expect_compacted: bool,
    ) -> Result<(), TestCaseError> {
        let block = Superblock::load().map_err(|error| TestCaseError::fail(error.to_string()))?;
        prop_assert_eq!(block.db_size, u64::try_from(model.len()).unwrap());

        // The whole file must read back exactly as the model.
        if !model.is_empty() {
            let mut out = vec![0_u8; model.len()];
            read_base_at(0, &mut out).map_err(|error| TestCaseError::fail(error.to_string()))?;
            prop_assert_eq!(out, model);
        }

        // Reading past the end must zero-fill the buffer; it starts non-zero so
        // an untouched buffer is detected.
        let mut tail = vec![1_u8; 32];
        read_base_at(u64::try_from(model.len()).unwrap(), &mut tail)
            .map_err(|error| TestCaseError::fail(error.to_string()))?;
        prop_assert_eq!(tail, vec![0_u8; 32]);

        // One page-table entry per active page.
        let table =
            read_page_table(&block).map_err(|error| TestCaseError::fail(error.to_string()))?;
        let active_pages = page_count_for_size(u64::try_from(model.len()).unwrap())
            .map_err(|error| TestCaseError::fail(error.to_string()))?;
        prop_assert_eq!(table.len(), usize::try_from(active_pages).unwrap());

        // Entries are non-zero exactly for materialized pages.
        let mut first_compacted_offset = None;
        let mut non_zero_seen = 0_u64;
        for (index, entry) in table.iter().enumerate() {
            let page_no = u64::try_from(index).unwrap();
            if materialized.contains(&page_no) {
                prop_assert_ne!(*entry, 0);
                if expect_compacted {
                    // Dense layout: the n-th materialized page sits n pages after the first.
                    let base = *first_compacted_offset.get_or_insert(*entry);
                    prop_assert_eq!(*entry, base + non_zero_seen * page_size());
                }
                non_zero_seen += 1;
            } else {
                prop_assert_eq!(*entry, 0);
            }
        }
        Ok(())
    }
2443
    /// Reads the same base page twice and checks that the read metrics count
    /// data reads separately from page-table root/segment cache hits and misses.
    #[test]
    #[serial_test::serial]
    fn read_metrics_separate_table_cache_from_data_reads() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let page = vec![7_u8; page_len()];
        write_at(0, &page).unwrap();
        // Invalidate the read cache and reset the counters so only the two
        // reads below are measured.
        invalidate_read_cache();
        crate::read_metrics::reset_read_metrics();

        let first = read_base_page(0).unwrap();
        let second = read_base_page(0).unwrap();
        let metrics = crate::read_metrics::read_metrics_snapshot();

        assert_eq!(first, page);
        assert_eq!(second, page);
        // Both reads are expected to fetch page data from stable memory.
        assert!(metrics.stable_data_read_calls >= 2);
        assert!(metrics.stable_data_read_bytes >= page_size() * 2);
        // Root and segment lookups are each expected to record at least one
        // miss and at least one hit across the two reads.
        assert!(metrics.page_table_root_misses >= 1);
        assert!(metrics.page_table_root_hits >= 1);
        assert!(metrics.page_table_segment_misses >= 1);
        assert!(metrics.page_table_segment_hits >= 1);
        // The allowed number of superblock loads depends on the `bench-profile` feature.
        #[cfg(feature = "bench-profile")]
        assert!(metrics.superblock_loads <= 1);
        #[cfg(not(feature = "bench-profile"))]
        assert_eq!(metrics.superblock_loads, 0);
    }
2473
    /// Two small reads from the same page through a shared `PageOffsetCache`
    /// must cost exactly one page-sized stable-memory data read.
    #[test]
    #[serial_test::serial]
    fn page_offset_cache_reuses_page_data_for_small_reads() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let page = vec![9_u8; page_len()];
        write_at(0, &page).unwrap();
        let block = Superblock::load().unwrap();
        let mut cache = PageOffsetCache::new();
        let mut first = [0_u8; 16];
        let mut second = [0_u8; 16];

        // Reset the counters after setup so only the two cached reads are measured.
        crate::read_metrics::reset_read_metrics();
        // Overlapping 16-byte reads at offsets 0 and 8, both inside page 0.
        read_base_at_with_page_cache(&block, 0, &mut first, &mut cache).unwrap();
        read_base_at_with_page_cache(&block, 8, &mut second, &mut cache).unwrap();
        let metrics = crate::read_metrics::read_metrics_snapshot();

        assert_eq!(first, [9_u8; 16]);
        assert_eq!(second, [9_u8; 16]);
        // Only one data read of exactly one page was recorded for both reads.
        assert_eq!(metrics.stable_data_read_calls, 1);
        assert_eq!(metrics.stable_data_read_bytes, page_size());
    }
2498}