1use std::{
2 fmt::Debug,
3 hash::{DefaultHasher, Hash, Hasher},
4 mem,
5 sync::Arc,
6};
7
8use bytes::BytesMut;
9use culprit::{Culprit, Result, ResultExt};
10use graft_core::{
11 PageIdx, VolumeId,
12 page::{PAGESIZE, Page},
13 page_count::PageCount,
14};
15use graft_kernel::{
16 graft_reader::{GraftRead, GraftReader},
17 graft_writer::{GraftWrite, GraftWriter},
18 rt::runtime::Runtime,
19 snapshot::Snapshot,
20};
21use parking_lot::{Mutex, MutexGuard};
22use sqlite_plugin::flags::{LockLevel, OpenOpts};
23
24use crate::vfs::ErrCtx;
25
26use super::VfsFile;
27
/// Byte offset of the 4-byte file change counter inside the SQLite database
/// header stored at the start of the first page.
const FILE_CHANGE_COUNTER_OFFSET: usize = 24;
/// Byte offset of the 4-byte "version-valid-for" number inside the SQLite
/// database header stored at the start of the first page.
const VERSION_VALID_FOR_NUMBER_OFFSET: usize = 92;
31
/// Lock state of a `VolFile`, together with the transaction handle (if any)
/// that the file owns while in that state.
enum VolFileState {
    /// No lock is held and no reader or writer exists.
    Idle,
    /// A shared lock is held; reads are served by `reader`.
    Shared { reader: GraftReader },
    /// A reserved lock is held; reads and writes both go through `writer`.
    Reserved { writer: GraftWriter },
    /// Placeholder installed while a writer is being committed. Reads,
    /// snapshot queries and page-count queries fail with
    /// `ErrCtx::InvalidVolumeState` while the file is in this state.
    Committing,
}
38
39impl VolFileState {
40 fn name(&self) -> &'static str {
41 match self {
42 VolFileState::Idle => "Idle",
43 VolFileState::Shared { .. } => "Shared",
44 VolFileState::Reserved { .. } => "Reserved",
45 VolFileState::Committing => "Committing",
46 }
47 }
48}
49
50impl Debug for VolFileState {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 match self {
53 VolFileState::Idle => f.write_str("Idle"),
54 VolFileState::Shared { reader } => {
55 f.debug_tuple("Shared").field(reader.snapshot()).finish()
56 }
57 VolFileState::Reserved { writer } => {
58 f.debug_tuple("Reserved").field(writer.snapshot()).finish()
59 }
60 VolFileState::Committing => f.write_str("Committing"),
61 }
62 }
63}
64
/// A file handle exposed through the `VfsFile` trait whose pages are read
/// from and written to a Graft volume.
pub struct VolFile {
    /// Runtime used to take snapshots and open readers for the volume.
    runtime: Runtime,
    /// Tag name; `switch_graft` re-points this tag at another volume.
    pub tag: String,
    /// Id of the volume this file currently reads from and writes to.
    pub graft: VolumeId,
    /// Options this file was opened with.
    opts: OpenOpts,

