graft_client/runtime/
volume_reader.rs

1use std::{borrow::Cow, collections::HashMap, iter::once, sync::Arc};
2
3use culprit::{Result, ResultExt};
4
5use graft_core::{
6    PageIdx, VolumeId,
7    lsn::LSN,
8    page::{EMPTY_PAGE, Page},
9};
10use splinter_rs::Splinter;
11use tracing::field;
12
13use crate::{ClientErr, ClientPair, oracle::Oracle};
14
15use super::{
16    storage::{
17        Storage,
18        page::{PageStatus, PageValue},
19        snapshot::Snapshot,
20    },
21    volume_writer::VolumeWriter,
22};
23
24pub trait VolumeRead {
25    fn vid(&self) -> &VolumeId;
26
27    /// Retrieve the Volume snapshot backing this reader
28    fn snapshot(&self) -> Option<&Snapshot>;
29
30    /// Read a page from the snapshot
31    fn read<O: Oracle>(&self, oracle: &mut O, pageidx: PageIdx) -> Result<Page, ClientErr>;
32
33    /// Retrieve a page's status
34    fn status(&self, pageidx: PageIdx) -> Result<PageStatus, ClientErr>;
35}
36
37#[derive(Debug, Clone)]
38pub struct VolumeReader {
39    vid: VolumeId,
40    snapshot: Option<Snapshot>,
41    clients: Arc<ClientPair>,
42    storage: Arc<Storage>,
43}
44
45impl VolumeReader {
46    pub(crate) fn new(
47        vid: VolumeId,
48        snapshot: Option<Snapshot>,
49        clients: Arc<ClientPair>,
50        storage: Arc<Storage>,
51    ) -> Self {
52        Self { vid, snapshot, clients, storage }
53    }
54
55    /// Upgrade this reader into a writer
56    pub fn upgrade(self) -> VolumeWriter {
57        self.into()
58    }
59
60    /// decompose this reader into snapshot and storage
61    pub(crate) fn into_parts(self) -> (VolumeId, Option<Snapshot>, Arc<ClientPair>, Arc<Storage>) {
62        (self.vid, self.snapshot, self.clients, self.storage)
63    }
64}
65
66impl VolumeRead for VolumeReader {
67    #[inline]
68    fn vid(&self) -> &VolumeId {
69        &self.vid
70    }
71
72    #[inline]
73    fn snapshot(&self) -> Option<&Snapshot> {
74        self.snapshot.as_ref()
75    }
76
77    fn read<O: Oracle>(&self, oracle: &mut O, pageidx: PageIdx) -> Result<Page, ClientErr> {
78        if let Some(snapshot) = self.snapshot() {
79            match self
80                .storage
81                .read(self.vid(), snapshot.local(), pageidx)
82                .or_into_ctx()?
83            {
84                (_, PageValue::Available(page)) => {
85                    oracle.observe_cache_hit(pageidx);
86                    Ok(page)
87                }
88                (_, PageValue::Empty) => {
89                    oracle.observe_cache_hit(pageidx);
90                    Ok(EMPTY_PAGE)
91                }
92                (_, PageValue::Pending) => {
93                    if let Some((remote_lsn, local_lsn)) = snapshot.remote_mapping().splat() {
94                        fetch_page(
95                            &self.clients,
96                            &self.storage,
97                            oracle,
98                            self.vid(),
99                            remote_lsn,
100                            local_lsn,
101                            pageidx,
102                        )
103                        .or_into_ctx()
104                    } else {
105                        Ok(EMPTY_PAGE)
106                    }
107                }
108            }
109        } else {
110            Ok(EMPTY_PAGE)
111        }
112    }
113
114    fn status(&self, pageidx: PageIdx) -> Result<PageStatus, ClientErr> {
115        if let Some(snapshot) = self.snapshot() {
116            match self
117                .storage
118                .read(self.vid(), snapshot.local(), pageidx)
119                .or_into_ctx()?
120            {
121                (lsn, PageValue::Available(_)) => Ok(PageStatus::Available(lsn)),
122                (lsn, PageValue::Empty) => Ok(PageStatus::Empty(Some(lsn))),
123                (_, PageValue::Pending) => Ok(PageStatus::Pending),
124            }
125        } else {
126            Ok(PageStatus::Empty(None))
127        }
128    }
129}
130
131fn fetch_page<O: Oracle>(
132    clients: &ClientPair,
133    storage: &Storage,
134    oracle: &mut O,
135    vid: &VolumeId,
136    remote_lsn: LSN,
137    local_lsn: LSN,
138    pageidx: PageIdx,
139) -> Result<Page, ClientErr> {
140    let span = tracing::trace_span!(
141        "fetching page from pagestore",
142        ?vid,
143        %remote_lsn,
144        %local_lsn,
145        %pageidx,
146        num_pages=field::Empty,
147    )
148    .entered();
149
150    // predict future page fetches using the oracle, then eliminate pages we
151    // have already fetched while building our update hashmap.
152    let mut graft = Splinter::default();
153    let mut pages = HashMap::new();
154    for idx in once(pageidx).chain(oracle.predict_next(pageidx)) {
155        let (lsn, page) = storage.read(vid, local_lsn, idx).or_into_ctx()?;
156        if matches!(page, PageValue::Pending) {
157            graft.insert(idx.to_u32());
158            pages.insert(idx, (lsn, PageValue::Empty));
159        }
160    }
161
162    span.record("num_pages", pages.len());
163
164    // process client results and update the hashmap
165    let response = clients
166        .pagestore()
167        .read_pages(vid, remote_lsn, graft.serialize_to_bytes())?;
168    for page in response {
169        if let Some(entry) = pages.get_mut(&page.pageidx().or_into_ctx()?) {
170            entry.1 = page.page().or_into_ctx()?.into();
171        } else {
172            tracing::warn!(?vid, %remote_lsn, pageidx=page.pageidx, "unexpected page");
173            precept::expect_unreachable!(
174                "received unexpected page from pagestore",
175                {
176                    "vid": vid,
177                    "remote_lsn": remote_lsn,
178                    "pageidx": page.pageidx,
179                }
180            );
181        }
182    }
183
184    let requested_page = pages
185        .get(&pageidx)
186        .cloned()
187        .and_then(|(_, p)| p.try_into_page())
188        .expect("requested page not found");
189
190    // update local storage with fetched pages
191    storage.receive_pages(vid, pages).or_into_ctx()?;
192
193    // return the requested page
194    Ok(requested_page)
195}
196
197pub enum VolumeReadRef<'a> {
198    Reader(Cow<'a, VolumeReader>),
199    Writer(&'a VolumeWriter),
200}
201
202impl VolumeRead for VolumeReadRef<'_> {
203    fn vid(&self) -> &VolumeId {
204        match self {
205            VolumeReadRef::Reader(reader) => reader.vid(),
206            VolumeReadRef::Writer(writer) => writer.vid(),
207        }
208    }
209
210    fn snapshot(&self) -> Option<&Snapshot> {
211        match self {
212            VolumeReadRef::Reader(reader) => reader.snapshot(),
213            VolumeReadRef::Writer(writer) => writer.snapshot(),
214        }
215    }
216
217    fn read<O: Oracle>(&self, oracle: &mut O, pageidx: PageIdx) -> Result<Page, ClientErr> {
218        match self {
219            VolumeReadRef::Reader(reader) => reader.read(oracle, pageidx),
220            VolumeReadRef::Writer(writer) => writer.read(oracle, pageidx),
221        }
222    }
223
224    fn status(&self, pageidx: PageIdx) -> Result<PageStatus, ClientErr> {
225        match self {
226            VolumeReadRef::Reader(reader) => reader.status(pageidx),
227            VolumeReadRef::Writer(writer) => writer.status(pageidx),
228        }
229    }
230}