Skip to main content

ic_sqlite_vfs/sqlite_vfs/
stable_blob.rs

//! Offset-based access to the `/main.db` image in stable memory.
//! SQLite sees a single logical file. Physical stable memory grows in 64 KiB
//! pages and never shrinks; `Superblock::db_size` is the logical file length.
4
5use crate::config::STABLE_PAGE_SIZE;
6use crate::sqlite_vfs::overlay;
7use crate::stable::memory::{self, StableMemoryError};
8use crate::stable::meta::{
9    fnv1a64, Superblock, FLAG_CHECKSUM_REFRESHING, FLAG_CHECKSUM_STALE, FLAG_IMPORTING,
10};
11use std::cell::RefCell;
12
// Largest number of bytes moved per iteration by the checksum scans and by
// the commit copy loop in `commit_update` (16 KiB).
const CHECKSUM_CHUNK_LEN: u64 = 16 * 1024;
// Largest number of zero bytes written per iteration by `zero_fill_range` (16 KiB).
const ZERO_CHUNK_LEN: u64 = 16 * 1024;
15
/// Progress report returned by [`refresh_checksum_chunk`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChecksumRefresh {
    /// `true` once the scan offset has reached `db_size`.
    pub complete: bool,
    /// Running hash over the bytes scanned so far; the final checksum when
    /// `complete` is `true`.
    pub checksum: u64,
    /// Offset the scan has reached, i.e. the number of bytes hashed so far.
    pub scanned_bytes: u64,
    /// Logical database size the scan is running against.
    pub db_size: u64,
}
23
/// Injection points at which `hit_failpoint` can be made to return an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StableBlobFailpoint {
    /// Consulted by `write_at` when the overlay handled the write.
    OverlayWrite,
    /// Consulted by `truncate` when the overlay handled the truncate.
    OverlayTruncate,
    /// Consulted by `commit_update` before it sizes the shadow region.
    CommitCapacity,
    /// Consulted by `commit_update` before each chunk is written.
    CommitChunkWrite,
    /// Consulted by `commit_update` before the superblock is switched over.
    CommitSuperblockStore,
}
32
thread_local! {
    // The currently armed failpoint, if any. `hit_failpoint` clears it again
    // the first time the matching point is reached.
    static FAILPOINT: RefCell<Option<StableBlobFailpoint>> = const { RefCell::new(None) };
}
36
/// Arms `failpoint` for the current thread, replacing any previously armed one.
#[cfg(test)]
pub(crate) fn set_failpoint(failpoint: StableBlobFailpoint) {
    FAILPOINT.with(|slot| {
        slot.replace(Some(failpoint));
    });
}
41
/// Disarms whatever failpoint is currently armed on this thread.
#[cfg(test)]
pub(crate) fn clear_failpoint() {
    FAILPOINT.with(|slot| {
        slot.replace(None);
    });
}
46
47pub(crate) fn begin_update() -> Result<(), StableMemoryError> {
48    overlay::begin(Superblock::load()?.db_size)
49}
50
/// Abandons the current update by delegating to `overlay::rollback`.
pub(crate) fn rollback_update() {
    overlay::rollback();
}
54
55pub(crate) fn commit_update() -> Result<(), StableMemoryError> {
56    let Some(overlay) = overlay::take() else {
57        return Superblock::record_committed_tx();
58    };
59    if overlay.is_empty() {
60        return Superblock::record_committed_tx();
61    }
62
63    hit_failpoint(StableBlobFailpoint::CommitCapacity)?;
64    let final_size = overlay.size();
65    let shadow_base = append_base()?;
66    memory::ensure_capacity(checked_add(shadow_base, overlay.max_end()?)?)?;
67
68    let mut offset = 0_u64;
69    while offset < final_size {
70        let remaining = final_size - offset;
71        let len = remaining.min(CHECKSUM_CHUNK_LEN);
72        let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
73        let mut bytes = vec![0_u8; copied_len];
74        overlay.read_merged_chunk(offset, &mut bytes)?;
75        hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
76        memory::write(checked_add(shadow_base, offset)?, &bytes)?;
77        offset += len;
78    }
79
80    hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
81    Superblock::commit_db_image(shadow_base, final_size)
82}
83
84pub(crate) fn read_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
85    if let Some(result) = overlay::read_at(offset, dst) {
86        return result;
87    }
88    read_base_at(offset, dst)
89}
90
91pub(crate) fn read_base_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
92    dst.fill(0);
93    let block = Superblock::load()?;
94    if dst.is_empty() {
95        return Ok(true);
96    }
97    if offset >= block.db_size {
98        return Ok(false);
99    }
100    let available = block.db_size - offset;
101    let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
102    let copied = requested.min(available);
103    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
104    memory::read(active_offset(&block, offset)?, &mut dst[..copied_len])?;
105    Ok(copied == requested)
106}
107
108pub(crate) fn write_at(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
109    if let Some(result) = overlay::write_at(offset, bytes) {
110        hit_failpoint(StableBlobFailpoint::OverlayWrite)?;
111        return result;
112    }
113    if bytes.is_empty() {
114        return Ok(());
115    }
116    let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
117    let end = offset
118        .checked_add(len)
119        .ok_or(StableMemoryError::OffsetOverflow)?;
120    let mut block = Superblock::load()?;
121    if offset > block.db_size {
122        zero_fill_range(block.db_size, offset)?;
123    }
124    memory::write(active_offset(&block, offset)?, bytes)?;
125    if end > block.db_size {
126        block.db_size = end;
127        block.store()?;
128    }
129    Ok(())
130}
131
132pub(crate) fn truncate(size: u64) -> Result<(), StableMemoryError> {
133    if let Some(result) = overlay::truncate(size) {
134        hit_failpoint(StableBlobFailpoint::OverlayTruncate)?;
135        return result;
136    }
137    let block = Superblock::load()?;
138    if size > block.db_size {
139        zero_fill_range(block.db_size, size)?;
140    }
141    Superblock::set_db_size(size)
142}
143
144pub(crate) fn file_size() -> Result<u64, StableMemoryError> {
145    if let Some(size) = overlay::file_size() {
146        return Ok(size);
147    }
148    Ok(Superblock::load()?.db_size)
149}
150
151pub fn export_chunk(offset: u64, len: u64) -> Result<Vec<u8>, StableMemoryError> {
152    let block = Superblock::load()?;
153    if offset >= block.db_size {
154        return Ok(Vec::new());
155    }
156    let available = block.db_size - offset;
157    let copied = len.min(available);
158    let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
159    let mut out = vec![0_u8; copied_len];
160    memory::read(active_offset(&block, offset)?, &mut out)?;
161    Ok(out)
162}
163
164pub fn import_chunk(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
165    let mut block = Superblock::load()?;
166    if !block.is_importing() {
167        return Err(StableMemoryError::ImportNotStarted);
168    }
169    let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
170    if offset != block.import_written_until {
171        return Err(StableMemoryError::ImportOutOfOrder {
172            offset,
173            expected: block.import_written_until,
174        });
175    }
176    let end = offset
177        .checked_add(len)
178        .ok_or(StableMemoryError::OffsetOverflow)?;
179    if end > block.import_total_size {
180        return Err(StableMemoryError::ImportOutOfBounds {
181            offset,
182            len,
183            db_size: block.import_total_size,
184        });
185    }
186    memory::write(import_offset(&block, offset)?, bytes)?;
187    block.import_written_until = end;
188    block.store()
189}
190
191pub fn begin_import(total_size: u64, expected_checksum: u64) -> Result<(), StableMemoryError> {
192    let mut block = Superblock::load()?;
193    if block.is_importing() {
194        return Err(StableMemoryError::ImportAlreadyStarted);
195    }
196    let import_base_offset = append_base()?;
197    checked_add(import_base_offset, total_size)?;
198    block.flags |= FLAG_IMPORTING;
199    block.clear_checksum_refresh();
200    block.import_expected_checksum = expected_checksum;
201    block.import_written_until = 0;
202    block.import_total_size = total_size;
203    block.import_base_offset = import_base_offset;
204    block.store()
205}
206
207pub fn finish_import() -> Result<(), StableMemoryError> {
208    let mut block = Superblock::load()?;
209    if !block.is_importing() {
210        return Err(StableMemoryError::ImportNotStarted);
211    }
212    if block.import_written_until != block.import_total_size {
213        return Err(StableMemoryError::ImportIncomplete {
214            written_until: block.import_written_until,
215            db_size: block.import_total_size,
216        });
217    }
218    let checksum = checksum_range(block.import_base_offset, block.import_total_size)?;
219    if checksum != block.import_expected_checksum {
220        let expected = block.import_expected_checksum;
221        clear_import(&mut block)?;
222        return Err(StableMemoryError::ChecksumMismatch {
223            expected,
224            actual: checksum,
225        });
226    }
227    block.db_size = block.import_total_size;
228    block.db_base_offset = block.import_base_offset;
229    block.flags &= !FLAG_IMPORTING;
230    block.flags &= !FLAG_CHECKSUM_STALE;
231    block.clear_checksum_refresh();
232    block.checksum = checksum;
233    block.import_expected_checksum = 0;
234    block.import_written_until = 0;
235    block.import_total_size = 0;
236    block.import_base_offset = 0;
237    block.store()
238}
239
240pub fn refresh_checksum() -> Result<u64, StableMemoryError> {
241    let checksum = checksum()?;
242    let mut block = Superblock::load()?;
243    block.checksum = checksum;
244    block.flags &= !FLAG_CHECKSUM_STALE;
245    block.clear_checksum_refresh();
246    block.store()?;
247    Ok(checksum)
248}
249
250pub fn refresh_checksum_chunk(max_bytes: u64) -> Result<ChecksumRefresh, StableMemoryError> {
251    if max_bytes == 0 {
252        return Err(StableMemoryError::ChecksumRefreshChunkEmpty);
253    }
254
255    let mut block = Superblock::load()?;
256    if block.is_importing() {
257        return Err(StableMemoryError::ImportAlreadyStarted);
258    }
259    if !block.is_checksum_refreshing() {
260        block.flags |= FLAG_CHECKSUM_REFRESHING;
261        block.checksum_refresh_offset = 0;
262        block.checksum_refresh_hash = fnv1a64(&[]);
263        block.checksum_refresh_tx_id = block.last_tx_id;
264    }
265
266    if block.checksum_refresh_tx_id != block.last_tx_id {
267        block.clear_checksum_refresh();
268        block.store()?;
269        return refresh_checksum_chunk(max_bytes);
270    }
271    let start = block.checksum_refresh_offset;
272    let mut offset = start;
273    let mut hash = block.checksum_refresh_hash;
274    let end = block.db_size.min(start.saturating_add(max_bytes));
275
276    while offset < end {
277        let remaining = end - offset;
278        let len = remaining.min(CHECKSUM_CHUNK_LEN);
279        let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
280        let mut bytes = vec![0_u8; copied_len];
281        memory::read(active_offset(&block, offset)?, &mut bytes)?;
282        hash = fold_fnv1a64(hash, &bytes);
283        offset += len;
284    }
285
286    block.checksum_refresh_offset = offset;
287    block.checksum_refresh_hash = hash;
288
289    if offset == block.db_size {
290        block.checksum = hash;
291        block.flags &= !FLAG_CHECKSUM_STALE;
292        block.clear_checksum_refresh();
293    }
294
295    let out = ChecksumRefresh {
296        complete: offset == block.db_size,
297        checksum: hash,
298        scanned_bytes: offset,
299        db_size: block.db_size,
300    };
301    block.store()?;
302    Ok(out)
303}
304
305pub fn checksum() -> Result<u64, StableMemoryError> {
306    let block = Superblock::load()?;
307    checksum_range(block.db_base_offset, block.db_size)
308}
309
310fn checksum_range(base_offset: u64, len: u64) -> Result<u64, StableMemoryError> {
311    let mut offset = 0_u64;
312    let mut hash = fnv1a64(&[]);
313    while offset < len {
314        let remaining = len - offset;
315        let len = remaining.min(CHECKSUM_CHUNK_LEN);
316        let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
317        let mut bytes = vec![0_u8; copied_len];
318        memory::read(checked_add(base_offset, offset)?, &mut bytes)?;
319        hash = fold_fnv1a64(hash, &bytes);
320        offset += len;
321    }
322    Ok(hash)
323}
324
325fn clear_import(block: &mut Superblock) -> Result<(), StableMemoryError> {
326    block.flags &= !FLAG_IMPORTING;
327    block.import_expected_checksum = 0;
328    block.import_written_until = 0;
329    block.import_total_size = 0;
330    block.import_base_offset = 0;
331    block.store()
332}
333
334fn zero_fill_range(start: u64, end: u64) -> Result<(), StableMemoryError> {
335    let block = Superblock::load()?;
336    let mut offset = start;
337    while offset < end {
338        let remaining = end - offset;
339        let len = remaining.min(ZERO_CHUNK_LEN);
340        let zero_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
341        let zeros = vec![0_u8; zero_len];
342        memory::write(active_offset(&block, offset)?, &zeros)?;
343        offset += len;
344    }
345    Ok(())
346}
347
348fn import_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
349    checked_add(block.import_base_offset, offset)
350}
351
352fn active_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
353    checked_add(block.db_base_offset, offset)
354}
355
356fn append_base() -> Result<u64, StableMemoryError> {
357    memory::size_pages()
358        .checked_mul(STABLE_PAGE_SIZE)
359        .ok_or(StableMemoryError::OffsetOverflow)
360}
361
362fn checked_add(left: u64, right: u64) -> Result<u64, StableMemoryError> {
363    left.checked_add(right)
364        .ok_or(StableMemoryError::OffsetOverflow)
365}
366
/// Extends a running FNV-1a style 64-bit hash with `bytes`.
///
/// Each byte is XORed into the state, which is then multiplied (wrapping) by
/// the prime `0x100_0000_01b3`. Feeding data in several pieces gives the same
/// result as feeding it in one piece.
fn fold_fnv1a64(hash: u64, bytes: &[u8]) -> u64 {
    bytes.iter().fold(hash, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(0x0000_0100_0000_01b3)
    })
}
374
375fn hit_failpoint(failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
376    FAILPOINT.with(|slot| {
377        let mut slot = slot.borrow_mut();
378        if *slot == Some(failpoint) {
379            *slot = None;
380            Err(StableMemoryError::Failpoint(failpoint.name()))
381        } else {
382            Ok(())
383        }
384    })
385}
386
387impl StableBlobFailpoint {
388    fn name(self) -> &'static str {
389        match self {
390            Self::OverlayWrite => "before overlay write",
391            Self::OverlayTruncate => "before overlay truncate",
392            Self::CommitCapacity => "before commit capacity",
393            Self::CommitChunkWrite => "before commit chunk write",
394            Self::CommitSuperblockStore => "before commit superblock store",
395        }
396    }
397}