1use std::fs::File;
2use std::io::{Read, Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5#[cfg(test)]
6use std::{cell::Cell, rc::Rc};
7
8use crate::config::OpenMode;
9use crate::error::DbError;
10
11pub trait Store {
16 fn len(&self) -> Result<u64, DbError>;
17 fn is_empty(&self) -> Result<bool, DbError> {
18 Ok(self.len()? == 0)
19 }
20 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError>;
21 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError>;
22 fn sync(&mut self) -> Result<(), DbError>;
23 fn truncate(&mut self, len: u64) -> Result<(), DbError>;
25}
26
/// Unbuffered adapter that exposes a [`File`] through the [`Store`] trait.
#[derive(Debug)]
struct RawFileStore {
    /// The open file every operation is issued against.
    file: File,
}

impl RawFileStore {
    /// Takes ownership of an already-open `file`.
    fn new(file: File) -> Self {
        RawFileStore { file }
    }
}
39
40impl Store for RawFileStore {
41 fn len(&self) -> Result<u64, DbError> {
42 Ok(self.file.metadata()?.len())
43 }
44
45 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
46 self.file.seek(SeekFrom::Start(offset))?;
47 self.file.read_exact(buf)?;
48 Ok(())
49 }
50
51 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
52 self.file.seek(SeekFrom::Start(offset))?;
53 self.file.write_all(buf)?;
54 Ok(())
55 }
56
57 fn sync(&mut self) -> Result<(), DbError> {
58 self.file.sync_all()?;
59 Ok(())
60 }
61
62 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
63 self.file.set_len(len)?;
64 Ok(())
65 }
66}
67
68#[derive(Debug)]
73pub struct FileStore {
74 inner: crate::pager::PagedStore<RawFileStore>,
75 _writer_lock: Option<WriterLockGuard>,
76 _reader_lock: Option<File>,
77 #[cfg(test)]
79 test_write_counter: Option<Rc<Cell<usize>>>,
80 #[cfg(test)]
82 test_write_budget_remaining: Option<Rc<Cell<usize>>>,
83}
84
/// Registry entry for one lock path held by a writer in this process.
#[derive(Debug)]
struct WriterLockState {
    /// Open lock file; it is kept only so that it stays open for as long as
    /// the entry exists and is never read.
    _file: File,
    /// Number of live `WriterLockGuard`s that refer to this entry.
    refs: usize,
}
90
91static WRITER_LOCKS: OnceLock<Mutex<std::collections::HashMap<PathBuf, WriterLockState>>> =
92 OnceLock::new();
93
94fn writer_locks() -> &'static Mutex<std::collections::HashMap<PathBuf, WriterLockState>> {
95 WRITER_LOCKS.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
96}
97
98#[derive(Debug)]
99struct WriterLockGuard {
100 lock_path: PathBuf,
101}
102
103impl Drop for WriterLockGuard {
104 fn drop(&mut self) {
105 let mut g = writer_locks().lock().unwrap_or_else(|e| e.into_inner());
106 if let Some(st) = g.get_mut(&self.lock_path) {
107 st.refs = st.refs.saturating_sub(1);
108 }
109 if g.get(&self.lock_path).is_some_and(|s| s.refs == 0) {
110 g.remove(&self.lock_path);
111 }
112 }
113}
114
// The unit tests are kept in a separate file under the crate's `tests/unit`
// directory and are textually included here, which places them inside this
// module's scope.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_storage_tests.rs"));
}
122
123impl FileStore {
124 pub(crate) fn release_writer_lock(&mut self) {
126 self._writer_lock = None;
127 }
128
129 pub fn new(file: File) -> Self {
130 Self {
131 inner: crate::pager::PagedStore::new(
132 RawFileStore::new(file),
133 crate::pager::DEFAULT_PAGE_SIZE,
134 ),
135 _writer_lock: None,
136 _reader_lock: None,
137 #[cfg(test)]
138 test_write_counter: None,
139 #[cfg(test)]
140 test_write_budget_remaining: None,
141 }
142 }
143
144 #[cfg(test)]
145 pub(crate) fn new_for_test(
146 file: File,
147 write_counter: Option<Rc<Cell<usize>>>,
148 write_budget_remaining: Option<Rc<Cell<usize>>>,
149 ) -> Self {
150 Self {
151 inner: crate::pager::PagedStore::new(
152 RawFileStore::new(file),
153 crate::pager::DEFAULT_PAGE_SIZE,
154 ),
155 _writer_lock: None,
156 _reader_lock: None,
157 test_write_counter: write_counter,
158 test_write_budget_remaining: write_budget_remaining,
159 }
160 }
161
162 fn lock_path_for_db_path(db_path: &Path) -> PathBuf {
163 PathBuf::from(format!("{}.writer.lock", db_path.display()))
166 }
167
168 pub fn open_locked(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, DbError> {
179 use fs2::FileExt;
180
181 let path = path.as_ref();
182 let file = match mode {
183 OpenMode::ReadWrite => std::fs::OpenOptions::new()
184 .read(true)
185 .write(true)
186 .create(true)
187 .truncate(false)
188 .open(path)?,
189 OpenMode::ReadOnly => std::fs::OpenOptions::new().read(true).open(path)?,
190 };
191
192 let lock_path = Self::lock_path_for_db_path(path);
193
194 if mode == OpenMode::ReadWrite {
195 let already_writer = writer_locks()
196 .lock()
197 .ok()
198 .and_then(|g| g.get(&lock_path).map(|_| ()))
199 .is_some();
200 if already_writer {
201 return Err(DbError::Io(std::io::Error::new(
202 std::io::ErrorKind::AlreadyExists,
203 format!(
204 "writable storage already open for this path in this process: {}",
205 path.display()
206 ),
207 )));
208 }
209 if let Err(e) = file.try_lock_exclusive() {
210 return Err(DbError::Io(std::io::Error::new(
211 std::io::ErrorKind::WouldBlock,
212 format!("database file is locked by another process: {e}"),
213 )));
214 }
215 }
216
217 let writer_lock = match mode {
218 OpenMode::ReadOnly => None,
219 OpenMode::ReadWrite => {
220 let mut g = writer_locks()
221 .lock()
222 .map_err(|_| std::io::Error::other("lock poisoned"))?;
223 if let Some(st) = g.get_mut(&lock_path) {
224 st.refs = st.refs.saturating_add(1);
225 Some(WriterLockGuard {
226 lock_path: lock_path.clone(),
227 })
228 } else {
229 let lock_file = std::fs::OpenOptions::new()
230 .read(true)
231 .write(true)
232 .create(true)
233 .truncate(false)
234 .open(&lock_path)?;
235 lock_file.try_lock_exclusive()?;
237 g.insert(
238 lock_path.clone(),
239 WriterLockState {
240 _file: lock_file,
241 refs: 1,
242 },
243 );
244 Some(WriterLockGuard {
245 lock_path: lock_path.clone(),
246 })
247 }
248 }
249 };
250
251 let reader_lock = match mode {
252 OpenMode::ReadWrite => None,
253 OpenMode::ReadOnly => {
254 let already_writer = writer_locks()
260 .lock()
261 .ok()
262 .and_then(|g| g.get(&lock_path).map(|_| ()))
263 .is_some();
264 if already_writer {
267 return Ok(Self {
268 inner: crate::pager::PagedStore::new(
269 RawFileStore::new(file),
270 crate::pager::DEFAULT_PAGE_SIZE,
271 ),
272 _writer_lock: None,
273 _reader_lock: None,
274 #[cfg(test)]
275 test_write_counter: None,
276 #[cfg(test)]
277 test_write_budget_remaining: None,
278 });
279 }
280
281 let lock_file = std::fs::OpenOptions::new()
282 .read(true)
283 .write(true)
284 .create(true)
285 .truncate(false)
286 .open(&lock_path)?;
287 match lock_file.try_lock_shared() {
288 Ok(()) => Some(lock_file),
289 Err(std::fs::TryLockError::WouldBlock)
290 | Err(std::fs::TryLockError::Error(_)) => {
291 return Err(DbError::Io(std::io::Error::new(
292 std::io::ErrorKind::WouldBlock,
293 "database is locked by another process",
294 )));
295 }
296 }
297 }
298 };
299
300 Ok(Self {
301 inner: crate::pager::PagedStore::new(
302 RawFileStore::new(file),
303 crate::pager::DEFAULT_PAGE_SIZE,
304 ),
305 _writer_lock: writer_lock,
306 _reader_lock: reader_lock,
307 #[cfg(test)]
308 test_write_counter: None,
309 #[cfg(test)]
310 test_write_budget_remaining: None,
311 })
312 }
313}
314
315impl Store for FileStore {
316 fn len(&self) -> Result<u64, DbError> {
317 self.inner.len()
318 }
319
320 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
321 self.inner.read_exact_at(offset, buf)
322 }
323
324 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
325 #[cfg(test)]
326 {
327 if let Some(c) = &self.test_write_counter {
328 c.set(c.get().saturating_add(1));
329 }
330 if let Some(budget) = &self.test_write_budget_remaining {
331 let r = budget.get();
332 if r == 0 {
333 return Err(DbError::Io(std::io::Error::other(
334 "FileStore write budget exhausted (test instrumentation)",
335 )));
336 }
337 budget.set(r - 1);
338 }
339 }
340 self.inner.write_all_at(offset, buf)
341 }
342
343 fn sync(&mut self) -> Result<(), DbError> {
344 self.inner.sync()
345 }
346
347 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
348 self.inner.truncate(len)
349 }
350}
351
352#[derive(Debug, Default)]
354pub struct VecStore {
355 buf: Vec<u8>,
356}
357
358impl VecStore {
359 pub fn new() -> Self {
360 Self { buf: Vec::new() }
361 }
362
363 pub fn into_inner(self) -> Vec<u8> {
364 self.buf
365 }
366
367 pub fn from_vec(buf: Vec<u8>) -> Self {
368 Self { buf }
369 }
370
371 pub fn as_slice(&self) -> &[u8] {
373 &self.buf
374 }
375
376 fn ensure_len(&mut self, end: u64) {
377 let need = end as usize;
378 if self.buf.len() < need {
379 self.buf.resize(need, 0);
380 }
381 }
382}
383
384impl Store for VecStore {
385 fn len(&self) -> Result<u64, DbError> {
386 Ok(self.buf.len() as u64)
387 }
388
389 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
390 let start = offset as usize;
391 let end = start.saturating_add(buf.len());
392 if end > self.buf.len() {
393 return Err(DbError::Io(std::io::Error::new(
394 std::io::ErrorKind::UnexpectedEof,
395 "read past end of VecStore",
396 )));
397 }
398 buf.copy_from_slice(&self.buf[start..end]);
399 Ok(())
400 }
401
402 fn write_all_at(&mut self, offset: u64, data: &[u8]) -> Result<(), DbError> {
403 let end = offset
404 .checked_add(data.len() as u64)
405 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "overflow"))?;
406 self.ensure_len(end);
407 let start = offset as usize;
408 self.buf[start..start + data.len()].copy_from_slice(data);
409 Ok(())
410 }
411
412 fn sync(&mut self) -> Result<(), DbError> {
413 Ok(())
414 }
415
416 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
417 self.buf.truncate(len as usize);
418 Ok(())
419 }
420}