graft_sqlite/file/
vol_file.rs

1use std::{
2    borrow::Cow,
3    fmt::Debug,
4    hash::{DefaultHasher, Hash, Hasher},
5    mem,
6    sync::Arc,
7};
8
9use bytes::BytesMut;
10use culprit::{Culprit, Result, ResultExt};
11use graft_client::{
12    oracle::LeapOracle,
13    runtime::{
14        storage::snapshot::Snapshot,
15        volume_handle::VolumeHandle,
16        volume_reader::{VolumeRead, VolumeReadRef, VolumeReader},
17        volume_writer::{VolumeWrite, VolumeWriter},
18    },
19};
20use graft_core::{
21    PageIdx, VolumeId,
22    page::{PAGESIZE, Page},
23    page_count::PageCount,
24};
25use parking_lot::{Mutex, MutexGuard};
26use sqlite_plugin::flags::{LockLevel, OpenOpts};
27
28use crate::vfs::ErrCtx;
29
30use super::VfsFile;
31
/// Byte offset of the 4-byte, big-endian "file change counter" inside the
/// SQLite database header, which lives at the start of page 1.
const FILE_CHANGE_COUNTER_OFFSET: usize = 24;

/// Byte offset of the 4-byte "version-valid-for" number inside the SQLite
/// database header (page 1).
const VERSION_VALID_FOR_NUMBER_OFFSET: usize = 92;
35
36#[derive(Debug)]
37enum VolFileState {
38    Idle,
39    Shared { reader: VolumeReader },
40    Reserved { writer: VolumeWriter },
41    Committing,
42}
43
44impl VolFileState {
45    fn name(&self) -> &'static str {
46        match self {
47            VolFileState::Idle => "Idle",
48            VolFileState::Shared { .. } => "Shared",
49            VolFileState::Reserved { .. } => "Reserved",
50            VolFileState::Committing => "Committing",
51        }
52    }
53}
54
55pub struct VolFile {
56    handle: VolumeHandle,
57    opts: OpenOpts,
58
59    reserved: Arc<Mutex<()>>,
60    state: VolFileState,
61    oracle: Box<LeapOracle>,
62}
63
64impl Debug for VolFile {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.write_str(&self.handle.vid().pretty())
67    }
68}
69
70impl VolFile {
71    pub fn new(handle: VolumeHandle, opts: OpenOpts, reserved: Arc<Mutex<()>>) -> Self {
72        Self {
73            handle,
74            opts,
75            reserved,
76            state: VolFileState::Idle,
77            oracle: Default::default(),
78        }
79    }
80
81    pub fn snapshot_or_latest(&self) -> Result<Option<Snapshot>, ErrCtx> {
82        match &self.state {
83            VolFileState::Idle => self.handle.snapshot().or_into_ctx(),
84            VolFileState::Shared { reader, .. } => Ok(reader.snapshot().cloned()),
85            VolFileState::Reserved { writer, .. } => Ok(writer.snapshot().cloned()),
86            VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
87        }
88    }
89
90    pub fn reader(&self) -> Result<VolumeReadRef<'_>, ErrCtx> {
91        match &self.state {
92            VolFileState::Idle => Ok(VolumeReadRef::Reader(Cow::Owned(
93                self.handle.reader().or_into_ctx()?,
94            ))),
95            VolFileState::Shared { reader, .. } => Ok(VolumeReadRef::Reader(Cow::Borrowed(reader))),
96            VolFileState::Reserved { writer, .. } => Ok(VolumeReadRef::Writer(writer)),
97            VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
98        }
99    }
100
101    /// Pull all of the pages accessible in the current snapshot or latest
102    pub fn pull(&mut self) -> Result<(), ErrCtx> {
103        let mut oracle = self.oracle.as_ref().clone();
104        let reader = self.reader()?;
105        let pages = reader.snapshot().map(|s| s.pages()).unwrap_or_default();
106        for pageidx in pages.iter() {
107            reader.read(&mut oracle, pageidx).or_into_ctx()?;
108        }
109        Ok(())
110    }
111
112    pub fn vid(&self) -> &VolumeId {
113        self.handle.vid()
114    }
115
116    pub fn handle(&self) -> &VolumeHandle {
117        &self.handle
118    }
119
120    pub fn opts(&self) -> OpenOpts {
121        self.opts
122    }
123
124    pub fn close(self) -> VolumeHandle {
125        self.handle
126    }
127}
128
129impl VfsFile for VolFile {
130    fn readonly(&self) -> bool {
131        false
132    }
133
134    fn in_memory(&self) -> bool {
135        false
136    }
137
138    fn lock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
139        match level {
140            LockLevel::Unlocked => {
141                // SQLite should never request an Unlocked lock
142                unreachable!("bug: invalid request lock(Unlocked)");
143            }
144            LockLevel::Shared => {
145                if let VolFileState::Idle = self.state {
146                    // Transition Idle -> Shared
147                    let reader = self.handle.reader().or_into_ctx()?;
148                    self.state = VolFileState::Shared { reader };
149                } else {
150                    return Err(Culprit::new_with_note(
151                        ErrCtx::InvalidLockTransition,
152                        format!("invalid lock request Shared in state {}", self.state.name()),
153                    ));
154                }
155            }
156            LockLevel::Reserved => {
157                if let VolFileState::Shared { ref reader } = self.state {
158                    // Transition Shared -> Reserved
159
160                    // Ensure that this VolFile is not readonly
161                    if self.opts.mode().is_readonly() {
162                        return Err(Culprit::new_with_note(
163                            ErrCtx::InvalidLockTransition,
164                            "invalid lock request: Shared -> Reserved: file is read-only",
165                        ));
166                    }
167
168                    // try to acquire the reserved lock or fail if another thread has it
169                    let Some(reserved) = self.reserved.try_lock() else {
170                        return Err(Culprit::new(ErrCtx::Busy));
171                    };
172
173                    // upgrade the reader to a writer if possible
174                    let latest_snapshot = self.handle.snapshot().or_into_ctx()?;
175
176                    // check to see if the local LSN has changed since the transaction started.
177                    // We can ignore checking the remote lsn because:
178                    //  -> if the remote lsn changes due to a Pull, the local LSN will also change
179                    //  -> if the remote lsn changes due to a Push, the logical state will not change
180                    let writer = if reader.snapshot().map(|s| s.local())
181                        != latest_snapshot.as_ref().map(|s| s.local())
182                    {
183                        // if a read occurred in this transaction, we can't
184                        // upgrade to a reserved state
185                        return Err(Culprit::new_with_note(
186                            ErrCtx::BusySnapshot,
187                            "unable to lock: Shared -> Reserved: snapshot changed",
188                        ));
189                    } else {
190                        // The snapshot has not changed
191                        self.handle.writer_at(latest_snapshot)
192                    };
193
194                    self.state = VolFileState::Reserved { writer };
195
196                    // Explicitly leak the reserved lock
197                    // SAFETY: we depend on SQLite to release the lock when it's done
198                    MutexGuard::leak(reserved);
199                } else {
200                    return Err(Culprit::new_with_note(
201                        ErrCtx::InvalidLockTransition,
202                        format!(
203                            "invalid lock request Reserved in state {}",
204                            self.state.name()
205                        ),
206                    ));
207                }
208            }
209            LockLevel::Pending | LockLevel::Exclusive => {
210                // SQLite should only request these transitions while we are in the Reserved state
211                assert!(
212                    matches!(self.state, VolFileState::Reserved { .. }),
213                    "bug: invalid lock request {:?} in state {}",
214                    level,
215                    self.state.name()
216                );
217            }
218        }
219
220        Ok(())
221    }
222
223    fn unlock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
224        match level {
225            LockLevel::Unlocked => match self.state {
226                VolFileState::Idle | VolFileState::Shared { .. } | VolFileState::Committing => {
227                    self.state = VolFileState::Idle;
228                }
229                VolFileState::Reserved { .. } => {
230                    return Err(Culprit::new_with_note(
231                        ErrCtx::InvalidLockTransition,
232                        "invalid unlock request Unlocked in state Reserved",
233                    ));
234                }
235            },
236            LockLevel::Shared => {
237                if let VolFileState::Reserved { writer } =
238                    mem::replace(&mut self.state, VolFileState::Committing)
239                {
240                    // Transition Reserved -> Shared through the Committing state
241                    // If we fail the commit, SQLite will subsequently issue an
242                    // Unlocked request after handling the error
243
244                    // Commit the writer, downgrading to a reader
245                    let reader = writer.commit().or_into_ctx()?;
246                    self.state = VolFileState::Shared { reader };
247
248                    // release the reserved lock
249                    // SAFETY: we are in the Reserved state, thus we are holding the lock
250                    // SAFETY: we depend on the connection not being passed
251                    // between threads while holding the lock
252                    // TODO: find a way to assert that this thread actually owns the lock
253                    assert!(self.reserved.is_locked(), "reserved lock must be locked");
254                    unsafe { self.reserved.force_unlock() };
255                } else {
256                    return Err(Culprit::new_with_note(
257                        ErrCtx::InvalidLockTransition,
258                        format!(
259                            "invalid unlock request Shared in state {}",
260                            self.state.name()
261                        ),
262                    ));
263                }
264            }
265            LockLevel::Reserved | LockLevel::Pending | LockLevel::Exclusive => {
266                // SQLite should only request these transitions using the lock method
267                unreachable!(
268                    "bug: invalid unlock request {:?} in state {}",
269                    level,
270                    self.state.name()
271                );
272            }
273        }
274
275        Ok(())
276    }
277
278    fn file_size(&mut self) -> Result<usize, ErrCtx> {
279        let pages = match &self.state {
280            VolFileState::Idle => self
281                .handle
282                .snapshot()
283                .or_into_ctx()?
284                .map_or(PageCount::ZERO, |snapshot| snapshot.pages()),
285            VolFileState::Shared { reader, .. } => {
286                reader.snapshot().map_or(PageCount::ZERO, |s| s.pages())
287            }
288            VolFileState::Reserved { writer, .. } => writer.pages(),
289            VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
290        };
291        Ok((PAGESIZE * pages.to_usize()).as_usize())
292    }
293
294    fn read(&mut self, offset: usize, data: &mut [u8]) -> Result<usize, ErrCtx> {
295        // locate the page offset of the requested page
296        let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
297            .try_into()
298            .expect("offset out of volume range");
299        // local_offset is the offset *within* the requested page
300        let local_offset = offset % PAGESIZE;
301
302        assert!(
303            local_offset + data.len() <= PAGESIZE,
304            "read must not cross page boundary"
305        );
306
307        // load the page
308        let page = match &mut self.state {
309            VolFileState::Idle => {
310                // sqlite sometimes reads the database header without holding a
311                // lock, in this case we are expected to read from the latest
312                // snapshot
313                self.handle
314                    .reader()
315                    .or_into_ctx()?
316                    .read(self.oracle.as_mut(), page_idx)
317                    .or_into_ctx()?
318            }
319            VolFileState::Shared { reader } => {
320                reader.read(self.oracle.as_mut(), page_idx).or_into_ctx()?
321            }
322            VolFileState::Reserved { writer } => {
323                writer.read(self.oracle.as_mut(), page_idx).or_into_ctx()?
324            }
325            VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
326        };
327
328        let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
329        data.copy_from_slice(&page[range]);
330
331        // check to see if SQLite is reading the file change counter, and if so,
332        // overwrite it with a counter derived from the current snapshot
333        if page_idx == PageIdx::FIRST
334            && local_offset <= FILE_CHANGE_COUNTER_OFFSET
335            && local_offset + data.len() >= FILE_CHANGE_COUNTER_OFFSET + 4
336        {
337            // find the location of the file change counter within the out buffer
338            let fcc_offset = FILE_CHANGE_COUNTER_OFFSET - local_offset.as_usize();
339
340            // we derive the change counter from the snapshot via hashing
341            let snapshot = self.snapshot_or_latest()?;
342            let mut hasher = DefaultHasher::new();
343            snapshot.hash(&mut hasher);
344            let change_counter = hasher.finish() as u32;
345
346            // write the latest change counter to the buffer
347            data[fcc_offset..fcc_offset + 4].copy_from_slice(&change_counter.to_be_bytes());
348        }
349
350        Ok(data.len())
351    }
352
353    fn truncate(&mut self, size: usize) -> Result<(), ErrCtx> {
354        let VolFileState::Reserved { writer, .. } = &mut self.state else {
355            return Err(Culprit::new_with_note(
356                ErrCtx::InvalidVolumeState,
357                "must hold reserved lock to truncate",
358            ));
359        };
360
361        assert_eq!(
362            size % PAGESIZE.as_usize(),
363            0,
364            "size must be an even multiple of {PAGESIZE}"
365        );
366
367        let pages: PageCount = (size / PAGESIZE.as_usize())
368            .try_into()
369            .expect("size too large");
370
371        writer.truncate(pages);
372        Ok(())
373    }
374
375    fn write(&mut self, offset: usize, data: &[u8]) -> Result<usize, ErrCtx> {
376        let VolFileState::Reserved { writer, .. } = &mut self.state else {
377            return Err(Culprit::new_with_note(
378                ErrCtx::InvalidVolumeState,
379                "must hold reserved lock to write",
380            ));
381        };
382
383        // locate the requested page index
384        let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
385            .try_into()
386            .expect("offset out of volume range");
387        // local_offset is the offset *within* the requested page
388        let local_offset = offset % PAGESIZE;
389
390        assert!(
391            local_offset + data.len() <= PAGESIZE,
392            "write must not cross page boundary"
393        );
394
395        // if this is a write to the first page, and the write only changes the
396        // file change counter and the version valid for number, we can ignore this write
397        if page_idx == PageIdx::FIRST && data.len() == PAGESIZE && local_offset == 0 {
398            let existing: Page = writer.read(self.oracle.as_mut(), page_idx).or_into_ctx()?;
399
400            debug_assert_eq!(data.len(), existing.len(), "page size mismatch");
401
402            let fcc = FILE_CHANGE_COUNTER_OFFSET..FILE_CHANGE_COUNTER_OFFSET + 4;
403            let vvf = VERSION_VALID_FOR_NUMBER_OFFSET..VERSION_VALID_FOR_NUMBER_OFFSET + 4;
404
405            // check the header page is unchanged while ignoring the file change
406            // counter and version valid for number
407            let unchanged =
408                // prefix [0,24)
409                data[..fcc.start]           == existing[..fcc.start] &&
410                // middle (28,92)
411                data[fcc.end..vvf.start]    == existing[fcc.end..vvf.start] &&
412                // suffix (96, end]
413                data[vvf.end..]             == existing[vvf.end..];
414
415            if unchanged {
416                tracing::trace!(
417                    "ignoring write to header page, file change counter and version valid for number unchanged"
418                );
419                return Ok(data.len());
420            }
421        }
422
423        let page = if data.len() == PAGESIZE {
424            // writing a full page
425            Page::try_from(data).expect("data is a full page")
426        } else {
427            // writing a partial page
428            // we need to read and then update the page
429            let mut page: BytesMut = writer
430                .read(self.oracle.as_mut(), page_idx)
431                .or_into_ctx()?
432                .into();
433            // SAFETY: we already verified that the write does not cross a page boundary
434            let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
435            page[range].copy_from_slice(data);
436            page.try_into().expect("we did not change the page size")
437        };
438
439        writer.write(page_idx, page);
440        Ok(data.len())
441    }
442}