1use std::{borrow::Cow, collections::HashMap, iter::once, sync::Arc};
2
3use culprit::{Result, ResultExt};
4
5use graft_core::{
6 PageIdx, VolumeId,
7 lsn::LSN,
8 page::{EMPTY_PAGE, Page},
9};
10use splinter_rs::Splinter;
11use tracing::field;
12
13use crate::{ClientErr, ClientPair, oracle::Oracle};
14
15use super::{
16 storage::{
17 Storage,
18 page::{PageStatus, PageValue},
19 snapshot::Snapshot,
20 },
21 volume_writer::VolumeWriter,
22};
23
24pub trait VolumeRead {
25 fn vid(&self) -> &VolumeId;
26
27 fn snapshot(&self) -> Option<&Snapshot>;
29
30 fn read<O: Oracle>(&self, oracle: &mut O, pageidx: PageIdx) -> Result<Page, ClientErr>;
32
33 fn status(&self, pageidx: PageIdx) -> Result<PageStatus, ClientErr>;
35}
36
37#[derive(Debug, Clone)]
38pub struct VolumeReader {
39 vid: VolumeId,
40 snapshot: Option<Snapshot>,
41 clients: Arc<ClientPair>,
42 storage: Arc<Storage>,
43}
44
45impl VolumeReader {
46 pub(crate) fn new(
47 vid: VolumeId,
48 snapshot: Option<Snapshot>,
49 clients: Arc<ClientPair>,
50 storage: Arc<Storage>,
51 ) -> Self {
52 Self { vid, snapshot, clients, storage }
53 }
54
55 pub fn upgrade(self) -> VolumeWriter {
57 self.into()
58 }
59
60 pub(crate) fn into_parts(self) -> (VolumeId, Option<Snapshot>, Arc<ClientPair>, Arc<Storage>) {
62 (self.vid, self.snapshot, self.clients, self.storage)
63 }
64}
65
66impl VolumeRead for VolumeReader {
67 #[inline]
68 fn vid(&self) -> &VolumeId {
69 &self.vid
70 }
71
72 #[inline]
73 fn snapshot(&self) -> Option<&Snapshot> {
74 self.snapshot.as_ref()
75 }
76
77 fn read<O: Oracle>(&self, oracle: &mut O, pageidx: PageIdx) -> Result<Page, ClientErr> {
78 if let Some(snapshot) = self.snapshot() {
79 match self
80 .storage
81 .read(self.vid(), snapshot.local(), pageidx)
82 .or_into_ctx()?
83 {
84 (_, PageValue::Available(page)) => {
85 oracle.observe_cache_hit(pageidx);
86 Ok(page)
87 }
88 (_, PageValue::Empty) => {
89 oracle.observe_cache_hit(pageidx);
90 Ok(EMPTY_PAGE)
91 }
92 (_, PageValue::Pending) => {
93 if let Some((remote_lsn, local_lsn)) = snapshot.remote_mapping().splat() {
94 fetch_page(
95 &self.clients,
96 &self.storage,
97 oracle,
98 self.vid(),
99 remote_lsn,
100 local_lsn,
101 pageidx,
102 )
103 .or_into_ctx()
104 } else {
105 Ok(EMPTY_PAGE)
106 }
107 }
108 }
109 } else {
110 Ok(EMPTY_PAGE)
111 }
112 }
113
114 fn status(&self, pageidx: PageIdx) -> Result<PageStatus, ClientErr> {
115 if let Some(snapshot) = self.snapshot() {
116 match self
117 .storage
118 .read(self.vid(), snapshot.local(), pageidx)
119 .or_into_ctx()?
120 {
121 (lsn, PageValue::Available(_)) => Ok(PageStatus::Available(lsn)),
122 (lsn, PageValue::Empty) => Ok(PageStatus::Empty(Some(lsn))),
123 (_, PageValue::Pending) => Ok(PageStatus::Pending),
124 }
125 } else {
126 Ok(PageStatus::Empty(None))
127 }
128 }
129}
130
131fn fetch_page<O: Oracle>(
132 clients: &ClientPair,
133 storage: &Storage,
134 oracle: &mut O,
135 vid: &VolumeId,
136 remote_lsn: LSN,
137 local_lsn: LSN,
138 pageidx: PageIdx,
139) -> Result<Page, ClientErr> {
140 let span = tracing::trace_span!(
141 "fetching page from pagestore",
142 ?vid,
143 %remote_lsn,
144 %local_lsn,
145 %pageidx,
146 num_pages=field::Empty,
147 )
148 .entered();
149
150 let mut graft = Splinter::default();
153 let mut pages = HashMap::new();
154 for idx in once(pageidx).chain(oracle.predict_next(pageidx)) {
155 let (lsn, page) = storage.read(vid, local_lsn, idx).or_into_ctx()?;
156 if matches!(page, PageValue::Pending) {
157 graft.insert(idx.to_u32());
158 pages.insert(idx, (lsn, PageValue::Empty));
159 }
160 }
161
162 span.record("num_pages", pages.len());
163
164 let response = clients
166 .pagestore()
167 .read_pages(vid, remote_lsn, graft.serialize_to_bytes())?;
168 for page in response {
169 if let Some(entry) = pages.get_mut(&page.pageidx().or_into_ctx()?) {
170 entry.1 = page.page().or_into_ctx()?.into();
171 } else {
172 tracing::warn!(?vid, %remote_lsn, pageidx=page.pageidx, "unexpected page");
173 precept::expect_unreachable!(
174 "received unexpected page from pagestore",
175 {
176 "vid": vid,
177 "remote_lsn": remote_lsn,
178 "pageidx": page.pageidx,
179 }
180 );
181 }
182 }
183
184 let requested_page = pages
185 .get(&pageidx)
186 .cloned()
187 .and_then(|(_, p)| p.try_into_page())
188 .expect("requested page not found");
189
190 storage.receive_pages(vid, pages).or_into_ctx()?;
192
193 Ok(requested_page)
195}
196
197pub enum VolumeReadRef<'a> {
198 Reader(Cow<'a, VolumeReader>),
199 Writer(&'a VolumeWriter),
200}
201
202impl VolumeRead for VolumeReadRef<'_> {
203 fn vid(&self) -> &VolumeId {
204 match self {
205 VolumeReadRef::Reader(reader) => reader.vid(),
206 VolumeReadRef::Writer(writer) => writer.vid(),
207 }
208 }
209
210 fn snapshot(&self) -> Option<&Snapshot> {
211 match self {
212 VolumeReadRef::Reader(reader) => reader.snapshot(),
213 VolumeReadRef::Writer(writer) => writer.snapshot(),
214 }
215 }
216
217 fn read<O: Oracle>(&self, oracle: &mut O, pageidx: PageIdx) -> Result<Page, ClientErr> {
218 match self {
219 VolumeReadRef::Reader(reader) => reader.read(oracle, pageidx),
220 VolumeReadRef::Writer(writer) => writer.read(oracle, pageidx),
221 }
222 }
223
224 fn status(&self, pageidx: PageIdx) -> Result<PageStatus, ClientErr> {
225 match self {
226 VolumeReadRef::Reader(reader) => reader.status(pageidx),
227 VolumeReadRef::Writer(writer) => writer.status(pageidx),
228 }
229 }
230}