Skip to main content

modelvault_core/
storage.rs

1use std::fs::File;
2use std::io::{Read, Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5#[cfg(test)]
6use std::{cell::Cell, rc::Rc};
7
8use crate::config::OpenMode;
9use crate::error::DbError;
10
11/// Random-access byte image of a ModelVault database (length, read, write, sync).
12///
13/// Implemented by [`FileStore`] (real files) and [`VecStore`] (in-memory snapshots). A future
14/// read-only store split is deferred until a second consumer needs a smaller surface.
15pub trait Store {
16    fn len(&self) -> Result<u64, DbError>;
17    fn is_empty(&self) -> Result<bool, DbError> {
18        Ok(self.len()? == 0)
19    }
20    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError>;
21    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError>;
22    fn sync(&mut self) -> Result<(), DbError>;
23    /// Shrink or grow the logical file to `len` bytes (used for crash recovery truncation).
24    fn truncate(&mut self, len: u64) -> Result<(), DbError>;
25}
26
// In 0.2.x this is intentionally internal scaffolding.
// The public API should not expose storage mechanics yet.
/// Private wrapper around an open [`File`]; the [`Store`] impl on this type performs the actual
/// seek/read/write/sync/resize calls against it.
#[derive(Debug)]
struct RawFileStore {
    /// Open handle that every store operation goes through.
    file: File,
}

impl RawFileStore {
    /// Wrap an already-opened file. No I/O is performed here.
    fn new(file: File) -> Self {
        Self { file }
    }
}
39
40impl Store for RawFileStore {
41    fn len(&self) -> Result<u64, DbError> {
42        Ok(self.file.metadata()?.len())
43    }
44
45    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
46        self.file.seek(SeekFrom::Start(offset))?;
47        self.file.read_exact(buf)?;
48        Ok(())
49    }
50
51    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
52        self.file.seek(SeekFrom::Start(offset))?;
53        self.file.write_all(buf)?;
54        Ok(())
55    }
56
57    fn sync(&mut self) -> Result<(), DbError> {
58        self.file.sync_all()?;
59        Ok(())
60    }
61
62    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
63        self.file.set_len(len)?;
64        Ok(())
65    }
66}
67
/// On-disk store: a real file wrapped in a fixed-size page cache.
///
/// This keeps the public `FileStore` name stable while introducing the 0.11.0 pager/buffer-pool
/// boundary via [`crate::pager::PagedStore`].
#[derive(Debug)]
pub struct FileStore {
    /// Paged wrapper over the raw file; every [`Store`] call on `FileStore` is forwarded to it.
    inner: crate::pager::PagedStore<RawFileStore>,
    /// Guard for this process's writer-lock registry entry. Set by a read-write
    /// [`FileStore::open_locked`]; dropping it (or clearing the field) releases the entry.
    _writer_lock: Option<WriterLockGuard>,
    /// Sidecar lock-file handle that a read-only [`FileStore::open_locked`] keeps open after
    /// taking a shared lock on it.
    _reader_lock: Option<File>,
    /// Counts [`Store::write_all_at`] invocations (test builds only).
    #[cfg(test)]
    test_write_counter: Option<Rc<Cell<usize>>>,
    /// When set, the number of [`Store::write_all_at`] calls still allowed to succeed. Each
    /// successful pass decrements it; once it is zero every further call fails with a synthetic
    /// [`DbError::Io`].
    #[cfg(test)]
    test_write_budget_remaining: Option<Rc<Cell<usize>>>,
}
84
/// One entry in the process-wide writer-lock registry.
#[derive(Debug)]
struct WriterLockState {
    /// Open sidecar lock file, stored so the handle lives as long as the entry does.
    _file: File,
    /// Number of live [`WriterLockGuard`]s that refer to this entry.
    refs: usize,
}

/// Process-wide registry of writer locks, keyed by sidecar lock path. Created on first use.
static WRITER_LOCKS: OnceLock<Mutex<std::collections::HashMap<PathBuf, WriterLockState>>> =
    OnceLock::new();

