1use std::{
2 borrow::Cow,
3 fmt::Debug,
4 hash::{DefaultHasher, Hash, Hasher},
5 mem,
6 sync::Arc,
7};
8
9use bytes::BytesMut;
10use culprit::{Culprit, Result, ResultExt};
11use graft::core::{
12 PageIdx, VolumeId,
13 page::{PAGESIZE, Page},
14 page_count::PageCount,
15};
16use graft::{
17 rt::runtime::Runtime,
18 snapshot::Snapshot,
19 volume_reader::{VolumeRead, VolumeReadRef, VolumeReader},
20 volume_writer::{VolumeWrite, VolumeWriter},
21};
22use parking_lot::{Mutex, MutexGuard};
23use sqlite_plugin::flags::{LockLevel, OpenOpts};
24
25use crate::vfs::ErrCtx;
26
27use super::VfsFile;
28
/// Byte offset, within the first page, of the 4-byte file change counter.
const FILE_CHANGE_COUNTER_OFFSET: usize = 24;
/// Byte offset, within the first page, of the 4-byte "version valid for" number.
const VERSION_VALID_FOR_NUMBER_OFFSET: usize = 92;
32
/// Lock state of a [`VolFile`], together with the volume handle that state owns.
enum VolFileState {
    /// No lock is held; reads open a fresh reader on demand.
    Idle,
    /// A shared lock is held; reads are served by `reader`.
    Shared { reader: VolumeReader },
    /// The reserved lock is held; reads and writes go through `writer`.
    Reserved { writer: VolumeWriter },
    /// Placeholder installed while the writer is being committed. Read-style
    /// accessors reject this state with `ErrCtx::InvalidVolumeState`.
    Committing,
}
39
40impl VolFileState {
41 fn name(&self) -> &'static str {
42 match self {
43 VolFileState::Idle => "Idle",
44 VolFileState::Shared { .. } => "Shared",
45 VolFileState::Reserved { .. } => "Reserved",
46 VolFileState::Committing => "Committing",
47 }
48 }
49}
50
51impl Debug for VolFileState {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
54 VolFileState::Idle => f.write_str("Idle"),
55 VolFileState::Shared { reader } => {
56 f.debug_tuple("Shared").field(reader.snapshot()).finish()
57 }
58 VolFileState::Reserved { writer } => {
59 f.debug_tuple("Reserved").field(writer.snapshot()).finish()
60 }
61 VolFileState::Committing => f.write_str("Committing"),
62 }
63 }
64}
65
/// A `VfsFile` whose contents are stored in a volume accessed through the
/// `graft` runtime.
pub struct VolFile {
    /// Runtime handle used to take snapshots, open readers, and replace tags.
    runtime: Runtime,
    /// Tag name passed to `tag_replace` when the file is pointed at a new volume.
    pub tag: String,
    /// Id of the volume currently backing this file.
    pub vid: VolumeId,
    /// Options the file was opened with.
    opts: OpenOpts,

