// ic_sqlite_vfs/sqlite_vfs/stable_blob.rs

1//! Logical `/main.db` access backed by segmented stable-memory page mapping.
2//!
3//! SQLite sees a contiguous file. Internally, the active superblock points to a
4//! root table. Each root entry points to a 256-page segment table.
5
6use crate::config::{SQLITE_PAGE_SIZE, STABLE_PAGE_SIZE, SUPERBLOCK_SIZE};
7use crate::sqlite_vfs::overlay::{self, Overlay};
8use crate::stable::memory::{self, ContextId, StableMemoryError};
9use crate::stable::meta::{
10    fnv1a64, Superblock, FLAG_CHECKSUM_REFRESHING, FLAG_CHECKSUM_STALE, FLAG_IMPORTING,
11    PAGE_MAP_LAYOUT_VERSION,
12};
13use std::cell::RefCell;
14use std::collections::{BTreeMap, VecDeque};
15
16const CHECKSUM_CHUNK_LEN: u64 = 16 * 1024;
17const PAGE_TABLE_ENTRY_LEN: u64 = 8;
18const SEGMENT_PAGE_COUNT: u64 = 256;
19const READ_SEGMENT_CACHE_CAPACITY: usize = 8;
20const FILE_PAGE_OFFSET_CACHE_CAPACITY: usize = 64;
21const COMPACT_MIN_ORPHAN_BYTES: u64 = 16 * 1024 * 1024;
22
/// Progress report returned by an incremental checksum refresh pass.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ChecksumRefresh {
    /// True once the scan has covered the whole logical database.
    pub complete: bool,
    /// FNV-1a hash accumulated over the bytes scanned so far.
    pub checksum: u64,
    /// Number of logical bytes hashed so far (the resume cursor).
    pub scanned_bytes: u64,
    /// Logical database size observed when this chunk ran.
    pub db_size: u64,
}
30
/// Snapshot of stable-memory usage for the logical database file.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StorageStats {
    /// Page-map layout version recorded in the active superblock.
    pub layout_version: u64,
    /// Logical page count of the active database.
    pub page_count: u64,
    /// Bytes consumed by the root table plus all segment tables.
    pub page_table_bytes: u64,
    /// Bytes reachable from the active superblock:
    /// superblock + live data pages + page tables.
    pub active_bytes: u64,
    /// Total stable memory allocated, in bytes.
    pub allocated_bytes: u64,
    /// Allocated bytes no longer reachable from the active superblock.
    pub orphan_bytes_estimate: u64,
    /// Orphan-to-active ratio in basis points (1/100 of a percent).
    pub orphan_ratio_basis_points: u64,
    /// True when orphaned space is large enough that `compact` is worthwhile.
    pub compact_recommended: bool,
}
42
/// Injection points used by tests to simulate failures mid-operation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum StableBlobFailpoint {
    /// Fires right after a write is buffered in the overlay.
    OverlayWrite,
    /// Fires right after a truncate is buffered in the overlay.
    OverlayTruncate,
    /// Fires at the start of a commit, before stable memory is touched.
    CommitCapacity,
    /// Fires just before a dirty page is copied to stable memory.
    CommitChunkWrite,
    /// Fires just before the updated page tables are written.
    CommitPageTableWrite,
    /// Fires just before the superblock store that publishes the commit.
    CommitSuperblockStore,
}
52
thread_local! {
    // Per-context failpoint registry; only populated from #[cfg(test)] code.
    static FAILPOINTS: RefCell<BTreeMap<ContextId, StableBlobFailpoint>> = const { RefCell::new(BTreeMap::new()) };
    // Per-context cache of the page-table root and hot segment tables,
    // invalidated whenever the superblock snapshot changes.
    static READ_TABLE_CACHE: RefCell<BTreeMap<ContextId, ReadTableCache>> = const { RefCell::new(BTreeMap::new()) };
}
57
/// Identity of a superblock snapshot; a change in any field means the
/// cached page tables may be stale and must be dropped.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ReadCacheKey {
    page_table_offset: u64,
    page_count: u64,
    db_size: u64,
    last_tx_id: u64,
}
65
/// Cached page-table root plus an LRU-bounded set of segment tables.
#[derive(Debug)]
struct ReadTableCache {
    /// Superblock snapshot the cached tables belong to; `None` when empty.
    key: Option<ReadCacheKey>,
    /// Cached root table (one physical offset per segment table).
    root: Vec<u64>,
    /// Cached segment tables keyed by segment number.
    segments: BTreeMap<u64, Vec<u64>>,
    /// LRU order of cached segments; the front is the eviction candidate.
    segment_lru: VecDeque<u64>,
}
73
74impl ReadTableCache {
75    fn new() -> Self {
76        Self {
77            key: None,
78            root: Vec::new(),
79            segments: BTreeMap::new(),
80            segment_lru: VecDeque::new(),
81        }
82    }
83
84    fn clear(&mut self) {
85        self.key = None;
86        self.root.clear();
87        self.segments.clear();
88        self.segment_lru.clear();
89    }
90
91    fn ensure_key(&mut self, key: ReadCacheKey) {
92        if self.key == Some(key) {
93            return;
94        }
95        self.clear();
96        self.key = Some(key);
97    }
98
99    fn touch_segment(&mut self, segment_no: u64) {
100        self.segment_lru.retain(|cached| *cached != segment_no);
101        self.segment_lru.push_back(segment_no);
102    }
103
104    fn insert_segment(&mut self, segment_no: u64, table: Vec<u64>) {
105        self.segments.insert(segment_no, table);
106        self.touch_segment(segment_no);
107        while self.segments.len() > READ_SEGMENT_CACHE_CAPACITY {
108            let Some(evicted) = self.segment_lru.pop_front() else {
109                return;
110            };
111            self.segments.remove(&evicted);
112        }
113    }
114}
115
/// Small FIFO cache mapping logical page numbers to physical page offsets.
/// NOTE(review): entries are never updated once inserted, so this appears to
/// be intended for a single read session — confirm against callers.
#[derive(Debug)]
pub(crate) struct PageOffsetCache {
    // (page_no, physical_offset) pairs in insertion order.
    entries: Vec<(u64, u64)>,
}
120
121impl PageOffsetCache {
122    pub(crate) fn new() -> Self {
123        Self {
124            entries: Vec::with_capacity(FILE_PAGE_OFFSET_CACHE_CAPACITY),
125        }
126    }
127
128    fn get(&self, page_no: u64) -> Option<u64> {
129        self.entries
130            .iter()
131            .find_map(|(cached_page, physical)| (*cached_page == page_no).then_some(*physical))
132    }
133
134    fn insert(&mut self, page_no: u64, physical: u64) {
135        if self
136            .entries
137            .iter()
138            .any(|(cached_page, _)| *cached_page == page_no)
139        {
140            return;
141        }
142        if self.entries.len() == FILE_PAGE_OFFSET_CACHE_CAPACITY {
143            self.entries.remove(0);
144        }
145        self.entries.push((page_no, physical));
146    }
147}
148
#[cfg(test)]
pub(crate) fn set_failpoint(failpoint: StableBlobFailpoint) {
    // Silently ignore the call when no execution context is active.
    let Ok(context) = memory::active_context_id() else {
        return;
    };
    FAILPOINTS.with(|slot| {
        slot.borrow_mut().insert(context, failpoint);
    });
}
157
#[cfg(test)]
pub(crate) fn clear_failpoint() {
    // Drop every registered failpoint across all contexts.
    FAILPOINTS.with(|slot| {
        slot.borrow_mut().clear();
    });
}
162
163pub(crate) fn ensure_page_map_layout() -> Result<(), StableMemoryError> {
164    let block = Superblock::load()?;
165    if block.layout_version >= PAGE_MAP_LAYOUT_VERSION {
166        return Ok(());
167    }
168    Err(StableMemoryError::UnsupportedLayoutVersion(
169        block.layout_version,
170    ))
171}
172
173pub(crate) fn begin_update() -> Result<(), StableMemoryError> {
174    ensure_page_map_layout()?;
175    overlay::begin(Superblock::load()?.db_size)
176}
177
/// Discards any buffered overlay changes without touching stable memory.
pub(crate) fn rollback_update() {
    overlay::rollback();
}
181
182#[doc(hidden)]
183pub fn invalidate_read_cache() {
184    READ_TABLE_CACHE.with(|cache| cache.borrow_mut().clear());
185}
186
187pub(crate) fn commit_update() -> Result<(), StableMemoryError> {
188    let Some(overlay) = overlay::take() else {
189        return Ok(());
190    };
191    if overlay.is_empty() {
192        return Ok(());
193    }
194    commit_overlay(overlay, true)
195}
196
197pub(crate) fn read_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
198    if let Some(result) = overlay::read_at(offset, dst) {
199        return result;
200    }
201    read_base_at(offset, dst)
202}
203
204pub(crate) fn read_base_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
205    if dst.is_empty() {
206        return Ok(true);
207    }
208    let block = Superblock::load()?;
209    read_base_at_with_block(&block, offset, dst)
210}
211
212pub(crate) fn read_base_at_with_block(
213    block: &Superblock,
214    offset: u64,
215    dst: &mut [u8],
216) -> Result<bool, StableMemoryError> {
217    if dst.is_empty() {
218        return Ok(true);
219    }
220    if offset >= block.db_size {
221        dst.fill(0);
222        return Ok(false);
223    }
224    let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
225    let copied = requested.min(block.db_size - offset);
226    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
227    read_logical_range(block, offset, &mut dst[..copied_len])?;
228    dst[copied_len..].fill(0);
229    Ok(copied == requested)
230}
231
232pub(crate) fn read_base_at_with_page_cache(
233    block: &Superblock,
234    offset: u64,
235    dst: &mut [u8],
236    page_offsets: &mut PageOffsetCache,
237) -> Result<bool, StableMemoryError> {
238    if dst.is_empty() {
239        return Ok(true);
240    }
241    if offset >= block.db_size {
242        dst.fill(0);
243        return Ok(false);
244    }
245    let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
246    let copied = requested.min(block.db_size - offset);
247    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
248    read_logical_range_with_page_cache(block, offset, &mut dst[..copied_len], page_offsets)?;
249    dst[copied_len..].fill(0);
250    Ok(copied == requested)
251}
252
253pub(crate) fn read_base_page(page_no: u64) -> Result<Vec<u8>, StableMemoryError> {
254    let block = Superblock::load()?;
255    let mut page = zero_page();
256    if page_no >= active_page_count(&block)? {
257        return Ok(page);
258    }
259    let physical = page_offset_for(&block, page_no)?;
260    if physical != 0 {
261        #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
262        crate::read_metrics::record_stable_data_read(page.len());
263        memory::read(physical, &mut page)?;
264    }
265    Ok(page)
266}
267
268pub(crate) fn write_at(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
269    if let Some(result) = overlay::write_at(offset, bytes) {
270        hit_failpoint(StableBlobFailpoint::OverlayWrite)?;
271        return result;
272    }
273    if bytes.is_empty() {
274        return Ok(());
275    }
276    ensure_page_map_layout()?;
277    let mut direct = Overlay::new(Superblock::load()?.db_size);
278    direct.write_at(offset, bytes)?;
279    commit_overlay(direct, false)
280}
281
282pub(crate) fn truncate(size: u64) -> Result<(), StableMemoryError> {
283    if let Some(result) = overlay::truncate(size) {
284        hit_failpoint(StableBlobFailpoint::OverlayTruncate)?;
285        return result;
286    }
287    ensure_page_map_layout()?;
288    let mut direct = Overlay::new(Superblock::load()?.db_size);
289    direct.truncate(size)?;
290    if direct.is_empty() {
291        return Ok(());
292    }
293    commit_overlay(direct, false)
294}
295
296pub(crate) fn file_size() -> Result<u64, StableMemoryError> {
297    if let Some(size) = overlay::file_size() {
298        return Ok(size);
299    }
300    Ok(Superblock::load()?.db_size)
301}
302
303pub fn export_chunk(offset: u64, len: u64) -> Result<Vec<u8>, StableMemoryError> {
304    reject_during_update()?;
305    let block = Superblock::load()?;
306    if offset >= block.db_size {
307        return Ok(Vec::new());
308    }
309    let copied = len.min(block.db_size - offset);
310    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
311    let mut out = vec![0_u8; copied_len];
312    read_logical_range(&block, offset, &mut out)?;
313    Ok(out)
314}
315
316pub fn import_chunk(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
317    reject_during_update()?;
318    let mut block = Superblock::load()?;
319    if !block.is_importing() {
320        return Err(StableMemoryError::ImportNotStarted);
321    }
322    let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
323    if offset != block.import_written_until {
324        return Err(StableMemoryError::ImportOutOfOrder {
325            offset,
326            expected: block.import_written_until,
327        });
328    }
329    let end = checked_add(offset, len)?;
330    if end > block.import_total_size {
331        return Err(StableMemoryError::ImportOutOfBounds {
332            offset,
333            len,
334            db_size: block.import_total_size,
335        });
336    }
337    memory::write(import_offset(&block, offset)?, bytes)?;
338    block.import_written_until = end;
339    block.store()?;
340    invalidate_read_cache();
341    Ok(())
342}
343
344pub fn begin_import(total_size: u64, expected_checksum: u64) -> Result<(), StableMemoryError> {
345    reject_during_update()?;
346    let mut block = Superblock::load()?;
347    if block.is_importing() {
348        return Err(StableMemoryError::ImportAlreadyStarted);
349    }
350    let import_base_offset = append_base()?;
351    checked_add(import_base_offset, total_size)?;
352    block.flags |= FLAG_IMPORTING;
353    block.clear_checksum_refresh();
354    block.import_expected_checksum = expected_checksum;
355    block.import_written_until = 0;
356    block.import_total_size = total_size;
357    block.import_base_offset = import_base_offset;
358    block.store()?;
359    invalidate_read_cache();
360    Ok(())
361}
362
/// Validates a completed import and atomically promotes it to the live DB.
///
/// Requires every byte to have been written and the staged data to hash to
/// the checksum announced in `begin_import`; on a checksum mismatch the
/// import state is cleared and `ChecksumMismatch` is returned.
pub fn finish_import() -> Result<(), StableMemoryError> {
    reject_during_update()?;
    let mut block = Superblock::load()?;
    if !block.is_importing() {
        return Err(StableMemoryError::ImportNotStarted);
    }
    if block.import_written_until != block.import_total_size {
        return Err(StableMemoryError::ImportIncomplete {
            written_until: block.import_written_until,
            db_size: block.import_total_size,
        });
    }
    // Verify the staged bytes before anything becomes visible.
    let checksum = checksum_physical_range(block.import_base_offset, block.import_total_size)?;
    if checksum != block.import_expected_checksum {
        let expected = block.import_expected_checksum;
        clear_import(&mut block)?;
        return Err(StableMemoryError::ChecksumMismatch {
            expected,
            actual: checksum,
        });
    }
    // Build segmented page tables for the imported region, then flip the
    // superblock to point at the imported data in a single store.
    let entries = imported_page_table(&block)?;
    let (root_offset, root_len) = write_segmented_tables(&entries)?;
    block.db_size = block.import_total_size;
    block.db_base_offset = block.import_base_offset;
    block.page_table_offset = root_offset;
    // `page_count` records the root-table length here (root_len comes from
    // write_segmented_tables), matching what read_root_table sizes against.
    block.page_count = root_len;
    block.layout_version = PAGE_MAP_LAYOUT_VERSION;
    block.flags &= !FLAG_IMPORTING;
    block.flags &= !FLAG_CHECKSUM_STALE;
    block.clear_checksum_refresh();
    block.checksum = checksum;
    // Reset the import bookkeeping now that the data is live.
    block.import_expected_checksum = 0;
    block.import_written_until = 0;
    block.import_total_size = 0;
    block.import_base_offset = 0;
    block.store()?;
    invalidate_read_cache();
    Ok(())
}
403
404pub fn cancel_import() -> Result<(), StableMemoryError> {
405    reject_during_update()?;
406    let mut block = Superblock::load()?;
407    if !block.is_importing() {
408        return Err(StableMemoryError::ImportNotStarted);
409    }
410    clear_import(&mut block)
411}
412
413pub fn refresh_checksum() -> Result<u64, StableMemoryError> {
414    reject_during_update()?;
415    let checksum = checksum()?;
416    let mut block = Superblock::load()?;
417    block.checksum = checksum;
418    block.flags &= !FLAG_CHECKSUM_STALE;
419    block.clear_checksum_refresh();
420    block.store()?;
421    invalidate_read_cache();
422    Ok(checksum)
423}
424
/// Advances an incremental checksum scan by up to `max_bytes` logical bytes.
///
/// The resume state (offset, running FNV-1a hash, transaction id) lives in
/// the superblock, so the scan survives across calls. If the database
/// changed since the scan started (`last_tx_id` moved), the scan restarts.
pub fn refresh_checksum_chunk(max_bytes: u64) -> Result<ChecksumRefresh, StableMemoryError> {
    reject_during_update()?;
    if max_bytes == 0 {
        return Err(StableMemoryError::ChecksumRefreshChunkEmpty);
    }

    let mut block = Superblock::load()?;
    if block.is_importing() {
        return Err(StableMemoryError::ImportAlreadyStarted);
    }
    // First call of a scan: initialize the resume state in memory; it is
    // persisted by the store() below.
    if !block.is_checksum_refreshing() {
        block.flags |= FLAG_CHECKSUM_REFRESHING;
        block.checksum_refresh_offset = 0;
        block.checksum_refresh_hash = fnv1a64(&[]);
        block.checksum_refresh_tx_id = block.last_tx_id;
    }
    // A commit happened mid-scan: discard the partial hash and start over.
    // Recurses at most once, since the fresh state uses the current tx id.
    if block.checksum_refresh_tx_id != block.last_tx_id {
        block.clear_checksum_refresh();
        block.store()?;
        invalidate_read_cache();
        return refresh_checksum_chunk(max_bytes);
    }

    let start = block.checksum_refresh_offset;
    let end = block.db_size.min(start.saturating_add(max_bytes));
    let mut offset = start;
    let mut hash = block.checksum_refresh_hash;
    // Hash in bounded chunks to cap the per-iteration buffer size.
    while offset < end {
        let len = (end - offset).min(CHECKSUM_CHUNK_LEN);
        let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
        let mut bytes = vec![0_u8; copied_len];
        read_logical_range(&block, offset, &mut bytes)?;
        hash = fold_fnv1a64(hash, &bytes);
        offset += len;
    }

    block.checksum_refresh_offset = offset;
    block.checksum_refresh_hash = hash;
    // Scan reached the end of file: publish the checksum and clear state.
    if offset == block.db_size {
        block.checksum = hash;
        block.flags &= !FLAG_CHECKSUM_STALE;
        block.clear_checksum_refresh();
    }
    let out = ChecksumRefresh {
        complete: offset == block.db_size,
        checksum: hash,
        scanned_bytes: offset,
        db_size: block.db_size,
    };
    block.store()?;
    invalidate_read_cache();
    Ok(out)
}
478
479pub fn checksum() -> Result<u64, StableMemoryError> {
480    reject_during_update()?;
481    let block = Superblock::load()?;
482    checksum_logical_range(&block, block.db_size)
483}
484
485pub fn compact() -> Result<(), StableMemoryError> {
486    reject_during_update()?;
487    ensure_page_map_layout()?;
488    let block = Superblock::load()?;
489    let table = read_page_table(&block)?;
490    let mut compacted = Vec::with_capacity(table.len());
491    let mut cursor = append_base()?;
492
493    for offset in table {
494        if offset == 0 {
495            compacted.push(0);
496            continue;
497        }
498        let mut page = zero_page();
499        memory::read(offset, &mut page)?;
500        memory::write(cursor, &page)?;
501        compacted.push(cursor);
502        cursor = checked_add(cursor, page_size())?;
503    }
504
505    let (root_offset, root_len) = write_segmented_tables(&compacted)?;
506    Superblock::store_page_map_without_tx(root_offset, root_len, block.db_size)?;
507    invalidate_read_cache();
508    Ok(())
509}
510
/// Computes usage statistics by walking the live page table and comparing
/// reachable bytes against total allocated stable memory.
pub fn storage_stats() -> Result<StorageStats, StableMemoryError> {
    let block = Superblock::load()?;
    let table = read_page_table(&block)?;
    // Only non-zero entries reference real data pages; zeros are holes.
    let non_zero_pages = u64::try_from(table.iter().filter(|offset| **offset != 0).count())
        .map_err(|_| StableMemoryError::OffsetOverflow)?;
    let segment_count = active_segment_count(&block)?;
    let root_bytes = root_table_bytes(segment_count)?;
    let segment_bytes = segment_count
        .checked_mul(segment_table_bytes()?)
        .ok_or(StableMemoryError::OffsetOverflow)?;
    let page_table_bytes = checked_add(root_bytes, segment_bytes)?;
    // Everything reachable from the active superblock: the superblock
    // itself, live data pages, and the two-level page table.
    let active_bytes = SUPERBLOCK_SIZE
        .checked_add(non_zero_pages.saturating_mul(page_size()))
        .and_then(|value| value.checked_add(page_table_bytes))
        .ok_or(StableMemoryError::OffsetOverflow)?;
    let allocated_bytes = memory::size_pages()
        .checked_mul(STABLE_PAGE_SIZE)
        .ok_or(StableMemoryError::OffsetOverflow)?;
    // Estimate: anything allocated but not reachable is orphaned garbage
    // left behind by the append-only commit strategy.
    let orphan_bytes_estimate = allocated_bytes.saturating_sub(active_bytes);
    let orphan_ratio_basis_points = if active_bytes == 0 {
        0
    } else {
        orphan_bytes_estimate.saturating_mul(10_000) / active_bytes
    };
    Ok(StorageStats {
        layout_version: block.layout_version,
        page_count: active_page_count(&block)?,
        page_table_bytes,
        active_bytes,
        allocated_bytes,
        orphan_bytes_estimate,
        orphan_ratio_basis_points,
        // Recommend compaction once garbage both dominates live data and
        // exceeds the absolute floor, avoiding churn on small databases.
        compact_recommended: orphan_bytes_estimate >= active_bytes
            && orphan_bytes_estimate >= COMPACT_MIN_ORPHAN_BYTES,
    })
}
547
548pub(crate) fn page_count_for_size(size: u64) -> Result<u64, StableMemoryError> {
549    Ok(size.div_ceil(page_size()))
550}
551
/// Test-only peek at the live root table.
#[cfg(test)]
pub(crate) fn debug_root_table_for_tests() -> Result<Vec<u64>, StableMemoryError> {
    Superblock::load().and_then(|block| read_root_table(&block))
}
557
558fn commit_overlay(overlay: Overlay, advance_tx: bool) -> Result<(), StableMemoryError> {
559    hit_failpoint(StableBlobFailpoint::CommitCapacity)?;
560    let block = Superblock::load()?;
561    let mut root = read_root_table(&block)?;
562    let final_page_count = page_count_for_size(overlay.size())?;
563    let final_segment_count = segment_count_for_pages(final_page_count)?;
564    let root_len =
565        usize::try_from(final_segment_count).map_err(|_| StableMemoryError::OffsetOverflow)?;
566    root.resize(root_len, 0);
567    root.truncate(root_len);
568
569    let mut segment_updates = BTreeMap::<u64, Vec<u64>>::new();
570    let mut cursor = append_base()?;
571    for (page_no, page) in overlay.dirty_pages() {
572        if *page_no >= final_page_count {
573            continue;
574        }
575        hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
576        memory::write(cursor, page)?;
577        let segment_no = segment_no(*page_no);
578        let index = segment_index(*page_no)?;
579        let table = load_segment_for_update(&block, &root, &mut segment_updates, segment_no)?;
580        table[index] = cursor;
581        cursor = checked_add(cursor, page_size())?;
582    }
583
584    clear_truncated_tail(&block, &root, &mut segment_updates, final_page_count)?;
585
586    hit_failpoint(StableBlobFailpoint::CommitPageTableWrite)?;
587    for (segment_no, table) in segment_updates {
588        if segment_no >= final_segment_count {
589            continue;
590        }
591        let offset = write_segment_table(&table)?;
592        let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
593        root[index] = offset;
594    }
595    let root_offset = write_root_table(&root)?;
596    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
597    let result = if advance_tx {
598        Superblock::commit_page_map(root_offset, entries_len_u64(&root)?, overlay.size())
599    } else {
600        Superblock::store_page_map_without_tx(root_offset, entries_len_u64(&root)?, overlay.size())
601    };
602    if result.is_ok() {
603        invalidate_read_cache();
604    }
605    result
606}
607
608fn load_segment_for_update<'a>(
609    block: &Superblock,
610    root: &[u64],
611    updates: &'a mut BTreeMap<u64, Vec<u64>>,
612    segment_no: u64,
613) -> Result<&'a mut Vec<u64>, StableMemoryError> {
614    if let std::collections::btree_map::Entry::Vacant(entry) = updates.entry(segment_no) {
615        let table = read_segment_table(block, root, segment_no)?;
616        entry.insert(table);
617    }
618    updates
619        .get_mut(&segment_no)
620        .ok_or(StableMemoryError::OffsetOverflow)
621}
622
/// Zeroes page-table entries at or past `final_page_count` inside the one
/// segment that straddles the new end of file after a shrink.
///
/// Segments entirely past the new end disappear when the caller shrinks the
/// root table, so only this boundary segment needs explicit clearing.
fn clear_truncated_tail(
    block: &Superblock,
    root: &[u64],
    updates: &mut BTreeMap<u64, Vec<u64>>,
    final_page_count: u64,
) -> Result<(), StableMemoryError> {
    let old_page_count = active_page_count(block)?;
    // Nothing to clear when the file grew, stayed the same, or shrank to
    // empty (an empty file gets an empty root table from the caller).
    if final_page_count >= old_page_count || final_page_count == 0 {
        return Ok(());
    }
    let boundary_segment = segment_no(final_page_count);
    // New size ends exactly on a segment boundary: no partial segment
    // survives, so there is nothing to clear here.
    if boundary_segment >= segment_count_for_pages(final_page_count)? {
        return Ok(());
    }
    let start = segment_index(final_page_count)?;
    // Defensive: with the boundary check above, a zero in-segment index
    // should not be reachable here.
    if start == 0 {
        return Ok(());
    }
    let table = load_segment_for_update(block, root, updates, boundary_segment)?;
    table[start..].fill(0);
    Ok(())
}
645
646fn reject_during_update() -> Result<(), StableMemoryError> {
647    if overlay::is_active() {
648        Err(StableMemoryError::UpdateInProgress)
649    } else {
650        Ok(())
651    }
652}
653
654fn read_logical_range(
655    block: &Superblock,
656    offset: u64,
657    dst: &mut [u8],
658) -> Result<(), StableMemoryError> {
659    if dst.is_empty() {
660        return Ok(());
661    }
662    let in_page =
663        usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
664    if dst.len() <= page_len() - in_page {
665        return read_logical_page_slice(block, offset / page_size(), in_page, dst);
666    }
667
668    let mut copied_total = 0_usize;
669    while copied_total < dst.len() {
670        let absolute = checked_add(
671            offset,
672            u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
673        )?;
674        let page_no = absolute / page_size();
675        let in_page = usize::try_from(absolute % page_size())
676            .map_err(|_| StableMemoryError::OffsetOverflow)?;
677        let copied = (page_len() - in_page).min(dst.len() - copied_total);
678        read_logical_page_slice(
679            block,
680            page_no,
681            in_page,
682            &mut dst[copied_total..copied_total + copied],
683        )?;
684        copied_total += copied;
685    }
686    Ok(())
687}
688
/// Like `read_logical_range`, but resolves page offsets through the
/// caller-supplied `PageOffsetCache` to skip repeated page-table walks.
fn read_logical_range_with_page_cache(
    block: &Superblock,
    offset: u64,
    dst: &mut [u8],
    page_offsets: &mut PageOffsetCache,
) -> Result<(), StableMemoryError> {
    if dst.is_empty() {
        return Ok(());
    }
    let in_page =
        usize::try_from(offset % page_size()).map_err(|_| StableMemoryError::OffsetOverflow)?;
    // Fast path: the whole request fits inside a single logical page.
    if dst.len() <= page_len() - in_page {
        return read_logical_page_slice_with_page_cache(
            block,
            offset / page_size(),
            in_page,
            dst,
            page_offsets,
        );
    }

    // Slow path: advance page by page, copying as much of `dst` as the
    // current page can supply on each iteration.
    let mut copied_total = 0_usize;
    while copied_total < dst.len() {
        let absolute = checked_add(
            offset,
            u64::try_from(copied_total).map_err(|_| StableMemoryError::OffsetOverflow)?,
        )?;
        let page_no = absolute / page_size();
        let in_page = usize::try_from(absolute % page_size())
            .map_err(|_| StableMemoryError::OffsetOverflow)?;
        let copied = (page_len() - in_page).min(dst.len() - copied_total);
        read_logical_page_slice_with_page_cache(
            block,
            page_no,
            in_page,
            &mut dst[copied_total..copied_total + copied],
            page_offsets,
        )?;
        copied_total += copied;
    }
    Ok(())
}
731
732fn read_logical_page_slice(
733    block: &Superblock,
734    page_no: u64,
735    in_page: usize,
736    dst: &mut [u8],
737) -> Result<(), StableMemoryError> {
738    let physical = page_offset_for(block, page_no)?;
739    if physical == 0 {
740        dst.fill(0);
741        return Ok(());
742    }
743    let stable_offset = checked_add(
744        physical,
745        u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
746    )?;
747    #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
748    crate::read_metrics::record_stable_data_read(dst.len());
749    memory::read(stable_offset, dst)
750}
751
752fn read_logical_page_slice_with_page_cache(
753    block: &Superblock,
754    page_no: u64,
755    in_page: usize,
756    dst: &mut [u8],
757    page_offsets: &mut PageOffsetCache,
758) -> Result<(), StableMemoryError> {
759    let physical = match page_offsets.get(page_no) {
760        Some(physical) => physical,
761        None => {
762            let physical = page_offset_for(block, page_no)?;
763            page_offsets.insert(page_no, physical);
764            physical
765        }
766    };
767    if physical == 0 {
768        dst.fill(0);
769        return Ok(());
770    }
771    let stable_offset = checked_add(
772        physical,
773        u64::try_from(in_page).map_err(|_| StableMemoryError::OffsetOverflow)?,
774    )?;
775    #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
776    crate::read_metrics::record_stable_data_read(dst.len());
777    memory::read(stable_offset, dst)
778}
779
780fn page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
781    if page_no >= active_page_count(block)? || block.page_table_offset == 0 {
782        return Ok(0);
783    }
784    cached_page_offset_for(block, page_no)
785}
786
787fn read_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
788    let root = read_root_table(block)?;
789    let count = active_page_count(block)?;
790    let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
791    let mut entries = Vec::with_capacity(capacity);
792    for segment_no in 0..segment_count_for_pages(count)? {
793        let table = read_segment_table(block, &root, segment_no)?;
794        for entry in table {
795            if entries.len() == capacity {
796                break;
797            }
798            entries.push(entry);
799        }
800    }
801    Ok(entries)
802}
803
/// Resolves `page_no` through the per-context read cache, populating the
/// root table and the owning segment table on a miss.
///
/// `ensure_key` drops the cache whenever the superblock snapshot (table
/// offset, page count, size, tx id) differs from what was cached.
fn cached_page_offset_for(block: &Superblock, page_no: u64) -> Result<u64, StableMemoryError> {
    let context = memory::active_context_id()?;
    let key = read_cache_key(block);
    let segment_no = segment_no(page_no);
    let index = segment_index(page_no)?;
    READ_TABLE_CACHE.with(|cache| {
        let mut caches = cache.borrow_mut();
        let cache = caches.entry(context).or_insert_with(ReadTableCache::new);
        cache.ensure_key(key);
        // Load the root table once per cache generation.
        if cache.root.is_empty() {
            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
            crate::read_metrics::record_page_table_root_miss();
            cache.root = read_root_table(block)?;
        } else {
            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
            crate::read_metrics::record_page_table_root_hit();
        }
        // A missing or zero root entry means the whole segment is a hole.
        let Some(segment_offset) = cache
            .root
            .get(usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?)
            .copied()
        else {
            return Ok(0);
        };
        if segment_offset == 0 {
            return Ok(0);
        }
        // Fetch (or refresh the LRU position of) the segment table.
        if cache.segments.contains_key(&segment_no) {
            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
            crate::read_metrics::record_page_table_segment_hit();
            cache.touch_segment(segment_no);
        } else {
            #[cfg(any(test, debug_assertions, feature = "bench-profile"))]
            crate::read_metrics::record_page_table_segment_miss();
            let table = read_segment_table_at(segment_offset)?;
            cache.insert_segment(segment_no, table);
        }
        // Out-of-bounds indices fall back to 0 (hole).
        Ok(cache
            .segments
            .get(&segment_no)
            .and_then(|table| table.get(index))
            .copied()
            .unwrap_or(0))
    })
}
849
850fn read_root_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
851    if block.page_count == 0 {
852        return Ok(Vec::new());
853    }
854    let bytes_len = usize::try_from(root_table_bytes(block.page_count)?)
855        .map_err(|_| StableMemoryError::OffsetOverflow)?;
856    let mut bytes = vec![0_u8; bytes_len];
857    memory::read(block.page_table_offset, &mut bytes)?;
858    decode_u64_table(&bytes)
859}
860
861fn read_segment_table(
862    _block: &Superblock,
863    root: &[u64],
864    segment_no: u64,
865) -> Result<Vec<u64>, StableMemoryError> {
866    let table = vec![0_u64; segment_page_count_usize()];
867    let index = usize::try_from(segment_no).map_err(|_| StableMemoryError::OffsetOverflow)?;
868    let Some(offset) = root.get(index).copied() else {
869        return Ok(table);
870    };
871    if offset == 0 {
872        return Ok(table);
873    }
874    read_segment_table_at(offset)
875}
876
877fn read_segment_table_at(offset: u64) -> Result<Vec<u64>, StableMemoryError> {
878    let mut bytes = vec![0_u8; segment_table_len()];
879    memory::read(offset, &mut bytes)?;
880    let mut table = decode_u64_table(&bytes)?;
881    table.resize(segment_page_count_usize(), 0);
882    Ok(table)
883}
884
885fn write_segmented_tables(entries: &[u64]) -> Result<(u64, u64), StableMemoryError> {
886    if entries.is_empty() {
887        return Ok((0, 0));
888    }
889    let root_len = segment_count_for_pages(entries_len_u64(entries)?)?;
890    let mut root = Vec::with_capacity(
891        usize::try_from(root_len).map_err(|_| StableMemoryError::OffsetOverflow)?,
892    );
893    for segment_no in 0..root_len {
894        let start = usize::try_from(
895            segment_no
896                .checked_mul(SEGMENT_PAGE_COUNT)
897                .ok_or(StableMemoryError::OffsetOverflow)?,
898        )
899        .map_err(|_| StableMemoryError::OffsetOverflow)?;
900        let mut table = vec![0_u64; segment_page_count_usize()];
901        for (offset, entry) in entries[start..]
902            .iter()
903            .take(segment_page_count_usize())
904            .enumerate()
905        {
906            table[offset] = *entry;
907        }
908        root.push(write_segment_table(&table)?);
909    }
910    let root_offset = write_root_table(&root)?;
911    Ok((root_offset, entries_len_u64(&root)?))
912}
913
914fn write_segment_table(entries: &[u64]) -> Result<u64, StableMemoryError> {
915    let mut table = vec![0_u64; segment_page_count_usize()];
916    for (index, entry) in entries.iter().take(segment_page_count_usize()).enumerate() {
917        table[index] = *entry;
918    }
919    write_u64_table(&table)
920}
921
/// Persists the root table; a thin alias over `write_u64_table` kept for
/// call-site readability.
fn write_root_table(entries: &[u64]) -> Result<u64, StableMemoryError> {
    write_u64_table(entries)
}
925
926fn write_u64_table(entries: &[u64]) -> Result<u64, StableMemoryError> {
927    if entries.is_empty() {
928        return Ok(0);
929    }
930    let offset = append_base()?;
931    let mut bytes = Vec::with_capacity(entries.len() * 8);
932    for entry in entries {
933        bytes.extend_from_slice(&entry.to_le_bytes());
934    }
935    memory::write(offset, &bytes)?;
936    Ok(offset)
937}
938
939fn decode_u64_table(bytes: &[u8]) -> Result<Vec<u64>, StableMemoryError> {
940    if !bytes.len().is_multiple_of(8) {
941        return Err(StableMemoryError::OffsetOverflow);
942    }
943    let mut entries = Vec::with_capacity(bytes.len() / 8);
944    for chunk in bytes.chunks_exact(8) {
945        let mut entry = [0_u8; 8];
946        entry.copy_from_slice(chunk);
947        entries.push(u64::from_le_bytes(entry));
948    }
949    Ok(entries)
950}
951
952fn imported_page_table(block: &Superblock) -> Result<Vec<u64>, StableMemoryError> {
953    let count = page_count_for_size(block.import_total_size)?;
954    let capacity = usize::try_from(count).map_err(|_| StableMemoryError::OffsetOverflow)?;
955    let mut entries = Vec::with_capacity(capacity);
956    for page_no in 0..count {
957        entries.push(checked_add(
958            block.import_base_offset,
959            page_no
960                .checked_mul(page_size())
961                .ok_or(StableMemoryError::OffsetOverflow)?,
962        )?);
963    }
964    Ok(entries)
965}
966
967fn checksum_logical_range(block: &Superblock, len: u64) -> Result<u64, StableMemoryError> {
968    let mut offset = 0_u64;
969    let mut hash = fnv1a64(&[]);
970    while offset < len {
971        let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
972        let copied_len =
973            usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
974        let mut bytes = vec![0_u8; copied_len];
975        read_logical_range(block, offset, &mut bytes)?;
976        hash = fold_fnv1a64(hash, &bytes);
977        offset += chunk_len;
978    }
979    Ok(hash)
980}
981
982fn checksum_physical_range(base_offset: u64, len: u64) -> Result<u64, StableMemoryError> {
983    let mut offset = 0_u64;
984    let mut hash = fnv1a64(&[]);
985    while offset < len {
986        let chunk_len = (len - offset).min(CHECKSUM_CHUNK_LEN);
987        let copied_len =
988            usize::try_from(chunk_len).map_err(|_| StableMemoryError::OffsetOverflow)?;
989        let mut bytes = vec![0_u8; copied_len];
990        memory::read(checked_add(base_offset, offset)?, &mut bytes)?;
991        hash = fold_fnv1a64(hash, &bytes);
992        offset += chunk_len;
993    }
994    Ok(hash)
995}
996
997fn clear_import(block: &mut Superblock) -> Result<(), StableMemoryError> {
998    block.flags &= !FLAG_IMPORTING;
999    block.import_expected_checksum = 0;
1000    block.import_written_until = 0;
1001    block.import_total_size = 0;
1002    block.import_base_offset = 0;
1003    block.store()?;
1004    invalidate_read_cache();
1005    Ok(())
1006}
1007
/// Translates an import-relative `offset` into an absolute stable-memory
/// offset, failing with `OffsetOverflow` on `u64` wraparound.
fn import_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
    checked_add(block.import_base_offset, offset)
}
1011
/// Number of SQLite pages covered by the active database size (`db_size`,
/// rounded up to whole pages).
fn active_page_count(block: &Superblock) -> Result<u64, StableMemoryError> {
    page_count_for_size(block.db_size)
}
1015
/// Number of root-table entries (segments) in the active layout.
///
/// NOTE(review): despite its name, `Superblock.page_count` appears to hold the
/// root-table entry count — `write_segmented_tables` returns the root length
/// as this value and `read_root_table` sizes the root read from it. Confirm
/// against the `Superblock` definition; renaming the field would help.
fn active_segment_count(block: &Superblock) -> Result<u64, StableMemoryError> {
    Ok(block.page_count)
}
1019
/// Builds the cache key identifying a superblock generation; any committed
/// change moves at least one of these fields, so a stale key can never hit.
fn read_cache_key(block: &Superblock) -> ReadCacheKey {
    ReadCacheKey {
        page_table_offset: block.page_table_offset,
        page_count: block.page_count,
        db_size: block.db_size,
        last_tx_id: block.last_tx_id,
    }
}
1028
/// Segments needed to cover `page_count` pages (ceiling division by
/// `SEGMENT_PAGE_COUNT`). Infallible; returns `Result` for call-site
/// uniformity with the other layout helpers.
fn segment_count_for_pages(page_count: u64) -> Result<u64, StableMemoryError> {
    Ok(page_count.div_ceil(SEGMENT_PAGE_COUNT))
}
1032
/// Segment (root-table index) that contains logical page `page_no`.
fn segment_no(page_no: u64) -> u64 {
    page_no / SEGMENT_PAGE_COUNT
}
1036
/// Index of `page_no` within its segment table (always < SEGMENT_PAGE_COUNT;
/// the `try_from` can only fail on targets where usize is narrower than the
/// remainder, which it never is here).
fn segment_index(page_no: u64) -> Result<usize, StableMemoryError> {
    usize::try_from(page_no % SEGMENT_PAGE_COUNT).map_err(|_| StableMemoryError::OffsetOverflow)
}
1040
/// `SEGMENT_PAGE_COUNT` as `usize` for slice/Vec sizing; 256 fits any usize,
/// so the `expect` cannot fire.
fn segment_page_count_usize() -> usize {
    usize::try_from(SEGMENT_PAGE_COUNT).expect("segment page count fits usize")
}
1044
/// Serialized length of one segment table in bytes (256 entries × 8 bytes).
fn segment_table_len() -> usize {
    segment_page_count_usize() * 8
}
1048
/// `segment_table_len` widened to `u64` for offset arithmetic.
fn segment_table_bytes() -> Result<u64, StableMemoryError> {
    u64::try_from(segment_table_len()).map_err(|_| StableMemoryError::OffsetOverflow)
}
1052
/// Serialized size in bytes of a root table with `entry_count` entries
/// (8 bytes per entry), with overflow reported rather than wrapped.
fn root_table_bytes(entry_count: u64) -> Result<u64, StableMemoryError> {
    entry_count
        .checked_mul(PAGE_TABLE_ENTRY_LEN)
        .ok_or(StableMemoryError::OffsetOverflow)
}
1058
/// Slice length as `u64`; fallible only on targets where usize is wider than
/// u64, kept for uniform error plumbing.
fn entries_len_u64<T>(entries: &[T]) -> Result<u64, StableMemoryError> {
    u64::try_from(entries.len()).map_err(|_| StableMemoryError::OffsetOverflow)
}
1062
/// Byte offset of the current end of stable memory — the append point for new
/// tables/pages (current page count × stable page size).
fn append_base() -> Result<u64, StableMemoryError> {
    memory::size_pages()
        .checked_mul(STABLE_PAGE_SIZE)
        .ok_or(StableMemoryError::OffsetOverflow)
}
1068
/// SQLite page size as `u64` for offset math.
fn page_size() -> u64 {
    u64::from(SQLITE_PAGE_SIZE)
}
1072
/// SQLite page size as `usize` for buffer sizing.
fn page_len() -> usize {
    usize::try_from(SQLITE_PAGE_SIZE).expect("SQLite page size fits usize")
}
1076
/// Freshly allocated all-zero page buffer (used for unmapped/unwritten pages).
fn zero_page() -> Vec<u8> {
    vec![0_u8; page_len()]
}
1080
/// `u64` addition that surfaces wraparound as `OffsetOverflow` instead of
/// panicking or wrapping silently.
fn checked_add(left: u64, right: u64) -> Result<u64, StableMemoryError> {
    left.checked_add(right)
        .ok_or(StableMemoryError::OffsetOverflow)
}
1085
/// Folds `bytes` into a running 64-bit FNV-1a hash: for each byte, XOR into
/// the accumulator then multiply by the FNV-1a 64-bit prime. Allows chunked
/// hashing by threading `hash` through successive calls.
fn fold_fnv1a64(hash: u64, bytes: &[u8]) -> u64 {
    bytes.iter().fold(hash, |acc, &byte| {
        (acc ^ u64::from(byte)).wrapping_mul(0x0000_0100_0000_01b3)
    })
}
1093
1094fn hit_failpoint(failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
1095    let Ok(context) = memory::active_context_id() else {
1096        return Ok(());
1097    };
1098    FAILPOINTS.with(|slot| {
1099        let mut slot = slot.borrow_mut();
1100        if slot.get(&context).copied() == Some(failpoint) {
1101            slot.remove(&context);
1102            Err(StableMemoryError::Failpoint(failpoint.name()))
1103        } else {
1104            Ok(())
1105        }
1106    })
1107}
1108
impl StableBlobFailpoint {
    /// Human-readable label embedded in the `Failpoint` error raised by
    /// `hit_failpoint`; each string names the operation the failpoint
    /// precedes. These strings are runtime error text — do not reword.
    fn name(self) -> &'static str {
        match self {
            Self::OverlayWrite => "before overlay write",
            Self::OverlayTruncate => "before overlay truncate",
            Self::CommitCapacity => "before commit capacity",
            Self::CommitChunkWrite => "before commit page write",
            Self::CommitPageTableWrite => "before commit page table write",
            Self::CommitSuperblockStore => "before commit superblock store",
        }
    }
}
1121
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the pure layout arithmetic at its boundary values: page/segment
    /// rounding, segment indexing wraparound, and root-table sizing.
    #[test]
    fn layout_math_matches_expected_boundaries() {
        assert_eq!(page_count_for_size(0).unwrap(), 0);
        assert_eq!(page_count_for_size(1).unwrap(), 1);
        assert_eq!(page_count_for_size(page_size()).unwrap(), 1);
        assert_eq!(page_count_for_size(page_size() + 1).unwrap(), 2);

        assert_eq!(segment_count_for_pages(0).unwrap(), 0);
        assert_eq!(segment_count_for_pages(1).unwrap(), 1);
        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT).unwrap(), 1);
        assert_eq!(segment_count_for_pages(SEGMENT_PAGE_COUNT + 1).unwrap(), 2);

        // Page SEGMENT_PAGE_COUNT is the first page of segment 1, index 0.
        assert_eq!(segment_no(SEGMENT_PAGE_COUNT), 1);
        assert_eq!(segment_index(SEGMENT_PAGE_COUNT - 1).unwrap(), 255);
        assert_eq!(segment_index(SEGMENT_PAGE_COUNT).unwrap(), 0);
        assert_eq!(root_table_bytes(2).unwrap(), 16);
    }

    /// Every checked-arithmetic helper must surface u64 wraparound as
    /// `OffsetOverflow` rather than panic or wrap.
    #[test]
    fn layout_math_rejects_u64_max_overflow_boundaries() {
        assert!(matches!(
            root_table_bytes(u64::MAX),
            Err(StableMemoryError::OffsetOverflow)
        ));
        assert!(matches!(
            checked_add(u64::MAX, 1),
            Err(StableMemoryError::OffsetOverflow)
        ));

        // import_offset overflows when base + offset wraps.
        let mut block = Superblock::fresh();
        block.import_base_offset = u64::MAX;
        assert!(matches!(
            import_offset(&block, 1),
            Err(StableMemoryError::OffsetOverflow)
        ));

        // Second page of the import table would start past u64::MAX.
        block.import_base_offset = u64::MAX - page_size() + 1;
        block.import_total_size = page_size() + 1;
        assert!(matches!(
            imported_page_table(&block),
            Err(StableMemoryError::OffsetOverflow)
        ));
    }

    /// Verifies the read metrics split: repeated base-page reads hit the
    /// page-table caches (root + segment) while still counting data reads,
    /// and do not reload the superblock outside the bench profile.
    /// Serialized because it resets process-global stable memory and metrics.
    #[test]
    #[serial_test::serial]
    fn read_metrics_separate_table_cache_from_data_reads() {
        crate::stable::memory::reset_for_tests();
        crate::stable::memory::init(crate::stable::memory::memory_for_tests()).unwrap();
        invalidate_read_cache();

        let page = vec![7_u8; page_len()];
        write_at(0, &page).unwrap();
        // Drop caches, then zero the counters so only the two reads below count.
        invalidate_read_cache();
        crate::read_metrics::reset_read_metrics();

        let first = read_base_page(0).unwrap();
        let second = read_base_page(0).unwrap();
        let metrics = crate::read_metrics::read_metrics_snapshot();

        assert_eq!(first, page);
        assert_eq!(second, page);
        // Both reads fetch page data; first read misses both table caches,
        // second read hits both.
        assert!(metrics.stable_data_read_calls >= 2);
        assert!(metrics.stable_data_read_bytes >= page_size() * 2);
        assert!(metrics.page_table_root_misses >= 1);
        assert!(metrics.page_table_root_hits >= 1);
        assert!(metrics.page_table_segment_misses >= 1);
        assert!(metrics.page_table_segment_hits >= 1);
        #[cfg(feature = "bench-profile")]
        assert!(metrics.superblock_loads <= 1);
        #[cfg(not(feature = "bench-profile"))]
        assert_eq!(metrics.superblock_loads, 0);
    }
}