graft_sqlite/file/
vol_file.rs

1use std::{
2    borrow::Cow,
3    fmt::Debug,
4    hash::{DefaultHasher, Hash, Hasher},
5    mem,
6    sync::Arc,
7};
8
9use bytes::BytesMut;
10use culprit::{Culprit, Result, ResultExt};
11use graft::core::{
12    PageIdx, VolumeId,
13    page::{PAGESIZE, Page},
14    page_count::PageCount,
15};
16use graft::{
17    rt::runtime::Runtime,
18    snapshot::Snapshot,
19    volume_reader::{VolumeRead, VolumeReadRef, VolumeReader},
20    volume_writer::{VolumeWrite, VolumeWriter},
21};
22use parking_lot::{Mutex, MutexGuard};
23use sqlite_plugin::flags::{LockLevel, OpenOpts};
24
25use crate::vfs::ErrCtx;
26
27use super::VfsFile;
28
/// Byte offset, within the first database page, of SQLite's 4-byte file
/// change counter.
const FILE_CHANGE_COUNTER_OFFSET: usize = 24;
/// Byte offset, within the first database page, of SQLite's 4-byte
/// "version-valid-for" number.
const VERSION_VALID_FOR_NUMBER_OFFSET: usize = 92;
32
33enum VolFileState {
34    Idle,
35    Shared { reader: VolumeReader },
36    Reserved { writer: VolumeWriter },
37    Committing,
38}
39
40impl VolFileState {
41    fn name(&self) -> &'static str {
42        match self {
43            VolFileState::Idle => "Idle",
44            VolFileState::Shared { .. } => "Shared",
45            VolFileState::Reserved { .. } => "Reserved",
46            VolFileState::Committing => "Committing",
47        }
48    }
49}
50
51impl Debug for VolFileState {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        match self {
54            VolFileState::Idle => f.write_str("Idle"),
55            VolFileState::Shared { reader } => {
56                f.debug_tuple("Shared").field(reader.snapshot()).finish()
57            }
58            VolFileState::Reserved { writer } => {
59                f.debug_tuple("Reserved").field(writer.snapshot()).finish()
60            }
61            VolFileState::Committing => f.write_str("Committing"),
62        }
63    }
64}
65
66pub struct VolFile {
67    runtime: Runtime,
68    pub tag: String,
69    pub vid: VolumeId,
70    opts: OpenOpts,
71
72    reserved: Arc<Mutex<()>>,
73    state: VolFileState,
74}
75
76impl Debug for VolFile {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("VolFile")
79            .field("tag", &self.tag)
80            .field("vid", &self.vid)
81            .field("state", &self.state)
82            .finish()
83    }
84}
85
86impl VolFile {
87    pub fn new(
88        runtime: Runtime,
89        tag: String,
90        vid: VolumeId,
91        opts: OpenOpts,
92        reserved: Arc<Mutex<()>>,
93    ) -> Self {
94        Self {
95            runtime,
96            tag,
97            vid,
98            opts,
99            reserved,
100            state: VolFileState::Idle,
101        }
102    }
103
104    pub fn snapshot_or_latest(&self) -> Result<Snapshot, ErrCtx> {
105        match &self.state {
106            VolFileState::Idle => self.runtime.volume_snapshot(&self.vid).or_into_ctx(),
107            VolFileState::Shared { reader } => Ok(reader.snapshot().clone()),
108            VolFileState::Reserved { writer } => Ok(writer.snapshot().clone()),
109            VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
110        }
111    }
112
113    pub fn page_count(&self) -> Result<PageCount, ErrCtx> {
114        match &self.state {
115            VolFileState::Idle => {
116                let snapshot = self.runtime.volume_snapshot(&self.vid).or_into_ctx()?;
117                self.runtime.snapshot_pages(&snapshot).or_into_ctx()
118            }
119            VolFileState::Shared { reader } => reader.page_count().or_into_ctx(),
120            VolFileState::Reserved { writer } => writer.page_count().or_into_ctx(),
121            VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
122        }
123    }
124
125    pub fn is_idle(&self) -> bool {
126        matches!(self.state, VolFileState::Idle)
127    }
128
129    pub fn opts(&self) -> OpenOpts {
130        self.opts
131    }
132
133    pub fn switch_volume(&mut self, vid: &VolumeId) -> Result<(), ErrCtx> {
134        self.runtime
135            .tag_replace(&self.tag, vid.clone())
136            .or_into_ctx()?;
137        self.vid = vid.clone();
138        Ok(())
139    }
140
141    pub fn reader(&self) -> Result<VolumeReadRef<'_>, ErrCtx> {
142        match &self.state {
143            VolFileState::Idle => Ok(VolumeReadRef::Reader(Cow::Owned(
144                self.runtime.volume_reader(self.vid.clone()).or_into_ctx()?,
145            ))),
146            VolFileState::Shared { reader, .. } => Ok(VolumeReadRef::Reader(Cow::Borrowed(reader))),
147            VolFileState::Reserved { writer, .. } => Ok(VolumeReadRef::Writer(writer)),
148            VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
149        }
150    }
151}
152
153impl VfsFile for VolFile {
154    fn readonly(&self) -> bool {
155        false
156    }
157
158    fn in_memory(&self) -> bool {
159        false
160    }
161
162    fn lock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
163        match level {
164            LockLevel::Unlocked => {
165                // SQLite should never request an Unlocked lock
166                unreachable!("bug: invalid request lock(Unlocked)");
167            }
168            LockLevel::Shared => {
169                if let VolFileState::Idle = self.state {
170                    // Transition Idle -> Shared
171                    let reader = self.runtime.volume_reader(self.vid.clone()).or_into_ctx()?;
172                    self.state = VolFileState::Shared { reader };
173                } else {
174                    return Err(Culprit::new_with_note(
175                        ErrCtx::InvalidLockTransition,
176                        format!("invalid lock request Shared in state {}", self.state.name()),
177                    ));
178                }
179            }
180            LockLevel::Reserved => {
181                if let VolFileState::Shared { ref reader } = self.state {
182                    // Transition Shared -> Reserved
183
184                    // Ensure that this VolFile is not readonly
185                    if self.opts.mode().is_readonly() {
186                        return Err(Culprit::new_with_note(
187                            ErrCtx::InvalidLockTransition,
188                            "invalid lock request: Shared -> Reserved: file is read-only",
189                        ));
190                    }
191
192                    // try to acquire the reserved lock or fail if another thread has it
193                    let Some(reserved) = self.reserved.try_lock() else {
194                        return Err(Culprit::new(ErrCtx::Busy));
195                    };
196
197                    // check to see if the snapshot is latest. if it
198                    // has changed we can immediately reject the lock upgrade
199                    if !self
200                        .runtime
201                        .snapshot_is_latest(&self.vid, reader.snapshot())
202                        .or_into_ctx()?
203                    {
204                        return Err(Culprit::new_with_note(
205                            ErrCtx::BusySnapshot,
206                            "unable to lock: Shared -> Reserved: snapshot changed",
207                        ));
208                    }
209
210                    // convert the reader into a writer
211                    self.state = VolFileState::Reserved {
212                        writer: VolumeWriter::try_from(reader.clone()).or_into_ctx()?,
213                    };
214
215                    // Explicitly leak the reserved lock
216                    // SAFETY: we depend on SQLite to release the lock when it's done
217                    MutexGuard::leak(reserved);
218                } else {
219                    return Err(Culprit::new_with_note(
220                        ErrCtx::InvalidLockTransition,
221                        format!(
222                            "invalid lock request Reserved in state {}",
223                            self.state.name()
224                        ),
225                    ));
226                }
227            }
228            LockLevel::Pending | LockLevel::Exclusive => {
229                // SQLite should only request these transitions while we are in the Reserved state
230                assert!(
231                    matches!(self.state, VolFileState::Reserved { .. }),
232                    "bug: invalid lock request {:?} in state {}",
233                    level,
234                    self.state.name()
235                );
236            }
237        }
238
239        Ok(())
240    }
241
242    fn unlock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
243        match level {
244            LockLevel::Unlocked => match self.state {
245                VolFileState::Idle | VolFileState::Shared { .. } | VolFileState::Committing => {
246                    self.state = VolFileState::Idle;
247                }
248                VolFileState::Reserved { .. } => {
249                    return Err(Culprit::new_with_note(
250                        ErrCtx::InvalidLockTransition,
251                        "invalid unlock request Unlocked in state Reserved",
252                    ));
253                }
254            },
255            LockLevel::Shared => {
256                if let VolFileState::Reserved { writer } =
257                    mem::replace(&mut self.state, VolFileState::Committing)
258                {
259                    // Transition Reserved -> Shared through the Committing state
260                    // If we fail the commit, SQLite will subsequently issue an
261                    // Unlocked request after handling the error
262
263                    // Commit the writer, downgrading to a reader
264                    let reader = writer.commit().or_into_ctx()?;
265                    self.state = VolFileState::Shared { reader };
266
267                    // release the reserved lock
268                    // between threads while holding the lock
269                    // TODO: find a way to assert that this thread actually owns the lock
270                    assert!(self.reserved.is_locked(), "reserved lock must be locked");
271                    // SAFETY: we are in the Reserved state, thus we are holding the lock
272                    // SAFETY: we depend on the connection not being passed
273                    unsafe { self.reserved.force_unlock() };
274                } else {
275                    return Err(Culprit::new_with_note(
276                        ErrCtx::InvalidLockTransition,
277                        format!(
278                            "invalid unlock request Shared in state {}",
279                            self.state.name()
280                        ),
281                    ));
282                }
283            }
284            LockLevel::Reserved | LockLevel::Pending | LockLevel::Exclusive => {
285                // SQLite should only request these transitions using the lock method
286                unreachable!(
287                    "bug: invalid unlock request {:?} in state {}",
288                    level,
289                    self.state.name()
290                );
291            }
292        }
293
294        Ok(())
295    }
296
297    fn file_size(&mut self) -> Result<usize, ErrCtx> {
298        Ok(PAGESIZE.as_usize() * self.page_count()?.to_usize())
299    }
300
301    fn read(&mut self, offset: usize, data: &mut [u8]) -> Result<usize, ErrCtx> {
302        // locate the page offset of the requested page
303        let pageidx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
304            .try_into()
305            .expect("offset out of volume range");
306        // local_offset is the offset *within* the requested page
307        let local_offset = offset % PAGESIZE;
308
309        assert!(
310            local_offset + data.len() <= PAGESIZE,
311            "read must not cross page boundary"
312        );
313
314        // load the page
315        let page = match &mut self.state {
316            VolFileState::Idle => {
317                // sqlite sometimes reads the database header without holding a
318                // lock, in this case we are expected to read from the latest
319                // snapshot
320                let reader = self.runtime.volume_reader(self.vid.clone()).or_into_ctx()?;
321                reader.read_page(pageidx).or_into_ctx()?
322            }
323            VolFileState::Shared { reader } => reader.read_page(pageidx).or_into_ctx()?,
324            VolFileState::Reserved { writer } => writer.read_page(pageidx).or_into_ctx()?,
325            VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
326        };
327
328        let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
329        data.copy_from_slice(&page[range]);
330
331        // check to see if SQLite is reading the file change counter, and if so,
332        // overwrite it with a counter derived from the current snapshot
333        if pageidx == PageIdx::FIRST
334            && local_offset <= FILE_CHANGE_COUNTER_OFFSET
335            && local_offset + data.len() >= FILE_CHANGE_COUNTER_OFFSET + 4
336        {
337            // find the location of the file change counter within the out buffer
338            let fcc_offset = FILE_CHANGE_COUNTER_OFFSET - local_offset.as_usize();
339
340            // compute the file change counter by hashing the snapshot.
341            // IMPORTANT: we use DefaultHasher which has a fixed seed/secret of
342            // 0 to ensure that the same snapshot gives the same result
343            let snapshot = self.snapshot_or_latest()?;
344            let mut hasher = DefaultHasher::new();
345            snapshot.hash(&mut hasher);
346            let hash = hasher.finish();
347            let change_counter = &hash.to_be_bytes()[..4];
348
349            // write the latest change counter to the buffer
350            data[fcc_offset..fcc_offset + 4].copy_from_slice(change_counter);
351        }
352
353        Ok(data.len())
354    }
355
356    fn truncate(&mut self, size: usize) -> Result<(), ErrCtx> {
357        let VolFileState::Reserved { writer, .. } = &mut self.state else {
358            return Err(Culprit::new_with_note(
359                ErrCtx::InvalidVolumeState,
360                "must hold reserved lock to truncate",
361            ));
362        };
363
364        assert_eq!(
365            size % PAGESIZE.as_usize(),
366            0,
367            "size must be an even multiple of {PAGESIZE}"
368        );
369
370        let pages: PageCount = (size / PAGESIZE.as_usize())
371            .try_into()
372            .expect("size too large");
373
374        writer.truncate(pages).or_into_ctx()?;
375        Ok(())
376    }
377
378    fn write(&mut self, offset: usize, data: &[u8]) -> Result<usize, ErrCtx> {
379        let VolFileState::Reserved { writer, .. } = &mut self.state else {
380            return Err(Culprit::new_with_note(
381                ErrCtx::InvalidVolumeState,
382                "must hold reserved lock to write",
383            ));
384        };
385
386        // locate the requested page index
387        let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
388            .try_into()
389            .expect("offset out of volume range");
390        // local_offset is the offset *within* the requested page
391        let local_offset = offset % PAGESIZE;
392
393        assert!(
394            local_offset + data.len() <= PAGESIZE,
395            "write must not cross page boundary"
396        );
397
398        // if this is a write to the first page, and the write only changes the
399        // file change counter and the version valid for number, we can ignore this write
400        if page_idx == PageIdx::FIRST && data.len() == PAGESIZE && local_offset == 0 {
401            let existing: Page = writer.read_page(page_idx).or_into_ctx()?;
402
403            debug_assert_eq!(data.len(), existing.len(), "page size mismatch");
404
405            let fcc = FILE_CHANGE_COUNTER_OFFSET..FILE_CHANGE_COUNTER_OFFSET + 4;
406            let vvf = VERSION_VALID_FOR_NUMBER_OFFSET..VERSION_VALID_FOR_NUMBER_OFFSET + 4;
407
408            // check the header page is unchanged while ignoring the file change
409            // counter and version valid for number
410            let unchanged =
411                // prefix [0,24)
412                data[..fcc.start]           == existing[..fcc.start] &&
413                // middle (28,92)
414                data[fcc.end..vvf.start]    == existing[fcc.end..vvf.start] &&
415                // suffix (96, end]
416                data[vvf.end..]             == existing[vvf.end..];
417
418            if unchanged {
419                tracing::trace!(
420                    "ignoring write to header page, as only file change counter and version valid for number changed"
421                );
422                return Ok(data.len());
423            }
424        }
425
426        let page = if data.len() == PAGESIZE {
427            // writing a full page
428            Page::try_from(data).expect("data is a full page")
429        } else {
430            // writing a partial page
431            // we need to read and then update the page
432            let mut page: BytesMut = writer.read_page(page_idx).or_into_ctx()?.into();
433            // SAFETY: we already verified that the write does not cross a page boundary
434            let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
435            page[range].copy_from_slice(data);
436            page.try_into().expect("we did not change the page size")
437        };
438
439        writer.write_page(page_idx, page).or_into_ctx()?;
440        Ok(data.len())
441    }
442}