ic_sqlite_vfs/sqlite_vfs/
stable_blob.rs1use crate::config::STABLE_PAGE_SIZE;
6use crate::sqlite_vfs::overlay;
7use crate::stable::memory::{self, StableMemoryError};
8use crate::stable::meta::{
9 fnv1a64, Superblock, FLAG_CHECKSUM_REFRESHING, FLAG_CHECKSUM_STALE, FLAG_IMPORTING,
10};
11use std::cell::RefCell;
12
13const CHECKSUM_CHUNK_LEN: u64 = 16 * 1024;
14const ZERO_CHUNK_LEN: u64 = 16 * 1024;
15
16#[derive(Clone, Debug, Eq, PartialEq)]
17pub struct ChecksumRefresh {
18 pub complete: bool,
19 pub checksum: u64,
20 pub scanned_bytes: u64,
21 pub db_size: u64,
22}
23
24#[derive(Clone, Copy, Debug, Eq, PartialEq)]
25pub(crate) enum StableBlobFailpoint {
26 OverlayWrite,
27 OverlayTruncate,
28 CommitCapacity,
29 CommitChunkWrite,
30 CommitSuperblockStore,
31}
32
33thread_local! {
34 static FAILPOINT: RefCell<Option<StableBlobFailpoint>> = const { RefCell::new(None) };
35}
36
37#[cfg(test)]
38pub(crate) fn set_failpoint(failpoint: StableBlobFailpoint) {
39 FAILPOINT.with(|slot| *slot.borrow_mut() = Some(failpoint));
40}
41
42#[cfg(test)]
43pub(crate) fn clear_failpoint() {
44 FAILPOINT.with(|slot| *slot.borrow_mut() = None);
45}
46
47pub(crate) fn begin_update() -> Result<(), StableMemoryError> {
48 overlay::begin(Superblock::load()?.db_size)
49}
50
51pub(crate) fn rollback_update() {
52 overlay::rollback();
53}
54
55pub(crate) fn commit_update() -> Result<(), StableMemoryError> {
56 let Some(overlay) = overlay::take() else {
57 return Superblock::record_committed_tx();
58 };
59 if overlay.is_empty() {
60 return Superblock::record_committed_tx();
61 }
62
63 hit_failpoint(StableBlobFailpoint::CommitCapacity)?;
64 let final_size = overlay.size();
65 let shadow_base = append_base()?;
66 memory::ensure_capacity(checked_add(shadow_base, overlay.max_end()?)?)?;
67
68 let mut offset = 0_u64;
69 while offset < final_size {
70 let remaining = final_size - offset;
71 let len = remaining.min(CHECKSUM_CHUNK_LEN);
72 let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
73 let mut bytes = vec![0_u8; copied_len];
74 overlay.read_merged_chunk(offset, &mut bytes)?;
75 hit_failpoint(StableBlobFailpoint::CommitChunkWrite)?;
76 memory::write(checked_add(shadow_base, offset)?, &bytes)?;
77 offset += len;
78 }
79
80 hit_failpoint(StableBlobFailpoint::CommitSuperblockStore)?;
81 Superblock::commit_db_image(shadow_base, final_size)
82}
83
84pub(crate) fn read_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
85 if let Some(result) = overlay::read_at(offset, dst) {
86 return result;
87 }
88 read_base_at(offset, dst)
89}
90
91pub(crate) fn read_base_at(offset: u64, dst: &mut [u8]) -> Result<bool, StableMemoryError> {
92 dst.fill(0);
93 let block = Superblock::load()?;
94 if dst.is_empty() {
95 return Ok(true);
96 }
97 if offset >= block.db_size {
98 return Ok(false);
99 }
100 let available = block.db_size - offset;
101 let requested = u64::try_from(dst.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
102 let copied = requested.min(available);
103 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
104 memory::read(active_offset(&block, offset)?, &mut dst[..copied_len])?;
105 Ok(copied == requested)
106}
107
108pub(crate) fn write_at(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
109 if let Some(result) = overlay::write_at(offset, bytes) {
110 hit_failpoint(StableBlobFailpoint::OverlayWrite)?;
111 return result;
112 }
113 if bytes.is_empty() {
114 return Ok(());
115 }
116 let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
117 let end = offset
118 .checked_add(len)
119 .ok_or(StableMemoryError::OffsetOverflow)?;
120 let mut block = Superblock::load()?;
121 if offset > block.db_size {
122 zero_fill_range(block.db_size, offset)?;
123 }
124 memory::write(active_offset(&block, offset)?, bytes)?;
125 if end > block.db_size {
126 block.db_size = end;
127 block.store()?;
128 }
129 Ok(())
130}
131
132pub(crate) fn truncate(size: u64) -> Result<(), StableMemoryError> {
133 if let Some(result) = overlay::truncate(size) {
134 hit_failpoint(StableBlobFailpoint::OverlayTruncate)?;
135 return result;
136 }
137 let block = Superblock::load()?;
138 if size > block.db_size {
139 zero_fill_range(block.db_size, size)?;
140 }
141 Superblock::set_db_size(size)
142}
143
144pub(crate) fn file_size() -> Result<u64, StableMemoryError> {
145 if let Some(size) = overlay::file_size() {
146 return Ok(size);
147 }
148 Ok(Superblock::load()?.db_size)
149}
150
151pub fn export_chunk(offset: u64, len: u64) -> Result<Vec<u8>, StableMemoryError> {
152 let block = Superblock::load()?;
153 if offset >= block.db_size {
154 return Ok(Vec::new());
155 }
156 let available = block.db_size - offset;
157 let copied = len.min(available);
158 let copied_len = usize::try_from(copied).map_err(|_| StableMemoryError::OffsetOverflow)?;
159 let mut out = vec![0_u8; copied_len];
160 memory::read(active_offset(&block, offset)?, &mut out)?;
161 Ok(out)
162}
163
164pub fn import_chunk(offset: u64, bytes: &[u8]) -> Result<(), StableMemoryError> {
165 let mut block = Superblock::load()?;
166 if !block.is_importing() {
167 return Err(StableMemoryError::ImportNotStarted);
168 }
169 let len = u64::try_from(bytes.len()).map_err(|_| StableMemoryError::OffsetOverflow)?;
170 if offset != block.import_written_until {
171 return Err(StableMemoryError::ImportOutOfOrder {
172 offset,
173 expected: block.import_written_until,
174 });
175 }
176 let end = offset
177 .checked_add(len)
178 .ok_or(StableMemoryError::OffsetOverflow)?;
179 if end > block.import_total_size {
180 return Err(StableMemoryError::ImportOutOfBounds {
181 offset,
182 len,
183 db_size: block.import_total_size,
184 });
185 }
186 memory::write(import_offset(&block, offset)?, bytes)?;
187 block.import_written_until = end;
188 block.store()
189}
190
191pub fn begin_import(total_size: u64, expected_checksum: u64) -> Result<(), StableMemoryError> {
192 let mut block = Superblock::load()?;
193 if block.is_importing() {
194 return Err(StableMemoryError::ImportAlreadyStarted);
195 }
196 let import_base_offset = append_base()?;
197 checked_add(import_base_offset, total_size)?;
198 block.flags |= FLAG_IMPORTING;
199 block.clear_checksum_refresh();
200 block.import_expected_checksum = expected_checksum;
201 block.import_written_until = 0;
202 block.import_total_size = total_size;
203 block.import_base_offset = import_base_offset;
204 block.store()
205}
206
207pub fn finish_import() -> Result<(), StableMemoryError> {
208 let mut block = Superblock::load()?;
209 if !block.is_importing() {
210 return Err(StableMemoryError::ImportNotStarted);
211 }
212 if block.import_written_until != block.import_total_size {
213 return Err(StableMemoryError::ImportIncomplete {
214 written_until: block.import_written_until,
215 db_size: block.import_total_size,
216 });
217 }
218 let checksum = checksum_range(block.import_base_offset, block.import_total_size)?;
219 if checksum != block.import_expected_checksum {
220 let expected = block.import_expected_checksum;
221 clear_import(&mut block)?;
222 return Err(StableMemoryError::ChecksumMismatch {
223 expected,
224 actual: checksum,
225 });
226 }
227 block.db_size = block.import_total_size;
228 block.db_base_offset = block.import_base_offset;
229 block.flags &= !FLAG_IMPORTING;
230 block.flags &= !FLAG_CHECKSUM_STALE;
231 block.clear_checksum_refresh();
232 block.checksum = checksum;
233 block.import_expected_checksum = 0;
234 block.import_written_until = 0;
235 block.import_total_size = 0;
236 block.import_base_offset = 0;
237 block.store()
238}
239
240pub fn refresh_checksum() -> Result<u64, StableMemoryError> {
241 let checksum = checksum()?;
242 let mut block = Superblock::load()?;
243 block.checksum = checksum;
244 block.flags &= !FLAG_CHECKSUM_STALE;
245 block.clear_checksum_refresh();
246 block.store()?;
247 Ok(checksum)
248}
249
250pub fn refresh_checksum_chunk(max_bytes: u64) -> Result<ChecksumRefresh, StableMemoryError> {
251 if max_bytes == 0 {
252 return Err(StableMemoryError::ChecksumRefreshChunkEmpty);
253 }
254
255 let mut block = Superblock::load()?;
256 if block.is_importing() {
257 return Err(StableMemoryError::ImportAlreadyStarted);
258 }
259 if !block.is_checksum_refreshing() {
260 block.flags |= FLAG_CHECKSUM_REFRESHING;
261 block.checksum_refresh_offset = 0;
262 block.checksum_refresh_hash = fnv1a64(&[]);
263 block.checksum_refresh_tx_id = block.last_tx_id;
264 }
265
266 if block.checksum_refresh_tx_id != block.last_tx_id {
267 block.clear_checksum_refresh();
268 block.store()?;
269 return refresh_checksum_chunk(max_bytes);
270 }
271 let start = block.checksum_refresh_offset;
272 let mut offset = start;
273 let mut hash = block.checksum_refresh_hash;
274 let end = block.db_size.min(start.saturating_add(max_bytes));
275
276 while offset < end {
277 let remaining = end - offset;
278 let len = remaining.min(CHECKSUM_CHUNK_LEN);
279 let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
280 let mut bytes = vec![0_u8; copied_len];
281 memory::read(active_offset(&block, offset)?, &mut bytes)?;
282 hash = fold_fnv1a64(hash, &bytes);
283 offset += len;
284 }
285
286 block.checksum_refresh_offset = offset;
287 block.checksum_refresh_hash = hash;
288
289 if offset == block.db_size {
290 block.checksum = hash;
291 block.flags &= !FLAG_CHECKSUM_STALE;
292 block.clear_checksum_refresh();
293 }
294
295 let out = ChecksumRefresh {
296 complete: offset == block.db_size,
297 checksum: hash,
298 scanned_bytes: offset,
299 db_size: block.db_size,
300 };
301 block.store()?;
302 Ok(out)
303}
304
305pub fn checksum() -> Result<u64, StableMemoryError> {
306 let block = Superblock::load()?;
307 checksum_range(block.db_base_offset, block.db_size)
308}
309
310fn checksum_range(base_offset: u64, len: u64) -> Result<u64, StableMemoryError> {
311 let mut offset = 0_u64;
312 let mut hash = fnv1a64(&[]);
313 while offset < len {
314 let remaining = len - offset;
315 let len = remaining.min(CHECKSUM_CHUNK_LEN);
316 let copied_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
317 let mut bytes = vec![0_u8; copied_len];
318 memory::read(checked_add(base_offset, offset)?, &mut bytes)?;
319 hash = fold_fnv1a64(hash, &bytes);
320 offset += len;
321 }
322 Ok(hash)
323}
324
325fn clear_import(block: &mut Superblock) -> Result<(), StableMemoryError> {
326 block.flags &= !FLAG_IMPORTING;
327 block.import_expected_checksum = 0;
328 block.import_written_until = 0;
329 block.import_total_size = 0;
330 block.import_base_offset = 0;
331 block.store()
332}
333
334fn zero_fill_range(start: u64, end: u64) -> Result<(), StableMemoryError> {
335 let block = Superblock::load()?;
336 let mut offset = start;
337 while offset < end {
338 let remaining = end - offset;
339 let len = remaining.min(ZERO_CHUNK_LEN);
340 let zero_len = usize::try_from(len).map_err(|_| StableMemoryError::OffsetOverflow)?;
341 let zeros = vec![0_u8; zero_len];
342 memory::write(active_offset(&block, offset)?, &zeros)?;
343 offset += len;
344 }
345 Ok(())
346}
347
348fn import_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
349 checked_add(block.import_base_offset, offset)
350}
351
352fn active_offset(block: &Superblock, offset: u64) -> Result<u64, StableMemoryError> {
353 checked_add(block.db_base_offset, offset)
354}
355
356fn append_base() -> Result<u64, StableMemoryError> {
357 memory::size_pages()
358 .checked_mul(STABLE_PAGE_SIZE)
359 .ok_or(StableMemoryError::OffsetOverflow)
360}
361
362fn checked_add(left: u64, right: u64) -> Result<u64, StableMemoryError> {
363 left.checked_add(right)
364 .ok_or(StableMemoryError::OffsetOverflow)
365}
366
367fn fold_fnv1a64(mut hash: u64, bytes: &[u8]) -> u64 {
368 for byte in bytes {
369 hash ^= u64::from(*byte);
370 hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
371 }
372 hash
373}
374
375fn hit_failpoint(failpoint: StableBlobFailpoint) -> Result<(), StableMemoryError> {
376 FAILPOINT.with(|slot| {
377 let mut slot = slot.borrow_mut();
378 if *slot == Some(failpoint) {
379 *slot = None;
380 Err(StableMemoryError::Failpoint(failpoint.name()))
381 } else {
382 Ok(())
383 }
384 })
385}
386
387impl StableBlobFailpoint {
388 fn name(self) -> &'static str {
389 match self {
390 Self::OverlayWrite => "before overlay write",
391 Self::OverlayTruncate => "before overlay truncate",
392 Self::CommitCapacity => "before commit capacity",
393 Self::CommitChunkWrite => "before commit chunk write",
394 Self::CommitSuperblockStore => "before commit superblock store",
395 }
396 }
397}