Skip to main content

modelvault_core/
storage.rs

1use std::fs::File;
2use std::io::{Read, Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5#[cfg(test)]
6use std::{cell::Cell, rc::Rc};
7
8use crate::config::OpenMode;
9use crate::error::DbError;
10
11/// Random-access byte image of a ModelVault database (length, read, write, sync).
12///
13/// Implemented by [`FileStore`] (real files) and [`VecStore`] (in-memory snapshots). A future
14/// read-only store split is deferred until a second consumer needs a smaller surface.
15pub trait Store {
16    fn len(&self) -> Result<u64, DbError>;
17    fn is_empty(&self) -> Result<bool, DbError> {
18        Ok(self.len()? == 0)
19    }
20    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError>;
21    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError>;
22    fn sync(&mut self) -> Result<(), DbError>;
23    /// Shrink or grow the logical file to `len` bytes (used for crash recovery truncation).
24    fn truncate(&mut self, len: u64) -> Result<(), DbError>;
25}
26
27// In 0.2.x this is intentionally internal scaffolding.
28// The public API should not expose storage mechanics yet.
29#[derive(Debug)]
30struct RawFileStore {
31    file: File,
32}
33
34impl RawFileStore {
35    fn new(file: File) -> Self {
36        Self { file }
37    }
38}
39
40impl Store for RawFileStore {
41    fn len(&self) -> Result<u64, DbError> {
42        Ok(self.file.metadata()?.len())
43    }
44
45    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
46        self.file.seek(SeekFrom::Start(offset))?;
47        self.file.read_exact(buf)?;
48        Ok(())
49    }
50
51    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
52        self.file.seek(SeekFrom::Start(offset))?;
53        self.file.write_all(buf)?;
54        Ok(())
55    }
56
57    fn sync(&mut self) -> Result<(), DbError> {
58        self.file.sync_all()?;
59        Ok(())
60    }
61
62    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
63        self.file.set_len(len)?;
64        Ok(())
65    }
66}
67
/// On-disk store: a real file wrapped in a fixed-size page cache.
///
/// This keeps the public `FileStore` name stable while introducing the 0.11.0 pager/buffer-pool
/// boundary via [`crate::pager::PagedStore`].
#[derive(Debug)]
pub struct FileStore {
    /// Paged view over the raw file; every [`Store`] call on `FileStore` is forwarded to it.
    inner: crate::pager::PagedStore<RawFileStore>,
    /// This handle's reference into the process-wide writer-lock registry. Populated by
    /// [`FileStore::open_locked`] for read-write opens; dropping it gives the reference back.
    _writer_lock: Option<WriterLockGuard>,
    /// Sidecar lock-file handle on which a shared lock was taken. Populated by
    /// [`FileStore::open_locked`] for read-only opens that acquired the lock; stored so the
    /// handle stays open for as long as the store exists.
    _reader_lock: Option<File>,
    /// Counts [`Store::write_all_at`] invocations (test builds only).
    #[cfg(test)]
    test_write_counter: Option<Rc<Cell<usize>>>,
    /// Number of [`Store::write_all_at`] calls still allowed through (test builds only). Each
    /// call consumes one unit; once the value is zero, every further call fails with a synthetic
    /// [`DbError::Io`].
    #[cfg(test)]
    test_write_budget_remaining: Option<Rc<Cell<usize>>>,
}
84
/// Bookkeeping for one sidecar lock file in the process-wide writer-lock registry.
#[derive(Debug)]
struct WriterLockState {
    /// Open lock-file handle; stored so it stays open while the registry entry exists.
    _file: File,
    /// Number of live [`WriterLockGuard`]s that refer to this entry.
    refs: usize,
}

/// Registry layout: lock-file path mapped to the state shared by all guards for that path.
type WriterLockMap = std::collections::HashMap<PathBuf, WriterLockState>;

static WRITER_LOCKS: OnceLock<Mutex<WriterLockMap>> = OnceLock::new();

/// Process-wide registry of writer locks, created empty on first use.
fn writer_locks() -> &'static Mutex<WriterLockMap> {
    WRITER_LOCKS.get_or_init(Mutex::default)
}

/// One counted reference to the registry entry stored under `lock_path`.
#[derive(Debug)]
struct WriterLockGuard {
    lock_path: PathBuf,
}

