graft_client/runtime/
volume_writer.rs

1use culprit::{Result, ResultExt};
2use graft_core::{PageIdx, VolumeId, page::Page, page_count::PageCount};
3
4use crate::{ClientErr, oracle::Oracle};
5
6use super::{
7    storage::{memtable::Memtable, page::PageStatus, snapshot::Snapshot},
8    volume_reader::{VolumeRead, VolumeReader},
9};
10
/// Write-side operations of a volume transaction.
///
/// Changes are staged through `&mut self` via [`write`](VolumeWrite::write)
/// and [`truncate`](VolumeWrite::truncate); [`commit`](VolumeWrite::commit)
/// consumes the implementor and yields its `CommitOutput` on success.
pub trait VolumeWrite {
    /// The value produced by a successful commit
    type CommitOutput;

    /// Write a page at the given page index
    fn write(&mut self, pageidx: PageIdx, page: Page);

    /// Truncate the volume to a new page count.
    /// This can be used to increase or decrease the Volume's size.
    fn truncate(&mut self, pages: PageCount);

    /// Commit the transaction, consuming the writer
    ///
    /// # Errors
    ///
    /// Returns a [`ClientErr`] if the commit fails.
    fn commit(self) -> Result<Self::CommitOutput, ClientErr>;
}
24
/// A writer layered on top of a [`VolumeReader`]: written pages are staged
/// in a [`Memtable`] and reads fall back to the reader for everything else.
#[derive(Debug)]
pub struct VolumeWriter {
    /// Page count as seen by this writer. Starts at the snapshot's page
    /// count (zero without a snapshot) and follows writes and truncations.
    pages: PageCount,
    /// Reader consulted for pages this writer has not written
    reader: VolumeReader,
    /// Pages written through this writer and not yet committed
    memtable: Memtable,
}
31
impl VolumeWriter {
    /// The writer's current page count, reflecting any writes and
    /// truncations made through this writer so far
    pub fn pages(&self) -> PageCount {
        self.pages
    }
}
37
38impl From<VolumeReader> for VolumeWriter {
39    fn from(reader: VolumeReader) -> Self {
40        let pages = reader.snapshot().map_or(PageCount::ZERO, |s| s.pages());
41        Self {
42            pages,
43            reader,
44            memtable: Default::default(),
45        }
46    }
47}
48
49impl VolumeRead for VolumeWriter {
50    #[inline]
51    fn vid(&self) -> &VolumeId {
52        self.reader.vid()
53    }
54
55    /// Access this writer's snapshot
56    #[inline]
57    fn snapshot(&self) -> Option<&Snapshot> {
58        self.reader.snapshot()
59    }
60
61    /// Read a page; supports read your own writes (RYOW)
62    fn read<O: Oracle>(&self, oracle: &mut O, pageidx: PageIdx) -> Result<Page, ClientErr> {
63        if let Some(page) = self.memtable.get(pageidx) {
64            oracle.observe_cache_hit(pageidx);
65            return Ok(page.clone());
66        }
67        self.reader.read(oracle, pageidx)
68    }
69
70    /// Read a page's status; supports read your own writes (RYOW)
71    fn status(&self, pageidx: PageIdx) -> Result<PageStatus, ClientErr> {
72        if self.memtable.contains(pageidx) {
73            return Ok(PageStatus::Dirty);
74        }
75        self.reader.status(pageidx)
76    }
77}
78
79impl VolumeWrite for VolumeWriter {
80    type CommitOutput = VolumeReader;
81
82    fn write(&mut self, pageidx: PageIdx, page: Page) {
83        self.pages = self.pages.max(pageidx.pages());
84        self.memtable.insert(pageidx, page);
85    }
86
87    fn truncate(&mut self, pages: PageCount) {
88        self.pages = pages;
89        self.memtable.truncate(self.pages.last_index())
90    }
91
92    fn commit(self) -> Result<VolumeReader, ClientErr> {
93        let (vid, snapshot, clients, storage) = self.reader.into_parts();
94
95        // we have nothing to commit if the page count is equal to the snapshot
96        // pagecount *and* the memtable is empty
97        let snapshot_pagecount = snapshot.as_ref().map_or(PageCount::ZERO, |s| s.pages());
98        let memtable_empty = self.memtable.is_empty();
99        if self.pages == snapshot_pagecount && memtable_empty {
100            return Ok(VolumeReader::new(vid, snapshot, clients, storage));
101        }
102
103        let snapshot = storage
104            .commit(&vid, snapshot, self.pages, self.memtable)
105            .or_into_ctx()?;
106        Ok(VolumeReader::new(vid, Some(snapshot), clients, storage))
107    }
108}