1use std::{
2 fmt::Debug,
3 hash::{DefaultHasher, Hash, Hasher},
4 mem,
5 sync::Arc,
6};
7
8use bytes::BytesMut;
9use culprit::{Culprit, Result, ResultExt};
10use graft_client::{
11 oracle::LeapOracle,
12 runtime::{
13 storage::snapshot::Snapshot,
14 volume_handle::VolumeHandle,
15 volume_reader::{VolumeRead, VolumeReader},
16 volume_writer::{VolumeWrite, VolumeWriter},
17 },
18};
19use graft_core::{
20 PageIdx,
21 page::{PAGESIZE, Page},
22 page_count::PageCount,
23};
24use parking_lot::{Mutex, MutexGuard};
25use sqlite_plugin::flags::{LockLevel, OpenOpts};
26
27use crate::vfs::ErrCtx;
28
29use super::VfsFile;
30
/// Byte offset, within page 1, of SQLite's 4-byte file change counter in the
/// database header. `VfsFile::read` overwrites these 4 bytes (big-endian) with a
/// value derived from the current snapshot whenever a read covers them.
const FILE_CHANGE_COUNTER_OFFSET: usize = 24;
33
/// Lock-driven state of a [`VolFile`].
///
/// Transitions follow SQLite's `lock`/`unlock` requests:
/// `Idle -> Shared -> Reserved -> Committing -> Shared -> Idle`.
#[derive(Debug)]
enum VolFileState {
    /// No lock held; each read opens a fresh reader from the volume handle.
    Idle,
    /// Shared lock held; `reader` was opened when the lock was taken and serves
    /// all reads until the lock changes.
    Shared { reader: VolumeReader },
    /// Reserved lock held; `writer` receives this transaction's writes/truncates.
    Reserved { writer: VolumeWriter },
    /// Set while `unlock(Shared)` commits the writer. If the commit fails the file
    /// stays here (reads and size queries return `InvalidVolumeState`) until
    /// `unlock(Unlocked)` resets it to `Idle`.
    Committing,
}
41
42impl VolFileState {
43 fn name(&self) -> &'static str {
44 match self {
45 VolFileState::Idle => "Idle",
46 VolFileState::Shared { .. } => "Shared",
47 VolFileState::Reserved { .. } => "Reserved",
48 VolFileState::Committing => "Committing",
49 }
50 }
51}
52
/// A SQLite VFS file whose pages are stored in a Graft volume.
pub struct VolFile {
    /// Handle to the underlying volume; source of readers, writers and snapshots.
    handle: VolumeHandle,
    /// Options the file was opened with; a read-only mode blocks `lock(Reserved)`.
    opts: OpenOpts,