    /// Mutex guarding the `Reserved` state: it is locked (and its guard
    /// leaked) in `lock(Reserved)` and force-unlocked in `unlock(Shared)`.
    // NOTE(review): being an `Arc`, this is presumably shared with other
    // handles to the same volume — confirm against the caller of `new`.
    reserved: Arc<Mutex<()>>,
    /// Current lock state and the reader/writer it owns.
    state: VolFileState,
}
75
76impl Debug for VolFile {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("VolFile")
79 .field("tag", &self.tag)
80 .field("vid", &self.vid)
81 .field("state", &self.state)
82 .finish()
83 }
84}
85
86impl VolFile {
87 pub fn new(
88 runtime: Runtime,
89 tag: String,
90 vid: VolumeId,
91 opts: OpenOpts,
92 reserved: Arc<Mutex<()>>,
93 ) -> Self {
94 Self {
95 runtime,
96 tag,
97 vid,
98 opts,
99 reserved,
100 state: VolFileState::Idle,
101 }
102 }
103
104 pub fn snapshot_or_latest(&self) -> Result<Snapshot, ErrCtx> {
105 match &self.state {
106 VolFileState::Idle => self.runtime.volume_snapshot(&self.vid).or_into_ctx(),
107 VolFileState::Shared { reader } => Ok(reader.snapshot().clone()),
108 VolFileState::Reserved { writer } => Ok(writer.snapshot().clone()),
109 VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
110 }
111 }
112
113 pub fn page_count(&self) -> Result<PageCount, ErrCtx> {
114 match &self.state {
115 VolFileState::Idle => {
116 let snapshot = self.runtime.volume_snapshot(&self.vid).or_into_ctx()?;
117 self.runtime.snapshot_pages(&snapshot).or_into_ctx()
118 }
119 VolFileState::Shared { reader } => reader.page_count().or_into_ctx(),
120 VolFileState::Reserved { writer } => writer.page_count().or_into_ctx(),
121 VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
122 }
123 }
124
125 pub fn is_idle(&self) -> bool {
126 matches!(self.state, VolFileState::Idle)
127 }
128
    /// Returns the options this file was opened with.
    pub fn opts(&self) -> OpenOpts {
        self.opts
    }
132
133 pub fn switch_volume(&mut self, vid: &VolumeId) -> Result<(), ErrCtx> {
134 self.runtime
135 .tag_replace(&self.tag, vid.clone())
136 .or_into_ctx()?;
137 self.vid = vid.clone();
138 Ok(())
139 }
140
141 pub fn reader(&self) -> Result<VolumeReadRef<'_>, ErrCtx> {
142 match &self.state {
143 VolFileState::Idle => Ok(VolumeReadRef::Reader(Cow::Owned(
144 self.runtime.volume_reader(self.vid.clone()).or_into_ctx()?,
145 ))),
146 VolFileState::Shared { reader, .. } => Ok(VolumeReadRef::Reader(Cow::Borrowed(reader))),
147 VolFileState::Reserved { writer, .. } => Ok(VolumeReadRef::Writer(writer)),
148 VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
149 }
150 }
151}
152
153impl VfsFile for VolFile {
    /// Always reports the file as writable.
    // NOTE(review): `lock(Reserved)` rejects writers when
    // `opts.mode().is_readonly()` is set, but this flag does not reflect
    // that — confirm whether callers expect it to.
    fn readonly(&self) -> bool {
        false
    }
157
    /// Always reports that this is not an in-memory file.
    fn in_memory(&self) -> bool {
        false
    }
161
162 fn lock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
163 match level {
164 LockLevel::Unlocked => {
165 unreachable!("bug: invalid request lock(Unlocked)");
167 }
168 LockLevel::Shared => {
169 if let VolFileState::Idle = self.state {
170 let reader = self.runtime.volume_reader(self.vid.clone()).or_into_ctx()?;
172 self.state = VolFileState::Shared { reader };
173 } else {
174 return Err(Culprit::new_with_note(
175 ErrCtx::InvalidLockTransition,
176 format!("invalid lock request Shared in state {}", self.state.name()),
177 ));
178 }
179 }
180 LockLevel::Reserved => {
181 if let VolFileState::Shared { ref reader } = self.state {
182 if self.opts.mode().is_readonly() {
186 return Err(Culprit::new_with_note(
187 ErrCtx::InvalidLockTransition,
188 "invalid lock request: Shared -> Reserved: file is read-only",
189 ));
190 }
191
192 let Some(reserved) = self.reserved.try_lock() else {
194 return Err(Culprit::new(ErrCtx::Busy));
195 };
196
197 if !self
200 .runtime
201 .snapshot_is_latest(&self.vid, reader.snapshot())
202 .or_into_ctx()?
203 {
204 return Err(Culprit::new_with_note(
205 ErrCtx::BusySnapshot,
206 "unable to lock: Shared -> Reserved: snapshot changed",
207 ));
208 }
209
210 self.state = VolFileState::Reserved {
212 writer: VolumeWriter::try_from(reader.clone()).or_into_ctx()?,
213 };
214
215 MutexGuard::leak(reserved);
218 } else {
219 return Err(Culprit::new_with_note(
220 ErrCtx::InvalidLockTransition,
221 format!(
222 "invalid lock request Reserved in state {}",
223 self.state.name()
224 ),
225 ));
226 }
227 }
228 LockLevel::Pending | LockLevel::Exclusive => {
229 assert!(
231 matches!(self.state, VolFileState::Reserved { .. }),
232 "bug: invalid lock request {:?} in state {}",
233 level,
234 self.state.name()
235 );
236 }
237 }
238
239 Ok(())
240 }
241
242 fn unlock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
243 match level {
244 LockLevel::Unlocked => match self.state {
245 VolFileState::Idle | VolFileState::Shared { .. } | VolFileState::Committing => {
246 self.state = VolFileState::Idle;
247 }
248 VolFileState::Reserved { .. } => {
249 return Err(Culprit::new_with_note(
250 ErrCtx::InvalidLockTransition,
251 "invalid unlock request Unlocked in state Reserved",
252 ));
253 }
254 },
255 LockLevel::Shared => {
256 if let VolFileState::Reserved { writer } =
257 mem::replace(&mut self.state, VolFileState::Committing)
258 {
259 let reader = writer.commit().or_into_ctx()?;
265 self.state = VolFileState::Shared { reader };
266
267 assert!(self.reserved.is_locked(), "reserved lock must be locked");
271 unsafe { self.reserved.force_unlock() };
274 } else {
275 return Err(Culprit::new_with_note(
276 ErrCtx::InvalidLockTransition,
277 format!(
278 "invalid unlock request Shared in state {}",
279 self.state.name()
280 ),
281 ));
282 }
283 }
284 LockLevel::Reserved | LockLevel::Pending | LockLevel::Exclusive => {
285 unreachable!(
287 "bug: invalid unlock request {:?} in state {}",
288 level,
289 self.state.name()
290 );
291 }
292 }
293
294 Ok(())
295 }
296
297 fn file_size(&mut self) -> Result<usize, ErrCtx> {
298 Ok(PAGESIZE.as_usize() * self.page_count()?.to_usize())
299 }
300
301 fn read(&mut self, offset: usize, data: &mut [u8]) -> Result<usize, ErrCtx> {
302 let pageidx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
304 .try_into()
305 .expect("offset out of volume range");
306 let local_offset = offset % PAGESIZE;
308
309 assert!(
310 local_offset + data.len() <= PAGESIZE,
311 "read must not cross page boundary"
312 );
313
314 let page = match &mut self.state {
316 VolFileState::Idle => {
317 let reader = self.runtime.volume_reader(self.vid.clone()).or_into_ctx()?;
321 reader.read_page(pageidx).or_into_ctx()?
322 }
323 VolFileState::Shared { reader } => reader.read_page(pageidx).or_into_ctx()?,
324 VolFileState::Reserved { writer } => writer.read_page(pageidx).or_into_ctx()?,
325 VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
326 };
327
328 let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
329 data.copy_from_slice(&page[range]);
330
331 if pageidx == PageIdx::FIRST
334 && local_offset <= FILE_CHANGE_COUNTER_OFFSET
335 && local_offset + data.len() >= FILE_CHANGE_COUNTER_OFFSET + 4
336 {
337 let fcc_offset = FILE_CHANGE_COUNTER_OFFSET - local_offset.as_usize();
339
340 let snapshot = self.snapshot_or_latest()?;
344 let mut hasher = DefaultHasher::new();
345 snapshot.hash(&mut hasher);
346 let hash = hasher.finish();
347 let change_counter = &hash.to_be_bytes()[..4];
348
349 data[fcc_offset..fcc_offset + 4].copy_from_slice(change_counter);
351 }
352
353 Ok(data.len())
354 }
355
356 fn truncate(&mut self, size: usize) -> Result<(), ErrCtx> {
357 let VolFileState::Reserved { writer, .. } = &mut self.state else {
358 return Err(Culprit::new_with_note(
359 ErrCtx::InvalidVolumeState,
360 "must hold reserved lock to truncate",
361 ));
362 };
363
364 assert_eq!(
365 size % PAGESIZE.as_usize(),
366 0,
367 "size must be an even multiple of {PAGESIZE}"
368 );
369
370 let pages: PageCount = (size / PAGESIZE.as_usize())
371 .try_into()
372 .expect("size too large");
373
374 writer.truncate(pages).or_into_ctx()?;
375 Ok(())
376 }
377
378 fn write(&mut self, offset: usize, data: &[u8]) -> Result<usize, ErrCtx> {
379 let VolFileState::Reserved { writer, .. } = &mut self.state else {
380 return Err(Culprit::new_with_note(
381 ErrCtx::InvalidVolumeState,
382 "must hold reserved lock to write",
383 ));
384 };
385
386 let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
388 .try_into()
389 .expect("offset out of volume range");
390 let local_offset = offset % PAGESIZE;
392
393 assert!(
394 local_offset + data.len() <= PAGESIZE,
395 "write must not cross page boundary"
396 );
397
398 if page_idx == PageIdx::FIRST && data.len() == PAGESIZE && local_offset == 0 {
401 let existing: Page = writer.read_page(page_idx).or_into_ctx()?;
402
403 debug_assert_eq!(data.len(), existing.len(), "page size mismatch");
404
405 let fcc = FILE_CHANGE_COUNTER_OFFSET..FILE_CHANGE_COUNTER_OFFSET + 4;
406 let vvf = VERSION_VALID_FOR_NUMBER_OFFSET..VERSION_VALID_FOR_NUMBER_OFFSET + 4;
407
408 let unchanged =
411 data[..fcc.start] == existing[..fcc.start] &&
413 data[fcc.end..vvf.start] == existing[fcc.end..vvf.start] &&
415 data[vvf.end..] == existing[vvf.end..];
417
418 if unchanged {
419 tracing::trace!(
420 "ignoring write to header page, as only file change counter and version valid for number changed"
421 );
422 return Ok(data.len());
423 }
424 }
425
426 let page = if data.len() == PAGESIZE {
427 Page::try_from(data).expect("data is a full page")
429 } else {
430 let mut page: BytesMut = writer.read_page(page_idx).or_into_ctx()?.into();
433 let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
435 page[range].copy_from_slice(data);
436 page.try_into().expect("we did not change the page size")
437 };
438
439 writer.write_page(page_idx, page).or_into_ctx()?;
440 Ok(data.len())
441 }
442}