graft_client/runtime/
volume_writer.rs1use culprit::{Result, ResultExt};
2use graft_core::{PageIdx, VolumeId, page::Page, page_count::PageCount};
3
4use crate::{ClientErr, oracle::Oracle};
5
6use super::{
7 storage::{memtable::Memtable, page::PageStatus, snapshot::Snapshot},
8 volume_reader::{VolumeRead, VolumeReader},
9};
10
11pub trait VolumeWrite {
12 type CommitOutput;
13
14 fn write(&mut self, pageidx: PageIdx, page: Page);
16
17 fn truncate(&mut self, pages: PageCount);
20
21 fn commit(self) -> Result<Self::CommitOutput, ClientErr>;
23}
24
25#[derive(Debug)]
26pub struct VolumeWriter {
27 pages: PageCount,
28 reader: VolumeReader,
29 memtable: Memtable,
30}
31
32impl VolumeWriter {
33 pub fn pages(&self) -> PageCount {
34 self.pages
35 }
36}
37
38impl From<VolumeReader> for VolumeWriter {
39 fn from(reader: VolumeReader) -> Self {
40 let pages = reader.snapshot().map_or(PageCount::ZERO, |s| s.pages());
41 Self {
42 pages,
43 reader,
44 memtable: Default::default(),
45 }
46 }
47}
48
49impl VolumeRead for VolumeWriter {
50 #[inline]
51 fn vid(&self) -> &VolumeId {
52 self.reader.vid()
53 }
54
55 #[inline]
57 fn snapshot(&self) -> Option<&Snapshot> {
58 self.reader.snapshot()
59 }
60
61 fn read<O: Oracle>(&self, oracle: &mut O, pageidx: PageIdx) -> Result<Page, ClientErr> {
63 if let Some(page) = self.memtable.get(pageidx) {
64 oracle.observe_cache_hit(pageidx);
65 return Ok(page.clone());
66 }
67 self.reader.read(oracle, pageidx)
68 }
69
70 fn status(&self, pageidx: PageIdx) -> Result<PageStatus, ClientErr> {
72 if self.memtable.contains(pageidx) {
73 return Ok(PageStatus::Dirty);
74 }
75 self.reader.status(pageidx)
76 }
77}
78
79impl VolumeWrite for VolumeWriter {
80 type CommitOutput = VolumeReader;
81
82 fn write(&mut self, pageidx: PageIdx, page: Page) {
83 self.pages = self.pages.max(pageidx.pages());
84 self.memtable.insert(pageidx, page);
85 }
86
87 fn truncate(&mut self, pages: PageCount) {
88 self.pages = pages;
89 self.memtable.truncate(self.pages.last_index())
90 }
91
92 fn commit(self) -> Result<VolumeReader, ClientErr> {
93 let (vid, snapshot, clients, storage) = self.reader.into_parts();
94
95 let snapshot_pagecount = snapshot.as_ref().map_or(PageCount::ZERO, |s| s.pages());
98 let memtable_empty = self.memtable.is_empty();
99 if self.pages == snapshot_pagecount && memtable_empty {
100 return Ok(VolumeReader::new(vid, snapshot, clients, storage));
101 }
102
103 let snapshot = storage
104 .commit(&vid, snapshot, self.pages, self.memtable)
105 .or_into_ctx()?;
106 Ok(VolumeReader::new(vid, Some(snapshot), clients, storage))
107 }
108}