    /// Mutex representing the Reserved lock. Its guard is leaked while this file is
    /// `Reserved` and force-unlocked by `unlock(Shared)`.
    /// NOTE(review): presumably shared by all files open on the same volume — confirm.
    reserved: Arc<Mutex<()>>,
    /// Current lock state (and the reader/writer that goes with it).
    state: VolFileState,
    /// Oracle passed to every page read (presumably guides prefetching — confirm).
    oracle: Box<LeapOracle>,
}
61
62impl Debug for VolFile {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.write_str(&self.handle.vid().pretty())
65 }
66}
67
68impl VolFile {
69 pub fn new(handle: VolumeHandle, opts: OpenOpts, reserved: Arc<Mutex<()>>) -> Self {
70 Self {
71 handle,
72 opts,
73 reserved,
74 state: VolFileState::Idle,
75 oracle: Default::default(),
76 }
77 }
78
79 pub fn snapshot_or_latest(&self) -> Result<Option<Snapshot>, ErrCtx> {
80 match &self.state {
81 VolFileState::Idle => self.handle().snapshot().or_into_ctx(),
82 VolFileState::Shared { reader, .. } => Ok(reader.snapshot().cloned()),
83 VolFileState::Reserved { writer, .. } => Ok(writer.snapshot().cloned()),
84 VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
85 }
86 }
87
88 pub fn handle(&self) -> &VolumeHandle {
89 &self.handle
90 }
91
92 pub fn opts(&self) -> OpenOpts {
93 self.opts
94 }
95
96 pub fn close(self) -> VolumeHandle {
97 self.handle
98 }
99}
100
101impl VfsFile for VolFile {
102 fn readonly(&self) -> bool {
103 false
104 }
105
106 fn in_memory(&self) -> bool {
107 false
108 }
109
110 fn lock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
111 match level {
112 LockLevel::Unlocked => {
113 unreachable!("bug: invalid request lock(Unlocked)");
115 }
116 LockLevel::Shared => {
117 if let VolFileState::Idle = self.state {
118 let reader = self.handle.reader().or_into_ctx()?;
120 self.state = VolFileState::Shared { reader };
121 } else {
122 return Err(Culprit::new_with_note(
123 ErrCtx::InvalidLockTransition,
124 format!("invalid lock request Shared in state {}", self.state.name()),
125 ));
126 }
127 }
128 LockLevel::Reserved => {
129 if let VolFileState::Shared { ref reader } = self.state {
130 if self.opts.mode().is_readonly() {
134 return Err(Culprit::new_with_note(
135 ErrCtx::InvalidLockTransition,
136 "invalid lock request: Shared -> Reserved: file is read-only",
137 ));
138 }
139
140 let Some(reserved) = self.reserved.try_lock() else {
142 return Err(Culprit::new(ErrCtx::Busy));
143 };
144
145 let latest_snapshot = self.handle.snapshot().or_into_ctx()?;
147
148 let writer = if reader.snapshot().map(|s| s.local())
153 != latest_snapshot.as_ref().map(|s| s.local())
154 {
155 return Err(Culprit::new_with_note(
158 ErrCtx::BusySnapshot,
159 "unable to lock: Shared -> Reserved: snapshot changed",
160 ));
161 } else {
162 self.handle.writer_at(latest_snapshot)
164 };
165
166 self.state = VolFileState::Reserved { writer };
167
168 MutexGuard::leak(reserved);
171 } else {
172 return Err(Culprit::new_with_note(
173 ErrCtx::InvalidLockTransition,
174 format!(
175 "invalid lock request Reserved in state {}",
176 self.state.name()
177 ),
178 ));
179 }
180 }
181 LockLevel::Pending | LockLevel::Exclusive => {
182 assert!(
184 matches!(self.state, VolFileState::Reserved { .. }),
185 "bug: invalid lock request {:?} in state {}",
186 level,
187 self.state.name()
188 );
189 }
190 }
191
192 Ok(())
193 }
194
195 fn unlock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
196 match level {
197 LockLevel::Unlocked => match self.state {
198 VolFileState::Idle | VolFileState::Shared { .. } | VolFileState::Committing => {
199 self.state = VolFileState::Idle;
200 }
201 VolFileState::Reserved { .. } => {
202 return Err(Culprit::new_with_note(
203 ErrCtx::InvalidLockTransition,
204 "invalid unlock request Unlocked in state Reserved",
205 ));
206 }
207 },
208 LockLevel::Shared => {
209 if let VolFileState::Reserved { writer } =
210 mem::replace(&mut self.state, VolFileState::Committing)
211 {
212 let reader = writer.commit().or_into_ctx()?;
218 self.state = VolFileState::Shared { reader };
219
220 assert!(self.reserved.is_locked(), "reserved lock must be locked");
226 unsafe { self.reserved.force_unlock() };
227 } else {
228 return Err(Culprit::new_with_note(
229 ErrCtx::InvalidLockTransition,
230 format!(
231 "invalid unlock request Shared in state {}",
232 self.state.name()
233 ),
234 ));
235 }
236 }
237 LockLevel::Reserved | LockLevel::Pending | LockLevel::Exclusive => {
238 unreachable!(
240 "bug: invalid unlock request {:?} in state {}",
241 level,
242 self.state.name()
243 );
244 }
245 }
246
247 Ok(())
248 }
249
250 fn file_size(&mut self) -> Result<usize, ErrCtx> {
251 let pages = match &self.state {
252 VolFileState::Idle => self
253 .handle
254 .snapshot()
255 .or_into_ctx()?
256 .map_or(PageCount::ZERO, |snapshot| snapshot.pages()),
257 VolFileState::Shared { reader, .. } => {
258 reader.snapshot().map_or(PageCount::ZERO, |s| s.pages())
259 }
260 VolFileState::Reserved { writer, .. } => writer.pages(),
261 VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
262 };
263 Ok((PAGESIZE * pages.to_usize()).as_usize())
264 }
265
266 fn read(&mut self, offset: usize, data: &mut [u8]) -> Result<usize, ErrCtx> {
267 let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
269 .try_into()
270 .expect("offset out of volume range");
271 let local_offset = offset % PAGESIZE;
273
274 assert!(
275 local_offset + data.len() <= PAGESIZE,
276 "read must not cross page boundary"
277 );
278
279 let page = match &mut self.state {
281 VolFileState::Idle => {
282 self.handle()
286 .reader()
287 .or_into_ctx()?
288 .read(self.oracle.as_mut(), page_idx)
289 .or_into_ctx()?
290 }
291 VolFileState::Shared { reader } => {
292 reader.read(self.oracle.as_mut(), page_idx).or_into_ctx()?
293 }
294 VolFileState::Reserved { writer } => {
295 writer.read(self.oracle.as_mut(), page_idx).or_into_ctx()?
296 }
297 VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
298 };
299
300 let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
301 data.copy_from_slice(&page[range]);
302
303 if page_idx == PageIdx::FIRST
306 && local_offset <= FILE_CHANGE_COUNTER_OFFSET
307 && local_offset + data.len() >= FILE_CHANGE_COUNTER_OFFSET + 4
308 {
309 let fcc_offset = FILE_CHANGE_COUNTER_OFFSET - local_offset.as_usize();
311
312 let snapshot = self.snapshot_or_latest()?;
314 let mut hasher = DefaultHasher::new();
315 snapshot.hash(&mut hasher);
316 let change_counter = hasher.finish() as u32;
317
318 data[fcc_offset..fcc_offset + 4].copy_from_slice(&change_counter.to_be_bytes());
320 }
321
322 Ok(data.len())
323 }
324
325 fn truncate(&mut self, size: usize) -> Result<(), ErrCtx> {
326 let VolFileState::Reserved { writer, .. } = &mut self.state else {
327 return Err(Culprit::new_with_note(
328 ErrCtx::InvalidVolumeState,
329 "must hold reserved lock to truncate",
330 ));
331 };
332
333 assert_eq!(
334 size % PAGESIZE.as_usize(),
335 0,
336 "size must be an even multiple of {PAGESIZE}"
337 );
338
339 let pages: PageCount = (size / PAGESIZE.as_usize())
340 .try_into()
341 .expect("size too large");
342
343 writer.truncate(pages);
344 Ok(())
345 }
346
347 fn write(&mut self, offset: usize, data: &[u8]) -> Result<usize, ErrCtx> {
348 let VolFileState::Reserved { writer, .. } = &mut self.state else {
349 return Err(Culprit::new_with_note(
350 ErrCtx::InvalidVolumeState,
351 "must hold reserved lock to write",
352 ));
353 };
354
355 let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
357 .try_into()
358 .expect("offset out of volume range");
359 let local_offset = offset % PAGESIZE;
361
362 assert!(
363 local_offset + data.len() <= PAGESIZE,
364 "write must not cross page boundary"
365 );
366
367 let page = if data.len() == PAGESIZE {
368 Page::try_from(data).expect("data is a full page")
370 } else {
371 let mut page: BytesMut = writer
374 .read(self.oracle.as_mut(), page_idx)
375 .or_into_ctx()?
376 .into();
377 let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
379 page[range].copy_from_slice(data);
380 page.try_into().expect("we did not change the page size")
381 };
382
383 writer.write(page_idx, page);
384 Ok(data.len())
385 }
386}