1use std::{
2 borrow::Cow,
3 fmt::Debug,
4 hash::{DefaultHasher, Hash, Hasher},
5 mem,
6 sync::Arc,
7};
8
9use bytes::BytesMut;
10use culprit::{Culprit, Result, ResultExt};
11use graft_client::{
12 oracle::LeapOracle,
13 runtime::{
14 storage::snapshot::Snapshot,
15 volume_handle::VolumeHandle,
16 volume_reader::{VolumeRead, VolumeReadRef, VolumeReader},
17 volume_writer::{VolumeWrite, VolumeWriter},
18 },
19};
20use graft_core::{
21 PageIdx, VolumeId,
22 page::{PAGESIZE, Page},
23 page_count::PageCount,
24};
25use parking_lot::{Mutex, MutexGuard};
26use sqlite_plugin::flags::{LockLevel, OpenOpts};
27
28use crate::vfs::ErrCtx;
29
30use super::VfsFile;
31
/// Byte offset, within the first page, of the 4-byte file change counter.
/// `read` overwrites this range in the caller's buffer and `write` ignores
/// differences in it when deciding whether a header write is a no-op.
const FILE_CHANGE_COUNTER_OFFSET: usize = 24;
/// Byte offset, within the first page, of the 4-byte "version valid for"
/// number. `write` ignores differences in this range when deciding whether a
/// header write is a no-op.
const VERSION_VALID_FOR_NUMBER_OFFSET: usize = 92;
35
36#[derive(Debug)]
37enum VolFileState {
38 Idle,
39 Shared { reader: VolumeReader },
40 Reserved { writer: VolumeWriter },
41 Committing,
42}
43
44impl VolFileState {
45 fn name(&self) -> &'static str {
46 match self {
47 VolFileState::Idle => "Idle",
48 VolFileState::Shared { .. } => "Shared",
49 VolFileState::Reserved { .. } => "Reserved",
50 VolFileState::Committing => "Committing",
51 }
52 }
53}
54
55pub struct VolFile {
56 handle: VolumeHandle,
57 opts: OpenOpts,
58
59 reserved: Arc<Mutex<()>>,
60 state: VolFileState,
61 oracle: Box<LeapOracle>,
62}
63
64impl Debug for VolFile {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.write_str(&self.handle.vid().pretty())
67 }
68}
69
70impl VolFile {
71 pub fn new(handle: VolumeHandle, opts: OpenOpts, reserved: Arc<Mutex<()>>) -> Self {
72 Self {
73 handle,
74 opts,
75 reserved,
76 state: VolFileState::Idle,
77 oracle: Default::default(),
78 }
79 }
80
81 pub fn snapshot_or_latest(&self) -> Result<Option<Snapshot>, ErrCtx> {
82 match &self.state {
83 VolFileState::Idle => self.handle.snapshot().or_into_ctx(),
84 VolFileState::Shared { reader, .. } => Ok(reader.snapshot().cloned()),
85 VolFileState::Reserved { writer, .. } => Ok(writer.snapshot().cloned()),
86 VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
87 }
88 }
89
90 pub fn reader(&self) -> Result<VolumeReadRef<'_>, ErrCtx> {
91 match &self.state {
92 VolFileState::Idle => Ok(VolumeReadRef::Reader(Cow::Owned(
93 self.handle.reader().or_into_ctx()?,
94 ))),
95 VolFileState::Shared { reader, .. } => Ok(VolumeReadRef::Reader(Cow::Borrowed(reader))),
96 VolFileState::Reserved { writer, .. } => Ok(VolumeReadRef::Writer(writer)),
97 VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
98 }
99 }
100
101 pub fn pull(&mut self) -> Result<(), ErrCtx> {
103 let mut oracle = self.oracle.as_ref().clone();
104 let reader = self.reader()?;
105 let pages = reader.snapshot().map(|s| s.pages()).unwrap_or_default();
106 for pageidx in pages.iter() {
107 reader.read(&mut oracle, pageidx).or_into_ctx()?;
108 }
109 Ok(())
110 }
111
112 pub fn vid(&self) -> &VolumeId {
113 self.handle.vid()
114 }
115
116 pub fn handle(&self) -> &VolumeHandle {
117 &self.handle
118 }
119
120 pub fn opts(&self) -> OpenOpts {
121 self.opts
122 }
123
124 pub fn close(self) -> VolumeHandle {
125 self.handle
126 }
127}
128
129impl VfsFile for VolFile {
130 fn readonly(&self) -> bool {
131 false
132 }
133
134 fn in_memory(&self) -> bool {
135 false
136 }
137
138 fn lock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
139 match level {
140 LockLevel::Unlocked => {
141 unreachable!("bug: invalid request lock(Unlocked)");
143 }
144 LockLevel::Shared => {
145 if let VolFileState::Idle = self.state {
146 let reader = self.handle.reader().or_into_ctx()?;
148 self.state = VolFileState::Shared { reader };
149 } else {
150 return Err(Culprit::new_with_note(
151 ErrCtx::InvalidLockTransition,
152 format!("invalid lock request Shared in state {}", self.state.name()),
153 ));
154 }
155 }
156 LockLevel::Reserved => {
157 if let VolFileState::Shared { ref reader } = self.state {
158 if self.opts.mode().is_readonly() {
162 return Err(Culprit::new_with_note(
163 ErrCtx::InvalidLockTransition,
164 "invalid lock request: Shared -> Reserved: file is read-only",
165 ));
166 }
167
168 let Some(reserved) = self.reserved.try_lock() else {
170 return Err(Culprit::new(ErrCtx::Busy));
171 };
172
173 let latest_snapshot = self.handle.snapshot().or_into_ctx()?;
175
176 let writer = if reader.snapshot().map(|s| s.local())
181 != latest_snapshot.as_ref().map(|s| s.local())
182 {
183 return Err(Culprit::new_with_note(
186 ErrCtx::BusySnapshot,
187 "unable to lock: Shared -> Reserved: snapshot changed",
188 ));
189 } else {
190 self.handle.writer_at(latest_snapshot)
192 };
193
194 self.state = VolFileState::Reserved { writer };
195
196 MutexGuard::leak(reserved);
199 } else {
200 return Err(Culprit::new_with_note(
201 ErrCtx::InvalidLockTransition,
202 format!(
203 "invalid lock request Reserved in state {}",
204 self.state.name()
205 ),
206 ));
207 }
208 }
209 LockLevel::Pending | LockLevel::Exclusive => {
210 assert!(
212 matches!(self.state, VolFileState::Reserved { .. }),
213 "bug: invalid lock request {:?} in state {}",
214 level,
215 self.state.name()
216 );
217 }
218 }
219
220 Ok(())
221 }
222
223 fn unlock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
224 match level {
225 LockLevel::Unlocked => match self.state {
226 VolFileState::Idle | VolFileState::Shared { .. } | VolFileState::Committing => {
227 self.state = VolFileState::Idle;
228 }
229 VolFileState::Reserved { .. } => {
230 return Err(Culprit::new_with_note(
231 ErrCtx::InvalidLockTransition,
232 "invalid unlock request Unlocked in state Reserved",
233 ));
234 }
235 },
236 LockLevel::Shared => {
237 if let VolFileState::Reserved { writer } =
238 mem::replace(&mut self.state, VolFileState::Committing)
239 {
240 let reader = writer.commit().or_into_ctx()?;
246 self.state = VolFileState::Shared { reader };
247
248 assert!(self.reserved.is_locked(), "reserved lock must be locked");
254 unsafe { self.reserved.force_unlock() };
255 } else {
256 return Err(Culprit::new_with_note(
257 ErrCtx::InvalidLockTransition,
258 format!(
259 "invalid unlock request Shared in state {}",
260 self.state.name()
261 ),
262 ));
263 }
264 }
265 LockLevel::Reserved | LockLevel::Pending | LockLevel::Exclusive => {
266 unreachable!(
268 "bug: invalid unlock request {:?} in state {}",
269 level,
270 self.state.name()
271 );
272 }
273 }
274
275 Ok(())
276 }
277
278 fn file_size(&mut self) -> Result<usize, ErrCtx> {
279 let pages = match &self.state {
280 VolFileState::Idle => self
281 .handle
282 .snapshot()
283 .or_into_ctx()?
284 .map_or(PageCount::ZERO, |snapshot| snapshot.pages()),
285 VolFileState::Shared { reader, .. } => {
286 reader.snapshot().map_or(PageCount::ZERO, |s| s.pages())
287 }
288 VolFileState::Reserved { writer, .. } => writer.pages(),
289 VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
290 };
291 Ok((PAGESIZE * pages.to_usize()).as_usize())
292 }
293
294 fn read(&mut self, offset: usize, data: &mut [u8]) -> Result<usize, ErrCtx> {
295 let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
297 .try_into()
298 .expect("offset out of volume range");
299 let local_offset = offset % PAGESIZE;
301
302 assert!(
303 local_offset + data.len() <= PAGESIZE,
304 "read must not cross page boundary"
305 );
306
307 let page = match &mut self.state {
309 VolFileState::Idle => {
310 self.handle
314 .reader()
315 .or_into_ctx()?
316 .read(self.oracle.as_mut(), page_idx)
317 .or_into_ctx()?
318 }
319 VolFileState::Shared { reader } => {
320 reader.read(self.oracle.as_mut(), page_idx).or_into_ctx()?
321 }
322 VolFileState::Reserved { writer } => {
323 writer.read(self.oracle.as_mut(), page_idx).or_into_ctx()?
324 }
325 VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
326 };
327
328 let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
329 data.copy_from_slice(&page[range]);
330
331 if page_idx == PageIdx::FIRST
334 && local_offset <= FILE_CHANGE_COUNTER_OFFSET
335 && local_offset + data.len() >= FILE_CHANGE_COUNTER_OFFSET + 4
336 {
337 let fcc_offset = FILE_CHANGE_COUNTER_OFFSET - local_offset.as_usize();
339
340 let snapshot = self.snapshot_or_latest()?;
342 let mut hasher = DefaultHasher::new();
343 snapshot.hash(&mut hasher);
344 let change_counter = hasher.finish() as u32;
345
346 data[fcc_offset..fcc_offset + 4].copy_from_slice(&change_counter.to_be_bytes());
348 }
349
350 Ok(data.len())
351 }
352
353 fn truncate(&mut self, size: usize) -> Result<(), ErrCtx> {
354 let VolFileState::Reserved { writer, .. } = &mut self.state else {
355 return Err(Culprit::new_with_note(
356 ErrCtx::InvalidVolumeState,
357 "must hold reserved lock to truncate",
358 ));
359 };
360
361 assert_eq!(
362 size % PAGESIZE.as_usize(),
363 0,
364 "size must be an even multiple of {PAGESIZE}"
365 );
366
367 let pages: PageCount = (size / PAGESIZE.as_usize())
368 .try_into()
369 .expect("size too large");
370
371 writer.truncate(pages);
372 Ok(())
373 }
374
375 fn write(&mut self, offset: usize, data: &[u8]) -> Result<usize, ErrCtx> {
376 let VolFileState::Reserved { writer, .. } = &mut self.state else {
377 return Err(Culprit::new_with_note(
378 ErrCtx::InvalidVolumeState,
379 "must hold reserved lock to write",
380 ));
381 };
382
383 let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
385 .try_into()
386 .expect("offset out of volume range");
387 let local_offset = offset % PAGESIZE;
389
390 assert!(
391 local_offset + data.len() <= PAGESIZE,
392 "write must not cross page boundary"
393 );
394
395 if page_idx == PageIdx::FIRST && data.len() == PAGESIZE && local_offset == 0 {
398 let existing: Page = writer.read(self.oracle.as_mut(), page_idx).or_into_ctx()?;
399
400 debug_assert_eq!(data.len(), existing.len(), "page size mismatch");
401
402 let fcc = FILE_CHANGE_COUNTER_OFFSET..FILE_CHANGE_COUNTER_OFFSET + 4;
403 let vvf = VERSION_VALID_FOR_NUMBER_OFFSET..VERSION_VALID_FOR_NUMBER_OFFSET + 4;
404
405 let unchanged =
408 data[..fcc.start] == existing[..fcc.start] &&
410 data[fcc.end..vvf.start] == existing[fcc.end..vvf.start] &&
412 data[vvf.end..] == existing[vvf.end..];
414
415 if unchanged {
416 tracing::trace!(
417 "ignoring write to header page, file change counter and version valid for number unchanged"
418 );
419 return Ok(data.len());
420 }
421 }
422
423 let page = if data.len() == PAGESIZE {
424 Page::try_from(data).expect("data is a full page")
426 } else {
427 let mut page: BytesMut = writer
430 .read(self.oracle.as_mut(), page_idx)
431 .or_into_ctx()?
432 .into();
433 let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
435 page[range].copy_from_slice(data);
436 page.try_into().expect("we did not change the page size")
437 };
438
439 writer.write(page_idx, page);
440 Ok(data.len())
441 }
442}