    /// Mutex taken when the file enters the `Reserved` state and released
    /// once the writer has been committed.
    // NOTE(review): presumably shared (via the `Arc`) between all files
    // opened on the same volume so that only one writer exists at a time —
    // confirm against the code that constructs `VolFile`.
    reserved: Arc<Mutex<()>>,
    /// Current lock state.
    state: VolFileState,
}
74
75impl Debug for VolFile {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 f.debug_struct("VolFile")
78 .field("tag", &self.tag)
79 .field("graft", &self.graft)
80 .field("state", &self.state)
81 .finish()
82 }
83}
84
85impl VolFile {
86 pub fn new(
87 runtime: Runtime,
88 tag: String,
89 graft: VolumeId,
90 opts: OpenOpts,
91 reserved: Arc<Mutex<()>>,
92 ) -> Self {
93 Self {
94 runtime,
95 tag,
96 graft,
97 opts,
98 reserved,
99 state: VolFileState::Idle,
100 }
101 }
102
103 pub fn snapshot_or_latest(&self) -> Result<Snapshot, ErrCtx> {
104 match &self.state {
105 VolFileState::Idle => self.runtime.graft_snapshot(&self.graft).or_into_ctx(),
106 VolFileState::Shared { reader } => Ok(reader.snapshot().clone()),
107 VolFileState::Reserved { writer } => Ok(writer.snapshot().clone()),
108 VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
109 }
110 }
111
112 pub fn page_count(&self) -> Result<PageCount, ErrCtx> {
113 match &self.state {
114 VolFileState::Idle => {
115 let snapshot = self.runtime.graft_snapshot(&self.graft).or_into_ctx()?;
116 self.runtime.snapshot_pages(&snapshot).or_into_ctx()
117 }
118 VolFileState::Shared { reader } => reader.page_count().or_into_ctx(),
119 VolFileState::Reserved { writer } => writer.page_count().or_into_ctx(),
120 VolFileState::Committing => ErrCtx::InvalidVolumeState.into(),
121 }
122 }
123
124 pub fn is_idle(&self) -> bool {
125 matches!(self.state, VolFileState::Idle)
126 }
127
128 pub fn opts(&self) -> OpenOpts {
129 self.opts
130 }
131
132 pub fn switch_graft(&mut self, graft: &VolumeId) -> Result<(), ErrCtx> {
133 self.runtime
134 .tag_replace(&self.tag, graft.clone())
135 .or_into_ctx()?;
136 self.graft = graft.clone();
137 Ok(())
138 }
139}
140
141impl VfsFile for VolFile {
    /// Always reports the file as writable.
    // NOTE(review): `lock(Reserved)` separately refuses to upgrade when
    // `opts.mode().is_readonly()` is set; confirm that returning `false`
    // here for such files is intentional.
    fn readonly(&self) -> bool {
        false
    }
145
    /// Always reports that the file is not an in-memory file.
    fn in_memory(&self) -> bool {
        false
    }
149
150 fn lock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
151 match level {
152 LockLevel::Unlocked => {
153 unreachable!("bug: invalid request lock(Unlocked)");
155 }
156 LockLevel::Shared => {
157 if let VolFileState::Idle = self.state {
158 let reader = self
160 .runtime
161 .graft_reader(self.graft.clone())
162 .or_into_ctx()?;
163 self.state = VolFileState::Shared { reader };
164 } else {
165 return Err(Culprit::new_with_note(
166 ErrCtx::InvalidLockTransition,
167 format!("invalid lock request Shared in state {}", self.state.name()),
168 ));
169 }
170 }
171 LockLevel::Reserved => {
172 if let VolFileState::Shared { ref reader } = self.state {
173 if self.opts.mode().is_readonly() {
177 return Err(Culprit::new_with_note(
178 ErrCtx::InvalidLockTransition,
179 "invalid lock request: Shared -> Reserved: file is read-only",
180 ));
181 }
182
183 let Some(reserved) = self.reserved.try_lock() else {
185 return Err(Culprit::new(ErrCtx::Busy));
186 };
187
188 if !self
191 .runtime
192 .snapshot_is_latest(&self.graft, reader.snapshot())
193 .or_into_ctx()?
194 {
195 return Err(Culprit::new_with_note(
196 ErrCtx::BusySnapshot,
197 "unable to lock: Shared -> Reserved: snapshot changed",
198 ));
199 }
200
201 self.state = VolFileState::Reserved {
203 writer: GraftWriter::try_from(reader.clone()).or_into_ctx()?,
204 };
205
206 MutexGuard::leak(reserved);
209 } else {
210 return Err(Culprit::new_with_note(
211 ErrCtx::InvalidLockTransition,
212 format!(
213 "invalid lock request Reserved in state {}",
214 self.state.name()
215 ),
216 ));
217 }
218 }
219 LockLevel::Pending | LockLevel::Exclusive => {
220 assert!(
222 matches!(self.state, VolFileState::Reserved { .. }),
223 "bug: invalid lock request {:?} in state {}",
224 level,
225 self.state.name()
226 );
227 }
228 }
229
230 Ok(())
231 }
232
233 fn unlock(&mut self, level: LockLevel) -> Result<(), ErrCtx> {
234 match level {
235 LockLevel::Unlocked => match self.state {
236 VolFileState::Idle | VolFileState::Shared { .. } | VolFileState::Committing => {
237 self.state = VolFileState::Idle;
238 }
239 VolFileState::Reserved { .. } => {
240 return Err(Culprit::new_with_note(
241 ErrCtx::InvalidLockTransition,
242 "invalid unlock request Unlocked in state Reserved",
243 ));
244 }
245 },
246 LockLevel::Shared => {
247 if let VolFileState::Reserved { writer } =
248 mem::replace(&mut self.state, VolFileState::Committing)
249 {
250 let reader = writer.commit().or_into_ctx()?;
256 self.state = VolFileState::Shared { reader };
257
258 assert!(self.reserved.is_locked(), "reserved lock must be locked");
262 unsafe { self.reserved.force_unlock() };
265 } else {
266 return Err(Culprit::new_with_note(
267 ErrCtx::InvalidLockTransition,
268 format!(
269 "invalid unlock request Shared in state {}",
270 self.state.name()
271 ),
272 ));
273 }
274 }
275 LockLevel::Reserved | LockLevel::Pending | LockLevel::Exclusive => {
276 unreachable!(
278 "bug: invalid unlock request {:?} in state {}",
279 level,
280 self.state.name()
281 );
282 }
283 }
284
285 Ok(())
286 }
287
288 fn file_size(&mut self) -> Result<usize, ErrCtx> {
289 Ok(PAGESIZE.as_usize() * self.page_count()?.to_usize())
290 }
291
292 fn read(&mut self, offset: usize, data: &mut [u8]) -> Result<usize, ErrCtx> {
293 let pageidx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
295 .try_into()
296 .expect("offset out of volume range");
297 let local_offset = offset % PAGESIZE;
299
300 assert!(
301 local_offset + data.len() <= PAGESIZE,
302 "read must not cross page boundary"
303 );
304
305 let page = match &mut self.state {
307 VolFileState::Idle => {
308 let reader = self
312 .runtime
313 .graft_reader(self.graft.clone())
314 .or_into_ctx()?;
315 reader.read_page(pageidx).or_into_ctx()?
316 }
317 VolFileState::Shared { reader } => reader.read_page(pageidx).or_into_ctx()?,
318 VolFileState::Reserved { writer } => writer.read_page(pageidx).or_into_ctx()?,
319 VolFileState::Committing => return ErrCtx::InvalidVolumeState.into(),
320 };
321
322 let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
323 data.copy_from_slice(&page[range]);
324
325 if pageidx == PageIdx::FIRST
328 && local_offset <= FILE_CHANGE_COUNTER_OFFSET
329 && local_offset + data.len() >= FILE_CHANGE_COUNTER_OFFSET + 4
330 {
331 let fcc_offset = FILE_CHANGE_COUNTER_OFFSET - local_offset.as_usize();
333
334 let snapshot = self.snapshot_or_latest()?;
338 let mut hasher = DefaultHasher::new();
339 snapshot.hash(&mut hasher);
340 let hash = hasher.finish();
341 let change_counter = &hash.to_be_bytes()[..4];
342
343 data[fcc_offset..fcc_offset + 4].copy_from_slice(change_counter);
345 }
346
347 Ok(data.len())
348 }
349
350 fn truncate(&mut self, size: usize) -> Result<(), ErrCtx> {
351 let VolFileState::Reserved { writer, .. } = &mut self.state else {
352 return Err(Culprit::new_with_note(
353 ErrCtx::InvalidVolumeState,
354 "must hold reserved lock to truncate",
355 ));
356 };
357
358 assert_eq!(
359 size % PAGESIZE.as_usize(),
360 0,
361 "size must be an even multiple of {PAGESIZE}"
362 );
363
364 let pages: PageCount = (size / PAGESIZE.as_usize())
365 .try_into()
366 .expect("size too large");
367
368 writer.truncate(pages).or_into_ctx()?;
369 Ok(())
370 }
371
372 fn write(&mut self, offset: usize, data: &[u8]) -> Result<usize, ErrCtx> {
373 let VolFileState::Reserved { writer, .. } = &mut self.state else {
374 return Err(Culprit::new_with_note(
375 ErrCtx::InvalidVolumeState,
376 "must hold reserved lock to write",
377 ));
378 };
379
380 let page_idx: PageIdx = ((offset / PAGESIZE.as_usize()) + 1)
382 .try_into()
383 .expect("offset out of volume range");
384 let local_offset = offset % PAGESIZE;
386
387 assert!(
388 local_offset + data.len() <= PAGESIZE,
389 "write must not cross page boundary"
390 );
391
392 if page_idx == PageIdx::FIRST && data.len() == PAGESIZE && local_offset == 0 {
395 let existing: Page = writer.read_page(page_idx).or_into_ctx()?;
396
397 debug_assert_eq!(data.len(), existing.len(), "page size mismatch");
398
399 let fcc = FILE_CHANGE_COUNTER_OFFSET..FILE_CHANGE_COUNTER_OFFSET + 4;
400 let vvf = VERSION_VALID_FOR_NUMBER_OFFSET..VERSION_VALID_FOR_NUMBER_OFFSET + 4;
401
402 let unchanged =
405 data[..fcc.start] == existing[..fcc.start] &&
407 data[fcc.end..vvf.start] == existing[fcc.end..vvf.start] &&
409 data[vvf.end..] == existing[vvf.end..];
411
412 if unchanged {
413 tracing::trace!(
414 "ignoring write to header page, as only file change counter and version valid for number changed"
415 );
416 return Ok(data.len());
417 }
418 }
419
420 let page = if data.len() == PAGESIZE {
421 Page::try_from(data).expect("data is a full page")
423 } else {
424 let mut page: BytesMut = writer.read_page(page_idx).or_into_ctx()?.into();
427 let range = local_offset.as_usize()..(local_offset + data.len()).as_usize();
429 page[range].copy_from_slice(data);
430 page.try_into().expect("we did not change the page size")
431 };
432
433 writer.write_page(page_idx, page).or_into_ctx()?;
434 Ok(data.len())
435 }
436}