1use std::fs::File;
2use std::io::{Read, Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5#[cfg(test)]
6use std::{cell::Cell, rc::Rc};
7
8use crate::config::OpenMode;
9use crate::error::DbError;
10
11pub trait Store {
16 fn len(&self) -> Result<u64, DbError>;
17 fn is_empty(&self) -> Result<bool, DbError> {
18 Ok(self.len()? == 0)
19 }
20 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError>;
21 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError>;
22 fn sync(&mut self) -> Result<(), DbError>;
23 fn truncate(&mut self, len: u64) -> Result<(), DbError>;
25}
26
27#[derive(Debug)]
30struct RawFileStore {
31 file: File,
32}
33
34impl RawFileStore {
35 fn new(file: File) -> Self {
36 Self { file }
37 }
38}
39
40impl Store for RawFileStore {
41 fn len(&self) -> Result<u64, DbError> {
42 Ok(self.file.metadata()?.len())
43 }
44
45 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
46 self.file.seek(SeekFrom::Start(offset))?;
47 self.file.read_exact(buf)?;
48 Ok(())
49 }
50
51 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
52 self.file.seek(SeekFrom::Start(offset))?;
53 self.file.write_all(buf)?;
54 Ok(())
55 }
56
57 fn sync(&mut self) -> Result<(), DbError> {
58 self.file.sync_all()?;
59 Ok(())
60 }
61
62 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
63 self.file.set_len(len)?;
64 Ok(())
65 }
66}
67
68#[derive(Debug)]
73pub struct FileStore {
74 inner: crate::pager::PagedStore<RawFileStore>,
75 _writer_lock: Option<WriterLockGuard>,
76 _reader_lock: Option<File>,
77 #[cfg(test)]
79 test_write_counter: Option<Rc<Cell<usize>>>,
80 #[cfg(test)]
82 test_write_budget_remaining: Option<Rc<Cell<usize>>>,
83}
84
85#[derive(Debug)]
86struct WriterLockState {
87 _file: File,
88 refs: usize,
89}
90
91static WRITER_LOCKS: OnceLock<Mutex<std::collections::HashMap<PathBuf, WriterLockState>>> =
92 OnceLock::new();
93
94fn writer_locks() -> &'static Mutex<std::collections::HashMap<PathBuf, WriterLockState>> {
95 WRITER_LOCKS.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
96}
97
98#[derive(Debug)]
99struct WriterLockGuard {
100 lock_path: PathBuf,
101}
102
103impl Drop for WriterLockGuard {
104 fn drop(&mut self) {
105 let mut g = writer_locks().lock().unwrap_or_else(|e| e.into_inner());
106 if let Some(st) = g.get_mut(&self.lock_path) {
107 st.refs = st.refs.saturating_sub(1);
108 }
109 if g.get(&self.lock_path).is_some_and(|s| s.refs == 0) {
110 g.remove(&self.lock_path);
111 }
112 }
113}
114
115#[cfg(test)]
116mod tests {
117 include!(concat!(
118 env!("CARGO_MANIFEST_DIR"),
119 "/tests/unit/src_storage_tests.rs"
120 ));
121}
122
123impl FileStore {
124 pub fn new(file: File) -> Self {
125 Self {
126 inner: crate::pager::PagedStore::new(
127 RawFileStore::new(file),
128 crate::pager::DEFAULT_PAGE_SIZE,
129 ),
130 _writer_lock: None,
131 _reader_lock: None,
132 #[cfg(test)]
133 test_write_counter: None,
134 #[cfg(test)]
135 test_write_budget_remaining: None,
136 }
137 }
138
139 #[cfg(test)]
140 pub(crate) fn new_for_test(
141 file: File,
142 write_counter: Option<Rc<Cell<usize>>>,
143 write_budget_remaining: Option<Rc<Cell<usize>>>,
144 ) -> Self {
145 Self {
146 inner: crate::pager::PagedStore::new(
147 RawFileStore::new(file),
148 crate::pager::DEFAULT_PAGE_SIZE,
149 ),
150 _writer_lock: None,
151 _reader_lock: None,
152 test_write_counter: write_counter,
153 test_write_budget_remaining: write_budget_remaining,
154 }
155 }
156
157 fn lock_path_for_db_path(db_path: &Path) -> PathBuf {
158 PathBuf::from(format!("{}.writer.lock", db_path.display()))
161 }
162
163 pub fn open_locked(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, DbError> {
174 use fs2::FileExt;
175
176 let path = path.as_ref();
177 let file = match mode {
178 OpenMode::ReadWrite => std::fs::OpenOptions::new()
179 .read(true)
180 .write(true)
181 .create(true)
182 .truncate(false)
183 .open(path)?,
184 OpenMode::ReadOnly => std::fs::OpenOptions::new().read(true).open(path)?,
185 };
186
187 let lock_path = Self::lock_path_for_db_path(path);
188
189 let writer_lock = match mode {
190 OpenMode::ReadOnly => None,
191 OpenMode::ReadWrite => {
192 let mut g = writer_locks()
193 .lock()
194 .map_err(|_| std::io::Error::other("lock poisoned"))?;
195 if let Some(st) = g.get_mut(&lock_path) {
196 st.refs = st.refs.saturating_add(1);
197 Some(WriterLockGuard {
198 lock_path: lock_path.clone(),
199 })
200 } else {
201 let lock_file = std::fs::OpenOptions::new()
202 .read(true)
203 .write(true)
204 .create(true)
205 .truncate(false)
206 .open(&lock_path)?;
207 lock_file.try_lock_exclusive()?;
209 g.insert(
210 lock_path.clone(),
211 WriterLockState {
212 _file: lock_file,
213 refs: 1,
214 },
215 );
216 Some(WriterLockGuard {
217 lock_path: lock_path.clone(),
218 })
219 }
220 }
221 };
222
223 let reader_lock = match mode {
224 OpenMode::ReadWrite => None,
225 OpenMode::ReadOnly => {
226 let already_writer = writer_locks()
232 .lock()
233 .ok()
234 .and_then(|g| g.get(&lock_path).map(|_| ()))
235 .is_some();
236 if already_writer {
239 return Ok(Self {
240 inner: crate::pager::PagedStore::new(
241 RawFileStore::new(file),
242 crate::pager::DEFAULT_PAGE_SIZE,
243 ),
244 _writer_lock: None,
245 _reader_lock: None,
246 #[cfg(test)]
247 test_write_counter: None,
248 #[cfg(test)]
249 test_write_budget_remaining: None,
250 });
251 }
252
253 let lock_file = std::fs::OpenOptions::new()
254 .read(true)
255 .write(true)
256 .create(true)
257 .truncate(false)
258 .open(&lock_path)?;
259 match lock_file.try_lock_shared() {
260 Ok(()) => Some(lock_file),
261 Err(std::fs::TryLockError::WouldBlock)
262 | Err(std::fs::TryLockError::Error(_)) => {
263 return Err(DbError::Io(std::io::Error::new(
264 std::io::ErrorKind::WouldBlock,
265 "database is locked by another process",
266 )));
267 }
268 }
269 }
270 };
271
272 Ok(Self {
273 inner: crate::pager::PagedStore::new(
274 RawFileStore::new(file),
275 crate::pager::DEFAULT_PAGE_SIZE,
276 ),
277 _writer_lock: writer_lock,
278 _reader_lock: reader_lock,
279 #[cfg(test)]
280 test_write_counter: None,
281 #[cfg(test)]
282 test_write_budget_remaining: None,
283 })
284 }
285}
286
287impl Store for FileStore {
288 fn len(&self) -> Result<u64, DbError> {
289 self.inner.len()
290 }
291
292 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
293 self.inner.read_exact_at(offset, buf)
294 }
295
296 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
297 #[cfg(test)]
298 {
299 if let Some(c) = &self.test_write_counter {
300 c.set(c.get().saturating_add(1));
301 }
302 if let Some(budget) = &self.test_write_budget_remaining {
303 let r = budget.get();
304 if r == 0 {
305 return Err(DbError::Io(std::io::Error::other(
306 "FileStore write budget exhausted (test instrumentation)",
307 )));
308 }
309 budget.set(r - 1);
310 }
311 }
312 self.inner.write_all_at(offset, buf)
313 }
314
315 fn sync(&mut self) -> Result<(), DbError> {
316 self.inner.sync()
317 }
318
319 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
320 self.inner.truncate(len)
321 }
322}
323
324#[derive(Debug, Default)]
326pub struct VecStore {
327 buf: Vec<u8>,
328}
329
330impl VecStore {
331 pub fn new() -> Self {
332 Self { buf: Vec::new() }
333 }
334
335 pub fn into_inner(self) -> Vec<u8> {
336 self.buf
337 }
338
339 pub fn from_vec(buf: Vec<u8>) -> Self {
340 Self { buf }
341 }
342
343 pub fn as_slice(&self) -> &[u8] {
345 &self.buf
346 }
347
348 fn ensure_len(&mut self, end: u64) {
349 let need = end as usize;
350 if self.buf.len() < need {
351 self.buf.resize(need, 0);
352 }
353 }
354}
355
356impl Store for VecStore {
357 fn len(&self) -> Result<u64, DbError> {
358 Ok(self.buf.len() as u64)
359 }
360
361 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
362 let start = offset as usize;
363 let end = start.saturating_add(buf.len());
364 if end > self.buf.len() {
365 return Err(DbError::Io(std::io::Error::new(
366 std::io::ErrorKind::UnexpectedEof,
367 "read past end of VecStore",
368 )));
369 }
370 buf.copy_from_slice(&self.buf[start..end]);
371 Ok(())
372 }
373
374 fn write_all_at(&mut self, offset: u64, data: &[u8]) -> Result<(), DbError> {
375 let end = offset
376 .checked_add(data.len() as u64)
377 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "overflow"))?;
378 self.ensure_len(end);
379 let start = offset as usize;
380 self.buf[start..start + data.len()].copy_from_slice(data);
381 Ok(())
382 }
383
384 fn sync(&mut self) -> Result<(), DbError> {
385 Ok(())
386 }
387
388 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
389 self.buf.truncate(len as usize);
390 Ok(())
391 }
392}