/// Access the registry, initializing it to an empty map the first time it is requested.
fn writer_locks() -> &'static Mutex<std::collections::HashMap<PathBuf, WriterLockState>> {
    WRITER_LOCKS.get_or_init(Default::default)
}

/// Handle on one registry entry; dropping it gives back one reference.
#[derive(Debug)]
struct WriterLockGuard {
    /// Registry key (sidecar lock path) this guard counts against.
    lock_path: PathBuf,
}

impl Drop for WriterLockGuard {
    /// Decrement the entry's reference count and remove the entry once the count hits zero.
    ///
    /// A poisoned registry mutex is recovered rather than propagated, and a missing entry is
    /// silently ignored.
    fn drop(&mut self) {
        let mut table = writer_locks()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let now_unused = match table.get_mut(&self.lock_path) {
            Some(state) => {
                state.refs = state.refs.saturating_sub(1);
                state.refs == 0
            }
            None => false,
        };
        if now_unused {
            table.remove(&self.lock_path);
        }
    }
}
114
// The unit tests live in a separate file under the crate's `tests/unit/` directory and are
// textually included here, so they compile as a child module of this one (test builds only).
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_storage_tests.rs"
    ));
}
122
123impl FileStore {
124    /// Drop the process-local writer lock guard so the path can be reopened (e.g. after compaction).
125    pub(crate) fn release_writer_lock(&mut self) {
126        self._writer_lock = None;
127    }
128
129    pub fn new(file: File) -> Self {
130        Self {
131            inner: crate::pager::PagedStore::new(
132                RawFileStore::new(file),
133                crate::pager::DEFAULT_PAGE_SIZE,
134            ),
135            _writer_lock: None,
136            _reader_lock: None,
137            #[cfg(test)]
138            test_write_counter: None,
139            #[cfg(test)]
140            test_write_budget_remaining: None,
141        }
142    }
143
144    #[cfg(test)]
145    pub(crate) fn new_for_test(
146        file: File,
147        write_counter: Option<Rc<Cell<usize>>>,
148        write_budget_remaining: Option<Rc<Cell<usize>>>,
149    ) -> Self {
150        Self {
151            inner: crate::pager::PagedStore::new(
152                RawFileStore::new(file),
153                crate::pager::DEFAULT_PAGE_SIZE,
154            ),
155            _writer_lock: None,
156            _reader_lock: None,
157            test_write_counter: write_counter,
158            test_write_budget_remaining: write_budget_remaining,
159        }
160    }
161
162    fn lock_path_for_db_path(db_path: &Path) -> PathBuf {
163        // Sidecar lock file so writers can exclude other writers while read-only opens proceed.
164        // This is advisory and best-effort; platforms differ in exact semantics.
165        PathBuf::from(format!("{}.writer.lock", db_path.display()))
166    }
167
    /// Open the database at `path` in `mode` and take the advisory locks that go with that mode.
    ///
    /// Locks are placed on the sidecar file `<db_path>.writer.lock`. In addition, a process-wide
    /// registry keyed by that sidecar path records which paths this process has open for
    /// writing.
    ///
    /// `ReadWrite`:
    /// - opens the database file for reading and writing, creating it if missing and never
    ///   truncating it;
    /// - fails with `AlreadyExists` if the registry already has an entry for this path;
    /// - tries an **exclusive** lock on the database file itself; any failure is reported as
    ///   `WouldBlock`;
    /// - opens (creating if needed) the sidecar, tries an **exclusive** lock on it without
    ///   blocking, and records it in the registry. The returned store holds the matching guard.
    ///
    /// `ReadOnly`:
    /// - opens the database file read-only;
    /// - if this process already has a registry entry for the path, returns a store that holds
    ///   no lock of its own;
    /// - otherwise opens the sidecar (with write access, creating it if needed) and tries a
    ///   **shared** lock on it without blocking; any failure is reported as `WouldBlock`. The
    ///   returned store keeps the sidecar handle open.
    ///
    /// So a reader and a writer can coexist only inside one process; callers that require
    /// stronger coordination should implement it at a higher layer.
    ///
    /// # Errors
    ///
    /// Returns [`DbError`] when the database or sidecar file cannot be opened, when a lock
    /// cannot be taken immediately, when this process already has the path open for writing, or
    /// when the registry mutex is poisoned at registration time.
    ///
    /// NOTE(review): the registry is consulted under one mutex acquisition and updated under a
    /// later one, so two threads opening the same path for writing can both pass the first
    /// check; the `refs`-increment branch below is only reachable that way. The checks also
    /// treat a poisoned mutex as "no writer registered" (`.lock().ok()`). Confirm both are
    /// intended.
    pub fn open_locked(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, DbError> {
        // NOTE(review): the shared-lock match further down names `std::fs::TryLockError`, so the
        // `try_lock_*` calls presumably resolve to the inherent `std::fs::File` methods rather
        // than this trait — confirm which one is in effect for the supported toolchains.
        use fs2::FileExt;

        let path = path.as_ref();
        let file = match mode {
            OpenMode::ReadWrite => std::fs::OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(path)?,
            OpenMode::ReadOnly => std::fs::OpenOptions::new().read(true).open(path)?,
        };

        let lock_path = Self::lock_path_for_db_path(path);

        if mode == OpenMode::ReadWrite {
            // Reject a second writable open of the same path from this process up front.
            let already_writer = writer_locks()
                .lock()
                .ok()
                .and_then(|g| g.get(&lock_path).map(|_| ()))
                .is_some();
            if already_writer {
                return Err(DbError::Io(std::io::Error::new(
                    std::io::ErrorKind::AlreadyExists,
                    format!(
                        "writable storage already open for this path in this process: {}",
                        path.display()
                    ),
                )));
            }
            // Lock the database file itself, in addition to the sidecar lock taken below.
            if let Err(e) = file.try_lock_exclusive() {
                return Err(DbError::Io(std::io::Error::new(
                    std::io::ErrorKind::WouldBlock,
                    format!("database file is locked by another process: {e}"),
                )));
            }
        }

        let writer_lock = match mode {
            OpenMode::ReadOnly => None,
            OpenMode::ReadWrite => {
                // The registry mutex stays held while the sidecar is opened, locked and recorded.
                let mut g = writer_locks()
                    .lock()
                    .map_err(|_| std::io::Error::other("lock poisoned"))?;
                if let Some(st) = g.get_mut(&lock_path) {
                    // An entry appeared since the check above: share it instead of re-locking.
                    st.refs = st.refs.saturating_add(1);
                    Some(WriterLockGuard {
                        lock_path: lock_path.clone(),
                    })
                } else {
                    let lock_file = std::fs::OpenOptions::new()
                        .read(true)
                        .write(true)
                        .create(true)
                        .truncate(false)
                        .open(&lock_path)?;
                    // Fail fast: do not block indefinitely.
                    lock_file.try_lock_exclusive()?;
                    g.insert(
                        lock_path.clone(),
                        WriterLockState {
                            _file: lock_file,
                            refs: 1,
                        },
                    );
                    Some(WriterLockGuard {
                        lock_path: lock_path.clone(),
                    })
                }
            }
        };

        let reader_lock = match mode {
            OpenMode::ReadWrite => None,
            OpenMode::ReadOnly => {
                // Read-only opens normally take a shared lock on the sidecar so that a writer's
                // exclusive lock attempt on it fails while readers are active.
                //
                // Important: on some platforms, acquiring a second lock in the same process while
                // an exclusive lock is held may downgrade/replace the existing lock. We avoid that
                // foot-gun by not locking at all when this process already holds the writer lock.
                let already_writer = writer_locks()
                    .lock()
                    .ok()
                    .and_then(|g| g.get(&lock_path).map(|_| ()))
                    .is_some();
                // Same-process writer already holds the lock file; skip the second lock (which
                // can downgrade on some platforms) and return a store with no lock of its own.
                if already_writer {
                    return Ok(Self {
                        inner: crate::pager::PagedStore::new(
                            RawFileStore::new(file),
                            crate::pager::DEFAULT_PAGE_SIZE,
                        ),
                        _writer_lock: None,
                        _reader_lock: None,
                        #[cfg(test)]
                        test_write_counter: None,
                        #[cfg(test)]
                        test_write_budget_remaining: None,
                    });
                }

                let lock_file = std::fs::OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .truncate(false)
                    .open(&lock_path)?;
                match lock_file.try_lock_shared() {
                    Ok(()) => Some(lock_file),
                    // Contention and genuine I/O failures are both surfaced as `WouldBlock`.
                    Err(std::fs::TryLockError::WouldBlock)
                    | Err(std::fs::TryLockError::Error(_)) => {
                        return Err(DbError::Io(std::io::Error::new(
                            std::io::ErrorKind::WouldBlock,
                            "database is locked by another process",
                        )));
                    }
                }
            }
        };

        Ok(Self {
            inner: crate::pager::PagedStore::new(
                RawFileStore::new(file),
                crate::pager::DEFAULT_PAGE_SIZE,
            ),
            _writer_lock: writer_lock,
            _reader_lock: reader_lock,
            #[cfg(test)]
            test_write_counter: None,
            #[cfg(test)]
            test_write_budget_remaining: None,
        })
    }
313}
314
315impl Store for FileStore {
316    fn len(&self) -> Result<u64, DbError> {
317        self.inner.len()
318    }
319
320    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
321        self.inner.read_exact_at(offset, buf)
322    }
323
324    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
325        #[cfg(test)]
326        {
327            if let Some(c) = &self.test_write_counter {
328                c.set(c.get().saturating_add(1));
329            }
330            if let Some(budget) = &self.test_write_budget_remaining {
331                let r = budget.get();
332                if r == 0 {
333                    return Err(DbError::Io(std::io::Error::other(
334                        "FileStore write budget exhausted (test instrumentation)",
335                    )));
336                }
337                budget.set(r - 1);
338            }
339        }
340        self.inner.write_all_at(offset, buf)
341    }
342
343    fn sync(&mut self) -> Result<(), DbError> {
344        self.inner.sync()
345    }
346
347    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
348        self.inner.truncate(len)
349    }
350}
351
/// Growable in-memory byte store that follows the same [`Store`] contract as [`FileStore`].
#[derive(Debug, Default)]
pub struct VecStore {
    /// The whole logical file; its length is the store's length.
    buf: Vec<u8>,
}