impl Drop for WriterLockGuard {
    /// Give back this guard's reference and remove the registry entry (dropping its lock-file
    /// handle) once no references remain. A guard whose path is not registered is a no-op.
    fn drop(&mut self) {
        // A poisoned mutex must not stop the release, so recover the inner map.
        let mut registry = match writer_locks().lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        let last_reference_gone = match registry.get_mut(&self.lock_path) {
            Some(state) => {
                state.refs = state.refs.saturating_sub(1);
                state.refs == 0
            }
            None => false,
        };
        if last_reference_gone {
            registry.remove(&self.lock_path);
        }
    }
}
114
// Unit tests are kept in a separate file under the crate's `tests/unit/` directory and are
// textually included here, so they compile as part of this module in test builds.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_storage_tests.rs"
    ));
}
122
123impl FileStore {
124    pub fn new(file: File) -> Self {
125        Self {
126            inner: crate::pager::PagedStore::new(
127                RawFileStore::new(file),
128                crate::pager::DEFAULT_PAGE_SIZE,
129            ),
130            _writer_lock: None,
131            _reader_lock: None,
132            #[cfg(test)]
133            test_write_counter: None,
134            #[cfg(test)]
135            test_write_budget_remaining: None,
136        }
137    }
138
139    #[cfg(test)]
140    pub(crate) fn new_for_test(
141        file: File,
142        write_counter: Option<Rc<Cell<usize>>>,
143        write_budget_remaining: Option<Rc<Cell<usize>>>,
144    ) -> Self {
145        Self {
146            inner: crate::pager::PagedStore::new(
147                RawFileStore::new(file),
148                crate::pager::DEFAULT_PAGE_SIZE,
149            ),
150            _writer_lock: None,
151            _reader_lock: None,
152            test_write_counter: write_counter,
153            test_write_budget_remaining: write_budget_remaining,
154        }
155    }
156
157    fn lock_path_for_db_path(db_path: &Path) -> PathBuf {
158        // Sidecar lock file so writers can exclude other writers while read-only opens proceed.
159        // This is advisory and best-effort; platforms differ in exact semantics.
160        PathBuf::from(format!("{}.writer.lock", db_path.display()))
161    }
162
163    /// Open a file store and acquire the process-level lock for the database path.
164    ///
165    /// Locking policy (cross-process):
166    /// - `ReadWrite`: takes an **exclusive** advisory lock on the sidecar file
167    ///   `<db_path>.writer.lock` (fail-fast; does not block indefinitely).
168    /// - `ReadOnly`: opens the database read-only and takes a **shared** advisory lock on the same
169    ///   sidecar file. This prevents new writers from opening while readers are active.
170    ///
171    /// This excludes concurrent writers, but does not prevent read-only opens while a writer is
172    /// active. Callers that require stronger coordination should implement it at a higher layer.
173    pub fn open_locked(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, DbError> {
174        use fs2::FileExt;
175
176        let path = path.as_ref();
177        let file = match mode {
178            OpenMode::ReadWrite => std::fs::OpenOptions::new()
179                .read(true)
180                .write(true)
181                .create(true)
182                .truncate(false)
183                .open(path)?,
184            OpenMode::ReadOnly => std::fs::OpenOptions::new().read(true).open(path)?,
185        };
186
187        let lock_path = Self::lock_path_for_db_path(path);
188
189        let writer_lock = match mode {
190            OpenMode::ReadOnly => None,
191            OpenMode::ReadWrite => {
192                let mut g = writer_locks()
193                    .lock()
194                    .map_err(|_| std::io::Error::other("lock poisoned"))?;
195                if let Some(st) = g.get_mut(&lock_path) {
196                    st.refs = st.refs.saturating_add(1);
197                    Some(WriterLockGuard {
198                        lock_path: lock_path.clone(),
199                    })
200                } else {
201                    let lock_file = std::fs::OpenOptions::new()
202                        .read(true)
203                        .write(true)
204                        .create(true)
205                        .truncate(false)
206                        .open(&lock_path)?;
207                    // Fail fast: do not block indefinitely.
208                    lock_file.try_lock_exclusive()?;
209                    g.insert(
210                        lock_path.clone(),
211                        WriterLockState {
212                            _file: lock_file,
213                            refs: 1,
214                        },
215                    );
216                    Some(WriterLockGuard {
217                        lock_path: lock_path.clone(),
218                    })
219                }
220            }
221        };
222
223        let reader_lock = match mode {
224            OpenMode::ReadWrite => None,
225            OpenMode::ReadOnly => {
226                // Always attempt a shared lock for read-only opens so readers block new writers.
227                //
228                // Important: on some platforms, acquiring a second lock in the same process while
229                // an exclusive lock is held may downgrade/replace the existing lock. We avoid that
230                // foot-gun by failing explicitly if this process already holds the writer lock.
231                let already_writer = writer_locks()
232                    .lock()
233                    .ok()
234                    .and_then(|g| g.get(&lock_path).map(|_| ()))
235                    .is_some();
236                // Same-process writer already holds the lock file; avoid a second fs2 lock
237                // (which can downgrade on some platforms). Reads use the existing writer handle.
238                if already_writer {
239                    return Ok(Self {
240                        inner: crate::pager::PagedStore::new(
241                            RawFileStore::new(file),
242                            crate::pager::DEFAULT_PAGE_SIZE,
243                        ),
244                        _writer_lock: None,
245                        _reader_lock: None,
246                        #[cfg(test)]
247                        test_write_counter: None,
248                        #[cfg(test)]
249                        test_write_budget_remaining: None,
250                    });
251                }
252
253                let lock_file = std::fs::OpenOptions::new()
254                    .read(true)
255                    .write(true)
256                    .create(true)
257                    .truncate(false)
258                    .open(&lock_path)?;
259                match lock_file.try_lock_shared() {
260                    Ok(()) => Some(lock_file),
261                    Err(std::fs::TryLockError::WouldBlock)
262                    | Err(std::fs::TryLockError::Error(_)) => {
263                        return Err(DbError::Io(std::io::Error::new(
264                            std::io::ErrorKind::WouldBlock,
265                            "database is locked by another process",
266                        )));
267                    }
268                }
269            }
270        };
271
272        Ok(Self {
273            inner: crate::pager::PagedStore::new(
274                RawFileStore::new(file),
275                crate::pager::DEFAULT_PAGE_SIZE,
276            ),
277            _writer_lock: writer_lock,
278            _reader_lock: reader_lock,
279            #[cfg(test)]
280            test_write_counter: None,
281            #[cfg(test)]
282            test_write_budget_remaining: None,
283        })
284    }
285}
286
287impl Store for FileStore {
288    fn len(&self) -> Result<u64, DbError> {
289        self.inner.len()
290    }
291
292    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
293        self.inner.read_exact_at(offset, buf)
294    }
295
296    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
297        #[cfg(test)]
298        {
299            if let Some(c) = &self.test_write_counter {
300                c.set(c.get().saturating_add(1));
301            }
302            if let Some(budget) = &self.test_write_budget_remaining {
303                let r = budget.get();
304                if r == 0 {
305                    return Err(DbError::Io(std::io::Error::other(
306                        "FileStore write budget exhausted (test instrumentation)",
307                    )));
308                }
309                budget.set(r - 1);
310            }
311        }
312        self.inner.write_all_at(offset, buf)
313    }
314
315    fn sync(&mut self) -> Result<(), DbError> {
316        self.inner.sync()
317    }
318
319    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
320        self.inner.truncate(len)
321    }
322}
323
324/// In-memory growable byte store (same [`Store`] contract as [`FileStore`]).
325#[derive(Debug, Default)]
326pub struct VecStore {
327    buf: Vec<u8>,
328}
329
330impl VecStore {
331    pub fn new() -> Self {
332        Self { buf: Vec::new() }
333    }
334
335    pub fn into_inner(self) -> Vec<u8> {
336        self.buf
337    }
338
339    pub fn from_vec(buf: Vec<u8>) -> Self {
340        Self { buf }
341    }
342
343    /// Full buffer (read-only image of the logical file).
344    pub fn as_slice(&self) -> &[u8] {
345        &self.buf
346    }
347
348    fn ensure_len(&mut self, end: u64) {
349        let need = end as usize;
350        if self.buf.len() < need {
351            self.buf.resize(need, 0);
352        }
353    }
354}
355
356impl Store for VecStore {
357    fn len(&self) -> Result<u64, DbError> {
358        Ok(self.buf.len() as u64)
359    }
360
361    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
362        let start = offset as usize;
363        let end = start.saturating_add(buf.len());
364        if end > self.buf.len() {
365            return Err(DbError::Io(std::io::Error::new(
366                std::io::ErrorKind::UnexpectedEof,
367                "read past end of VecStore",
368            )));
369        }
370        buf.copy_from_slice(&self.buf[start..end]);
371        Ok(())
372    }
373
374    fn write_all_at(&mut self, offset: u64, data: &[u8]) -> Result<(), DbError> {
375        let end = offset
376            .checked_add(data.len() as u64)
377            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "overflow"))?;
378        self.ensure_len(end);
379        let start = offset as usize;
380        self.buf[start..start + data.len()].copy_from_slice(data);
381        Ok(())
382    }
383
384    fn sync(&mut self) -> Result<(), DbError> {
385        Ok(())
386    }
387
388    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
389        self.buf.truncate(len as usize);
390        Ok(())
391    }
392}