1use std::fs::File;
2use std::io::{Read, Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5#[cfg(test)]
6use std::{cell::Cell, rc::Rc};
7
8use crate::config::OpenMode;
9use crate::error::DbError;
10
11pub trait Store {
16 fn len(&self) -> Result<u64, DbError>;
17 fn is_empty(&self) -> Result<bool, DbError> {
18 Ok(self.len()? == 0)
19 }
20 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError>;
21 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError>;
22 fn sync(&mut self) -> Result<(), DbError>;
23 fn truncate(&mut self, len: u64) -> Result<(), DbError>;
25}
26
27#[derive(Debug)]
30struct RawFileStore {
31 file: File,
32}
33
34impl RawFileStore {
35 fn new(file: File) -> Self {
36 Self { file }
37 }
38}
39
40impl Store for RawFileStore {
41 fn len(&self) -> Result<u64, DbError> {
42 Ok(self.file.metadata()?.len())
43 }
44
45 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
46 self.file.seek(SeekFrom::Start(offset))?;
47 self.file.read_exact(buf)?;
48 Ok(())
49 }
50
51 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
52 self.file.seek(SeekFrom::Start(offset))?;
53 self.file.write_all(buf)?;
54 Ok(())
55 }
56
57 fn sync(&mut self) -> Result<(), DbError> {
58 self.file.sync_all()?;
59 Ok(())
60 }
61
62 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
63 self.file.set_len(len)?;
64 Ok(())
65 }
66}
67
68#[derive(Debug)]
73pub struct FileStore {
74 inner: crate::pager::PagedStore<RawFileStore>,
75 _writer_lock: Option<WriterLockGuard>,
76 _reader_lock: Option<File>,
77 #[cfg(test)]
79 test_write_counter: Option<Rc<Cell<usize>>>,
80 #[cfg(test)]
82 test_write_budget_remaining: Option<Rc<Cell<usize>>>,
83}
84
85#[derive(Debug)]
86struct WriterLockState {
87 _file: File,
88 refs: usize,
89}
90
91static WRITER_LOCKS: OnceLock<Mutex<std::collections::HashMap<PathBuf, WriterLockState>>> =
92 OnceLock::new();
93
94fn writer_locks() -> &'static Mutex<std::collections::HashMap<PathBuf, WriterLockState>> {
95 WRITER_LOCKS.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
96}
97
98#[derive(Debug)]
99struct WriterLockGuard {
100 lock_path: PathBuf,
101}
102
103impl Drop for WriterLockGuard {
104 fn drop(&mut self) {
105 let mut g = writer_locks().lock().unwrap_or_else(|e| e.into_inner());
106 if let Some(st) = g.get_mut(&self.lock_path) {
107 st.refs = st.refs.saturating_sub(1);
108 }
109 if g.get(&self.lock_path).is_some_and(|s| s.refs == 0) {
110 g.remove(&self.lock_path);
111 }
112 }
113}
114
115#[cfg(test)]
116mod tests {
117 include!(concat!(
118 env!("CARGO_MANIFEST_DIR"),
119 "/tests/unit/src_storage_tests.rs"
120 ));
121}
122
123impl FileStore {
124 pub fn new(file: File) -> Self {
125 Self {
126 inner: crate::pager::PagedStore::new(
127 RawFileStore::new(file),
128 crate::pager::DEFAULT_PAGE_SIZE,
129 ),
130 _writer_lock: None,
131 _reader_lock: None,
132 #[cfg(test)]
133 test_write_counter: None,
134 #[cfg(test)]
135 test_write_budget_remaining: None,
136 }
137 }
138
139 #[cfg(test)]
140 pub(crate) fn new_for_test(
141 file: File,
142 write_counter: Option<Rc<Cell<usize>>>,
143 write_budget_remaining: Option<Rc<Cell<usize>>>,
144 ) -> Self {
145 Self {
146 inner: crate::pager::PagedStore::new(
147 RawFileStore::new(file),
148 crate::pager::DEFAULT_PAGE_SIZE,
149 ),
150 _writer_lock: None,
151 _reader_lock: None,
152 test_write_counter: write_counter,
153 test_write_budget_remaining: write_budget_remaining,
154 }
155 }
156
157 fn lock_path_for_db_path(db_path: &Path) -> PathBuf {
158 PathBuf::from(format!("{}.writer.lock", db_path.display()))
161 }
162
163 pub fn open_locked(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, DbError> {
174 use fs2::FileExt;
175
176 let path = path.as_ref();
177 let file = match mode {
178 OpenMode::ReadWrite => std::fs::OpenOptions::new()
179 .read(true)
180 .write(true)
181 .create(true)
182 .truncate(false)
183 .open(path)?,
184 OpenMode::ReadOnly => std::fs::OpenOptions::new().read(true).open(path)?,
185 };
186
187 let lock_path = Self::lock_path_for_db_path(path);
188
189 let writer_lock = match mode {
190 OpenMode::ReadOnly => None,
191 OpenMode::ReadWrite => {
192 let mut g = writer_locks()
193 .lock()
194 .map_err(|_| std::io::Error::other("lock poisoned"))?;
195 if let Some(st) = g.get_mut(&lock_path) {
196 st.refs = st.refs.saturating_add(1);
197 Some(WriterLockGuard {
198 lock_path: lock_path.clone(),
199 })
200 } else {
201 let lock_file = std::fs::OpenOptions::new()
202 .read(true)
203 .write(true)
204 .create(true)
205 .truncate(false)
206 .open(&lock_path)?;
207 lock_file.try_lock_exclusive()?;
209 g.insert(
210 lock_path.clone(),
211 WriterLockState {
212 _file: lock_file,
213 refs: 1,
214 },
215 );
216 Some(WriterLockGuard {
217 lock_path: lock_path.clone(),
218 })
219 }
220 }
221 };
222
223 let reader_lock = match mode {
224 OpenMode::ReadWrite => None,
225 OpenMode::ReadOnly => {
226 let already_writer = writer_locks()
232 .lock()
233 .ok()
234 .and_then(|g| g.get(&lock_path).map(|_| ()))
235 .is_some();
236 if already_writer {
237 return Err(DbError::Io(std::io::Error::other(
238 "cannot open read-only while holding writer lock in the same process",
239 )));
240 }
241
242 let lock_file = std::fs::OpenOptions::new()
243 .read(true)
244 .write(true)
245 .create(true)
246 .truncate(false)
247 .open(&lock_path)?;
248 match lock_file.try_lock_shared() {
249 Ok(()) => Some(lock_file),
250 Err(std::fs::TryLockError::WouldBlock)
251 | Err(std::fs::TryLockError::Error(_)) => {
252 return Err(DbError::Io(std::io::Error::new(
253 std::io::ErrorKind::WouldBlock,
254 "database is locked by another process",
255 )));
256 }
257 }
258 }
259 };
260
261 Ok(Self {
262 inner: crate::pager::PagedStore::new(
263 RawFileStore::new(file),
264 crate::pager::DEFAULT_PAGE_SIZE,
265 ),
266 _writer_lock: writer_lock,
267 _reader_lock: reader_lock,
268 #[cfg(test)]
269 test_write_counter: None,
270 #[cfg(test)]
271 test_write_budget_remaining: None,
272 })
273 }
274}
275
276impl Store for FileStore {
277 fn len(&self) -> Result<u64, DbError> {
278 self.inner.len()
279 }
280
281 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
282 self.inner.read_exact_at(offset, buf)
283 }
284
285 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
286 #[cfg(test)]
287 {
288 if let Some(c) = &self.test_write_counter {
289 c.set(c.get().saturating_add(1));
290 }
291 if let Some(budget) = &self.test_write_budget_remaining {
292 let r = budget.get();
293 if r == 0 {
294 return Err(DbError::Io(std::io::Error::other(
295 "FileStore write budget exhausted (test instrumentation)",
296 )));
297 }
298 budget.set(r - 1);
299 }
300 }
301 self.inner.write_all_at(offset, buf)
302 }
303
304 fn sync(&mut self) -> Result<(), DbError> {
305 self.inner.sync()
306 }
307
308 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
309 self.inner.truncate(len)
310 }
311}
312
313#[derive(Debug, Default)]
315pub struct VecStore {
316 buf: Vec<u8>,
317}
318
319impl VecStore {
320 pub fn new() -> Self {
321 Self { buf: Vec::new() }
322 }
323
324 pub fn into_inner(self) -> Vec<u8> {
325 self.buf
326 }
327
328 pub fn from_vec(buf: Vec<u8>) -> Self {
329 Self { buf }
330 }
331
332 pub fn as_slice(&self) -> &[u8] {
334 &self.buf
335 }
336
337 fn ensure_len(&mut self, end: u64) {
338 let need = end as usize;
339 if self.buf.len() < need {
340 self.buf.resize(need, 0);
341 }
342 }
343}
344
345impl Store for VecStore {
346 fn len(&self) -> Result<u64, DbError> {
347 Ok(self.buf.len() as u64)
348 }
349
350 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
351 let start = offset as usize;
352 let end = start.saturating_add(buf.len());
353 if end > self.buf.len() {
354 return Err(DbError::Io(std::io::Error::new(
355 std::io::ErrorKind::UnexpectedEof,
356 "read past end of VecStore",
357 )));
358 }
359 buf.copy_from_slice(&self.buf[start..end]);
360 Ok(())
361 }
362
363 fn write_all_at(&mut self, offset: u64, data: &[u8]) -> Result<(), DbError> {
364 let end = offset
365 .checked_add(data.len() as u64)
366 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "overflow"))?;
367 self.ensure_len(end);
368 let start = offset as usize;
369 self.buf[start..start + data.len()].copy_from_slice(data);
370 Ok(())
371 }
372
373 fn sync(&mut self) -> Result<(), DbError> {
374 Ok(())
375 }
376
377 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
378 self.buf.truncate(len as usize);
379 Ok(())
380 }
381}