impl VecStore {
    /// Create a store that holds no bytes.
    pub fn new() -> Self {
        Self::default()
    }

    /// Consume the store and hand back its bytes.
    pub fn into_inner(self) -> Vec<u8> {
        let Self { buf } = self;
        buf
    }

    /// Build a store whose initial contents are `buf`.
    pub fn from_vec(buf: Vec<u8>) -> Self {
        Self { buf }
    }

    /// Borrow the complete contents (a read-only image of the logical file).
    pub fn as_slice(&self) -> &[u8] {
        self.buf.as_slice()
    }

    /// Zero-extend the buffer so it is at least `end` bytes long; never shrinks it.
    fn ensure_len(&mut self, end: u64) {
        let target = self.buf.len().max(end as usize);
        self.buf.resize(target, 0);
    }
}
383
384impl Store for VecStore {
385    fn len(&self) -> Result<u64, DbError> {
386        Ok(self.buf.len() as u64)
387    }
388
389    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
390        let start = offset as usize;
391        let end = start.saturating_add(buf.len());
392        if end > self.buf.len() {
393            return Err(DbError::Io(std::io::Error::new(
394                std::io::ErrorKind::UnexpectedEof,
395                "read past end of VecStore",
396            )));
397        }
398        buf.copy_from_slice(&self.buf[start..end]);
399        Ok(())
400    }
401
402    fn write_all_at(&mut self, offset: u64, data: &[u8]) -> Result<(), DbError> {
403        let end = offset
404            .checked_add(data.len() as u64)
405            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "overflow"))?;
406        self.ensure_len(end);
407        let start = offset as usize;
408        self.buf[start..start + data.len()].copy_from_slice(data);
409        Ok(())
410    }
411
412    fn sync(&mut self) -> Result<(), DbError> {
413        Ok(())
414    }
415
416    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
417        self.buf.truncate(len as usize);
418        Ok(())
419    }
420}