graft_sqlite/file/vol_file.rs

1use std::{
2    fmt::Debug,
3    hash::{DefaultHasher, Hash, Hasher},
4    mem,
5    sync::Arc,
6};
7
8use bytes::BytesMut;
9use culprit::{Culprit, Result, ResultExt};
10use graft_client::{
11    oracle::LeapOracle,
12    runtime::{
13        storage::snapshot::Snapshot,
14        volume_handle::VolumeHandle,
15        volume_reader::{VolumeRead, VolumeReader},
16        volume_writer::{VolumeWrite, VolumeWriter},
17    },
18};
19use graft_core::{
20    PageIdx,
21    page::{PAGESIZE, Page},
22    page_count::PageCount,
23};
24use parking_lot::{Mutex, MutexGuard};
25use sqlite_plugin::flags::{LockLevel, OpenOpts};
26
27use crate::vfs::ErrCtx;
28
29use super::VfsFile;
30
/// Byte offset of SQLite's file change counter inside the database header,
/// which sits at the very start of page 1. The counter itself is stored as a
/// 4-byte big-endian integer.
const FILE_CHANGE_COUNTER_OFFSET: usize = 24;
33
34#[derive(Debug)]
35enum VolFileState {
36    Idle,
37    Shared { reader: VolumeReader },
38    Reserved { writer: VolumeWriter },
39    Committing,
40}
41
42impl VolFileState {
43    fn name(&self) -> &'static str {
44        match self {
45            VolFileState::Idle => "Idle",
46            VolFileState::Shared { .. } => "Shared",
47            VolFileState::Reserved { .. } => "Reserved",
48            VolFileState::Committing => "Committing",
49        }
50    }
51}
52
53pub struct VolFile {
54    handle: VolumeHandle,
55    opts: OpenOpts,
56
57    reserved: Arc<Mutex<()>>,
58    state: VolFileState,
59    oracle: Box<LeapOracle>,
60}
61
62impl Debug for VolFile {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.write_str(&self.handle.vid().pretty())
65    }
66}
67
68impl VolFile {
69    pub fn new(handle: VolumeHandle, opts: OpenOpts, reserved: Arc<Mutex<()>>) -> Self {
70        Self {
71            handle,
72            opts,
73            reserved,
74            state: VolFileState::Idle,
75            oracle: Default::default(),
76        }
77    }
78
79    pub fn snapshot_or_latest(&self) -> Result<Option<Snapshot>, ErrCtx> {
80        match &self.state {
81            VolFileState::Idle => self.handle().snapshot().or_into_ctx(),
82            VolFileState::Shared { reader, .. } => Ok(reader.snapshot().cloned()),
83            VolFileState::Reserved { writer, .. } => Ok(writer.snapshot().cloned()),
84            VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
85        }
86    }
87
88    pub fn handle(&self) -> &VolumeHandle {
89        &self.handle
90    }
91
92    pub fn opts(&self) -> OpenOpts {
93        self.opts
94    }
95
96    pub fn close(self) -> VolumeHandle {
97        self.handle
98    }
99}
100
101impl VfsFile for VolFile {
102    fn readonly(&self) -> bool {
103        false
104    }
105
106    fn in_memory(&self) -> bool {
107        false
108    }
109
110    fn lock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
111        match level {
112            LockLevel::Unlocked => {
113                // SQLite should never request an Unlocked lock
114                unreachable!("bug: invalid request lock(Unlocked)");
115            }
116            LockLevel::Shared => {
117                if let VolFileState::Idle = self.state {
118                    // Transition Idle -> Shared
119                    let reader = self.handle.reader().or_into_ctx()?;
120                    self.state = VolFileState::Shared { reader };
121                } else {
122                    return Err(Culprit::new_with_note(
123                        ErrCtx::InvalidLockTransition,
124                        format!("invalid lock request Shared in state {}", self.state.name()),
125                    ));
126                }
127            }
128            LockLevel::Reserved => {
129                if let VolFileState::Shared { ref reader } = self.state {
130                    // Transition Shared -> Reserved
131
132                    // Ensure that this VolFile is not readonly
133                    if self.opts.mode().is_readonly() {
134                        return Err(Culprit::new_with_note(
135                            ErrCtx::InvalidLockTransition,
136                            "invalid lock request: Shared -> Reserved: file is read-only",
137                        ));
138                    }
139
140                    // try to acquire the reserved lock or fail if another thread has it
141                    let Some(reserved) = self.reserved.try_lock() else {
142                        return Err(Culprit::new(ErrCtx::Busy));
143                    };
144
145                    // upgrade the reader to a writer if possible
146                    let latest_snapshot = self.handle.snapshot().or_into_ctx()?;
147
148                    // check to see if the local LSN has changed since the transaction started.
149                    // We can ignore checking the remote lsn because:
150                    //  -> if the remote lsn changes due to a Pull, the local LSN will also change
151                    //  -> if the remote lsn changes due to a Push, the logical state will not change
152                    let writer = if reader.snapshot().map(|s| s.local())
153                        != latest_snapshot.as_ref().map(|s| s.local())
154                    {
155                        // if a read occurred in this transaction, we can't
156                        // upgrade to a reserved state
157                        return Err(Culprit::new_with_note(
158                            ErrCtx::BusySnapshot,
159                            "unable to lock: Shared -> Reserved: snapshot changed",
160                        ));
161                    } else {
162                        // The snapshot has not changed
163                        self.handle.writer_at(latest_snapshot)
164                    };
165
166                    self.state = VolFileState::Reserved { writer };
167
168                    // Explicitly leak the reserved lock
169                    // SAFETY: we depend on SQLite to release the lock when it's done
170                    MutexGuard::leak(reserved);
171                } else {
172                    return Err(Culprit::new_with_note(
173                        ErrCtx::InvalidLockTransition,
174                        format!(
175                            "invalid lock request Reserved in state {}",
176                            self.state.name()
177                        ),
178                    ));
179                }
180            }
181            LockLevel::Pending | LockLevel::Exclusive => {
182                // SQLite should only request these transitions while we are in the Reserved state
183                assert!(
184                    matches!(self.state, VolFileState::Reserved { .. }),
185                    "bug: invalid lock request {:?} in state {}",
186                    level,
187                    self.state.name()
188                );
189            }
190        }
191
192        Ok(())
193    }
194
195    fn unlock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
196        match level {
197            LockLevel::Unlocked => match self.state {
198                VolFileState::Idle | VolFileState::Shared { .. } | VolFileState::Committing => {
199                    self.state = VolFileState::Idle;
200                }
201                VolFileState::Reserved { .. } => {
202                    return Err(Culprit::new_with_note(
203                        ErrCtx::InvalidLockTransition,
204                        "invalid unlock request Unlocked in state Reserved",
205                    ));
206                }
207            },
208            LockLevel::Shared => {
209                if let VolFileState::Reserved { writer } =
210                    mem::replace(&mut self.state, VolFileState::Committing)
211                {
212                    // Transition Reserved -> Shared through the Committing state
213                    // If we fail the commit, SQLite will subsequently issue an
214                    // Unlocked request after handling the error
215
216                    // Commit the writer, downgrading to a reader
217                    let reader = writer.commit().or_into_ctx()?;
218                    self.state = VolFileState::Shared { reader };
219
220                    // release the reserved lock
221                    // SAFETY: we are in the Reserved state, thus we are holding the lock
222                    // SAFETY: we depend on the connection not being passed
223                    // between threads while holding the lock
224                    // TODO: find a way to assert that this thread actually owns the lock
225                    assert!(self.reserved.is_locked(), "reserved lock must be locked");
226                    unsafe { self.reserved.force_unlock() };
227                } else {
228                    return Err(Culprit::new_with_note(
229                        ErrCtx::InvalidLockTransition,
230                        format!(
231                            "invalid unlock request Shared in state {}",
232                            self.state.name()
233                        ),
234                    ));
235                }
236            }
237            LockLevel::Reserved | LockLevel::Pending | LockLevel::Exclusive => {
238                // SQLite should only request these transitions using the lock method
239                unreachable!(
240                    "bug: invalid unlock request {:?} in state {}",
241                    level,
242                    self.state.name()
243                );
244            }
245        }
246
247        Ok(())
248    }
249
250    fn file_size(&mut self) -> Result<usize, ErrCtx> {
251        let pages = match &self.state {
252            VolFileState::Idle => self
253                .handle
254                .snapshot()
255                .or_into_ctx()?
256                .map_or(PageCount::ZERO, |snapshot| snapshot.pages()),
257            VolFileState::Shared { reader, .. } => {
258                reader.snapshot().map_or(PageCount::ZERO, |s| s.pages())
259            }
260            VolFileState::Reserved { writer, .. } => writer.pages(),
261            VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
262        };
263        Ok((PAGESIZE * pages.to_usize()).as_usize())
264    }
265
266    fn read(&mut self, offset: usize, data: &mut [u8]) -> Result<usize, ErrCtx> {
267        // locate the page offset of the requested page
268        let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
269            .try_into()
270            .expect("offset out of volume range");
271        // local_offset is the offset *within* the requested page
272        let local_offset = offset % PAGESIZE;
273
274        assert!(
275            local_offset + data.len() <= PAGESIZE,
276            "read must not cross page boundary"
277        );
278
279        // load the page
280        let page = match &mut self.state {
281            VolFileState::Idle => {
282                // sqlite sometimes reads the database header without holding a
283                // lock, in this case we are expected to read from the latest
284                // snapshot
285                self.handle()
286                    .reader()
287                    .or_into_ctx()?
288                    .read(self.oracle.as_mut(), page_idx)
289                    .or_into_ctx()?
290            }
291            VolFileState::Shared { reader } => {
292                reader.read(self.oracle.as_mut(), page_idx).or_into_ctx()?
293            }
294            VolFileState::Reserved { writer } => {
295                writer.read(self.oracle.as_mut(), page_idx).or_into_ctx()?
296            }
297            VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
298        };
299
300        let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
301        data.copy_from_slice(&page[range]);
302
303        // check to see if SQLite is reading the file change counter, and if so,
304        // overwrite it with a counter derived from the current snapshot
305        if page_idx == PageIdx::FIRST
306            && local_offset <= FILE_CHANGE_COUNTER_OFFSET
307            && local_offset + data.len() >= FILE_CHANGE_COUNTER_OFFSET + 4
308        {
309            // find the location of the file change counter within the out buffer
310            let fcc_offset = FILE_CHANGE_COUNTER_OFFSET - local_offset.as_usize();
311
312            // we derive the change counter from the snapshot via hashing
313            let snapshot = self.snapshot_or_latest()?;
314            let mut hasher = DefaultHasher::new();
315            snapshot.hash(&mut hasher);
316            let change_counter = hasher.finish() as u32;
317
318            // write the latest change counter to the buffer
319            data[fcc_offset..fcc_offset + 4].copy_from_slice(&change_counter.to_be_bytes());
320        }
321
322        Ok(data.len())
323    }
324
325    fn truncate(&mut self, size: usize) -> Result<(), ErrCtx> {
326        let VolFileState::Reserved { writer, .. } = &mut self.state else {
327            return Err(Culprit::new_with_note(
328                ErrCtx::InvalidVolumeState,
329                "must hold reserved lock to truncate",
330            ));
331        };
332
333        assert_eq!(
334            size % PAGESIZE.as_usize(),
335            0,
336            "size must be an even multiple of {PAGESIZE}"
337        );
338
339        let pages: PageCount = (size / PAGESIZE.as_usize())
340            .try_into()
341            .expect("size too large");
342
343        writer.truncate(pages);
344        Ok(())
345    }
346
347    fn write(&mut self, offset: usize, data: &[u8]) -> Result<usize, ErrCtx> {
348        let VolFileState::Reserved { writer, .. } = &mut self.state else {
349            return Err(Culprit::new_with_note(
350                ErrCtx::InvalidVolumeState,
351                "must hold reserved lock to write",
352            ));
353        };
354
355        // locate the requested page index
356        let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
357            .try_into()
358            .expect("offset out of volume range");
359        // local_offset is the offset *within* the requested page
360        let local_offset = offset % PAGESIZE;
361
362        assert!(
363            local_offset + data.len() <= PAGESIZE,
364            "write must not cross page boundary"
365        );
366
367        let page = if data.len() == PAGESIZE {
368            // writing a full page
369            Page::try_from(data).expect("data is a full page")
370        } else {
371            // writing a partial page
372            // we need to read and then update the page
373            let mut page: BytesMut = writer
374                .read(self.oracle.as_mut(), page_idx)
375                .or_into_ctx()?
376                .into();
377            // SAFETY: we already verified that the write does not cross a page boundary
378            let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
379            page[range].copy_from_slice(data);
380            page.try_into().expect("we did not change the page size")
381        };
382
383        writer.write(page_idx, page);
384        Ok(data.len())
385    }
386}