// graft_sqlite/file/vol_file.rs

1use std::{
2    fmt::Debug,
3    hash::{DefaultHasher, Hash, Hasher},
4    mem,
5    sync::Arc,
6};
7
8use bytes::BytesMut;
9use culprit::{Culprit, Result, ResultExt};
10use graft_core::{
11    PageIdx, VolumeId,
12    page::{PAGESIZE, Page},
13    page_count::PageCount,
14};
15use graft_kernel::{
16    graft_reader::{GraftRead, GraftReader},
17    graft_writer::{GraftWrite, GraftWriter},
18    rt::runtime::Runtime,
19    snapshot::Snapshot,
20};
21use parking_lot::{Mutex, MutexGuard};
22use sqlite_plugin::flags::{LockLevel, OpenOpts};
23
24use crate::vfs::ErrCtx;
25
26use super::VfsFile;
27
/// Byte offset, within the SQLite database header on page 1, of the 4-byte
/// file change counter.
const FILE_CHANGE_COUNTER_OFFSET: usize = 24;

/// Byte offset, within the SQLite database header on page 1, of the 4-byte
/// "version-valid-for" number.
const VERSION_VALID_FOR_NUMBER_OFFSET: usize = 92;
31
32enum VolFileState {
33    Idle,
34    Shared { reader: GraftReader },
35    Reserved { writer: GraftWriter },
36    Committing,
37}
38
39impl VolFileState {
40    fn name(&self) -> &'static str {
41        match self {
42            VolFileState::Idle => "Idle",
43            VolFileState::Shared { .. } => "Shared",
44            VolFileState::Reserved { .. } => "Reserved",
45            VolFileState::Committing => "Committing",
46        }
47    }
48}
49
50impl Debug for VolFileState {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        match self {
53            VolFileState::Idle => f.write_str("Idle"),
54            VolFileState::Shared { reader } => {
55                f.debug_tuple("Shared").field(reader.snapshot()).finish()
56            }
57            VolFileState::Reserved { writer } => {
58                f.debug_tuple("Reserved").field(writer.snapshot()).finish()
59            }
60            VolFileState::Committing => f.write_str("Committing"),
61        }
62    }
63}
64
65pub struct VolFile {
66    runtime: Runtime,
67    pub tag: String,
68    pub graft: VolumeId,
69    opts: OpenOpts,
70
71    reserved: Arc<Mutex<()>>,
72    state: VolFileState,
73}
74
75impl Debug for VolFile {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        f.debug_struct("VolFile")
78            .field("tag", &self.tag)
79            .field("graft", &self.graft)
80            .field("state", &self.state)
81            .finish()
82    }
83}
84
85impl VolFile {
86    pub fn new(
87        runtime: Runtime,
88        tag: String,
89        graft: VolumeId,
90        opts: OpenOpts,
91        reserved: Arc<Mutex<()>>,
92    ) -> Self {
93        Self {
94            runtime,
95            tag,
96            graft,
97            opts,
98            reserved,
99            state: VolFileState::Idle,
100        }
101    }
102
103    pub fn snapshot_or_latest(&self) -> Result<Snapshot, ErrCtx> {
104        match &self.state {
105            VolFileState::Idle => self.runtime.graft_snapshot(&self.graft).or_into_ctx(),
106            VolFileState::Shared { reader } => Ok(reader.snapshot().clone()),
107            VolFileState::Reserved { writer } => Ok(writer.snapshot().clone()),
108            VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
109        }
110    }
111
112    pub fn page_count(&self) -> Result<PageCount, ErrCtx> {
113        match &self.state {
114            VolFileState::Idle => {
115                let snapshot = self.runtime.graft_snapshot(&self.graft).or_into_ctx()?;
116                self.runtime.snapshot_pages(&snapshot).or_into_ctx()
117            }
118            VolFileState::Shared { reader } => reader.page_count().or_into_ctx(),
119            VolFileState::Reserved { writer } => writer.page_count().or_into_ctx(),
120            VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
121        }
122    }
123
124    pub fn is_idle(&self) -> bool {
125        matches!(self.state, VolFileState::Idle)
126    }
127
128    pub fn opts(&self) -> OpenOpts {
129        self.opts
130    }
131
132    pub fn switch_graft(&mut self, graft: &VolumeId) -> Result<(), ErrCtx> {
133        self.runtime
134            .tag_replace(&self.tag, graft.clone())
135            .or_into_ctx()?;
136        self.graft = graft.clone();
137        Ok(())
138    }
139}
140
141impl VfsFile for VolFile {
142    fn readonly(&self) -> bool {
143        false
144    }
145
146    fn in_memory(&self) -> bool {
147        false
148    }
149
150    fn lock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
151        match level {
152            LockLevel::Unlocked => {
153                // SQLite should never request an Unlocked lock
154                unreachable!("bug: invalid request lock(Unlocked)");
155            }
156            LockLevel::Shared => {
157                if let VolFileState::Idle = self.state {
158                    // Transition Idle -> Shared
159                    let reader = self
160                        .runtime
161                        .graft_reader(self.graft.clone())
162                        .or_into_ctx()?;
163                    self.state = VolFileState::Shared { reader };
164                } else {
165                    return Err(Culprit::new_with_note(
166                        ErrCtx::InvalidLockTransition,
167                        format!("invalid lock request Shared in state {}", self.state.name()),
168                    ));
169                }
170            }
171            LockLevel::Reserved => {
172                if let VolFileState::Shared { ref reader } = self.state {
173                    // Transition Shared -> Reserved
174
175                    // Ensure that this VolFile is not readonly
176                    if self.opts.mode().is_readonly() {
177                        return Err(Culprit::new_with_note(
178                            ErrCtx::InvalidLockTransition,
179                            "invalid lock request: Shared -> Reserved: file is read-only",
180                        ));
181                    }
182
183                    // try to acquire the reserved lock or fail if another thread has it
184                    let Some(reserved) = self.reserved.try_lock() else {
185                        return Err(Culprit::new(ErrCtx::Busy));
186                    };
187
188                    // check to see if the snapshot is latest. if it
189                    // has changed we can immediately reject the lock upgrade
190                    if !self
191                        .runtime
192                        .snapshot_is_latest(&self.graft, reader.snapshot())
193                        .or_into_ctx()?
194                    {
195                        return Err(Culprit::new_with_note(
196                            ErrCtx::BusySnapshot,
197                            "unable to lock: Shared -> Reserved: snapshot changed",
198                        ));
199                    }
200
201                    // convert the reader into a writer
202                    self.state = VolFileState::Reserved {
203                        writer: GraftWriter::try_from(reader.clone()).or_into_ctx()?,
204                    };
205
206                    // Explicitly leak the reserved lock
207                    // SAFETY: we depend on SQLite to release the lock when it's done
208                    MutexGuard::leak(reserved);
209                } else {
210                    return Err(Culprit::new_with_note(
211                        ErrCtx::InvalidLockTransition,
212                        format!(
213                            "invalid lock request Reserved in state {}",
214                            self.state.name()
215                        ),
216                    ));
217                }
218            }
219            LockLevel::Pending | LockLevel::Exclusive => {
220                // SQLite should only request these transitions while we are in the Reserved state
221                assert!(
222                    matches!(self.state, VolFileState::Reserved { .. }),
223                    "bug: invalid lock request {:?} in state {}",
224                    level,
225                    self.state.name()
226                );
227            }
228        }
229
230        Ok(())
231    }
232
233    fn unlock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
234        match level {
235            LockLevel::Unlocked => match self.state {
236                VolFileState::Idle | VolFileState::Shared { .. } | VolFileState::Committing => {
237                    self.state = VolFileState::Idle;
238                }
239                VolFileState::Reserved { .. } => {
240                    return Err(Culprit::new_with_note(
241                        ErrCtx::InvalidLockTransition,
242                        "invalid unlock request Unlocked in state Reserved",
243                    ));
244                }
245            },
246            LockLevel::Shared => {
247                if let VolFileState::Reserved { writer } =
248                    mem::replace(&mut self.state, VolFileState::Committing)
249                {
250                    // Transition Reserved -> Shared through the Committing state
251                    // If we fail the commit, SQLite will subsequently issue an
252                    // Unlocked request after handling the error
253
254                    // Commit the writer, downgrading to a reader
255                    let reader = writer.commit().or_into_ctx()?;
256                    self.state = VolFileState::Shared { reader };
257
258                    // release the reserved lock
259                    // between threads while holding the lock
260                    // TODO: find a way to assert that this thread actually owns the lock
261                    assert!(self.reserved.is_locked(), "reserved lock must be locked");
262                    // SAFETY: we are in the Reserved state, thus we are holding the lock
263                    // SAFETY: we depend on the connection not being passed
264                    unsafe { self.reserved.force_unlock() };
265                } else {
266                    return Err(Culprit::new_with_note(
267                        ErrCtx::InvalidLockTransition,
268                        format!(
269                            "invalid unlock request Shared in state {}",
270                            self.state.name()
271                        ),
272                    ));
273                }
274            }
275            LockLevel::Reserved | LockLevel::Pending | LockLevel::Exclusive => {
276                // SQLite should only request these transitions using the lock method
277                unreachable!(
278                    "bug: invalid unlock request {:?} in state {}",
279                    level,
280                    self.state.name()
281                );
282            }
283        }
284
285        Ok(())
286    }
287
288    fn file_size(&mut self) -> Result<usize, ErrCtx> {
289        Ok(PAGESIZE.as_usize() * self.page_count()?.to_usize())
290    }
291
292    fn read(&mut self, offset: usize, data: &mut [u8]) -> Result<usize, ErrCtx> {
293        // locate the page offset of the requested page
294        let pageidx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
295            .try_into()
296            .expect("offset out of volume range");
297        // local_offset is the offset *within* the requested page
298        let local_offset = offset % PAGESIZE;
299
300        assert!(
301            local_offset + data.len() <= PAGESIZE,
302            "read must not cross page boundary"
303        );
304
305        // load the page
306        let page = match &mut self.state {
307            VolFileState::Idle => {
308                // sqlite sometimes reads the database header without holding a
309                // lock, in this case we are expected to read from the latest
310                // snapshot
311                let reader = self
312                    .runtime
313                    .graft_reader(self.graft.clone())
314                    .or_into_ctx()?;
315                reader.read_page(pageidx).or_into_ctx()?
316            }
317            VolFileState::Shared { reader } => reader.read_page(pageidx).or_into_ctx()?,
318            VolFileState::Reserved { writer } => writer.read_page(pageidx).or_into_ctx()?,
319            VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
320        };
321
322        let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
323        data.copy_from_slice(&page[range]);
324
325        // check to see if SQLite is reading the file change counter, and if so,
326        // overwrite it with a counter derived from the current snapshot
327        if pageidx == PageIdx::FIRST
328            && local_offset <= FILE_CHANGE_COUNTER_OFFSET
329            && local_offset + data.len() >= FILE_CHANGE_COUNTER_OFFSET + 4
330        {
331            // find the location of the file change counter within the out buffer
332            let fcc_offset = FILE_CHANGE_COUNTER_OFFSET - local_offset.as_usize();
333
334            // compute the file change counter by hashing the snapshot.
335            // IMPORTANT: we use DefaultHasher which has a fixed seed/secret of
336            // 0 to ensure that the same snapshot gives the same result
337            let snapshot = self.snapshot_or_latest()?;
338            let mut hasher = DefaultHasher::new();
339            snapshot.hash(&mut hasher);
340            let hash = hasher.finish();
341            let change_counter = &hash.to_be_bytes()[..4];
342
343            // write the latest change counter to the buffer
344            data[fcc_offset..fcc_offset + 4].copy_from_slice(change_counter);
345        }
346
347        Ok(data.len())
348    }
349
350    fn truncate(&mut self, size: usize) -> Result<(), ErrCtx> {
351        let VolFileState::Reserved { writer, .. } = &mut self.state else {
352            return Err(Culprit::new_with_note(
353                ErrCtx::InvalidVolumeState,
354                "must hold reserved lock to truncate",
355            ));
356        };
357
358        assert_eq!(
359            size % PAGESIZE.as_usize(),
360            0,
361            "size must be an even multiple of {PAGESIZE}"
362        );
363
364        let pages: PageCount = (size / PAGESIZE.as_usize())
365            .try_into()
366            .expect("size too large");
367
368        writer.truncate(pages).or_into_ctx()?;
369        Ok(())
370    }
371
372    fn write(&mut self, offset: usize, data: &[u8]) -> Result<usize, ErrCtx> {
373        let VolFileState::Reserved { writer, .. } = &mut self.state else {
374            return Err(Culprit::new_with_note(
375                ErrCtx::InvalidVolumeState,
376                "must hold reserved lock to write",
377            ));
378        };
379
380        // locate the requested page index
381        let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
382            .try_into()
383            .expect("offset out of volume range");
384        // local_offset is the offset *within* the requested page
385        let local_offset = offset % PAGESIZE;
386
387        assert!(
388            local_offset + data.len() <= PAGESIZE,
389            "write must not cross page boundary"
390        );
391
392        // if this is a write to the first page, and the write only changes the
393        // file change counter and the version valid for number, we can ignore this write
394        if page_idx == PageIdx::FIRST && data.len() == PAGESIZE && local_offset == 0 {
395            let existing: Page = writer.read_page(page_idx).or_into_ctx()?;
396
397            debug_assert_eq!(data.len(), existing.len(), "page size mismatch");
398
399            let fcc = FILE_CHANGE_COUNTER_OFFSET..FILE_CHANGE_COUNTER_OFFSET + 4;
400            let vvf = VERSION_VALID_FOR_NUMBER_OFFSET..VERSION_VALID_FOR_NUMBER_OFFSET + 4;
401
402            // check the header page is unchanged while ignoring the file change
403            // counter and version valid for number
404            let unchanged =
405                // prefix [0,24)
406                data[..fcc.start]           == existing[..fcc.start] &&
407                // middle (28,92)
408                data[fcc.end..vvf.start]    == existing[fcc.end..vvf.start] &&
409                // suffix (96, end]
410                data[vvf.end..]             == existing[vvf.end..];
411
412            if unchanged {
413                tracing::trace!(
414                    "ignoring write to header page, as only file change counter and version valid for number changed"
415                );
416                return Ok(data.len());
417            }
418        }
419
420        let page = if data.len() == PAGESIZE {
421            // writing a full page
422            Page::try_from(data).expect("data is a full page")
423        } else {
424            // writing a partial page
425            // we need to read and then update the page
426            let mut page: BytesMut = writer.read_page(page_idx).or_into_ctx()?.into();
427            // SAFETY: we already verified that the write does not cross a page boundary
428            let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
429            page[range].copy_from_slice(data);
430            page.try_into().expect("we did not change the page size")
431        };
432
433        writer.write_page(page_idx, page).or_into_ctx()?;
434        Ok(data.len())
435    }
436}