Skip to main content

modelvault_core/
storage.rs

1use std::fs::File;
2use std::io::{Read, Seek, SeekFrom, Write};
3use std::path::{Path, PathBuf};
4use std::sync::{Mutex, OnceLock};
5#[cfg(test)]
6use std::{cell::Cell, rc::Rc};
7
8use crate::config::OpenMode;
9use crate::error::DbError;
10
11/// Random-access byte image of a ModelVault database (length, read, write, sync).
12///
13/// Implemented by [`FileStore`] (real files) and [`VecStore`] (in-memory snapshots). A future
14/// read-only store split is deferred until a second consumer needs a smaller surface.
15pub trait Store {
16    fn len(&self) -> Result<u64, DbError>;
17    fn is_empty(&self) -> Result<bool, DbError> {
18        Ok(self.len()? == 0)
19    }
20    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError>;
21    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError>;
22    fn sync(&mut self) -> Result<(), DbError>;
23    /// Shrink or grow the logical file to `len` bytes (used for crash recovery truncation).
24    fn truncate(&mut self, len: u64) -> Result<(), DbError>;
25}
26
// Internal scaffolding (noted as intentional for 0.2.x): the type is private so the public API
// does not expose storage mechanics yet.
/// Thin owner of the operating-system file handle that backs a [`FileStore`].
#[derive(Debug)]
struct RawFileStore {
    /// Open handle; every `Store` operation on this type goes through it.
    file: File,
}

impl RawFileStore {
    /// Take ownership of an already-open `file`.
    fn new(file: File) -> Self {
        RawFileStore { file }
    }
}
39
40impl Store for RawFileStore {
41    fn len(&self) -> Result<u64, DbError> {
42        Ok(self.file.metadata()?.len())
43    }
44
45    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
46        self.file.seek(SeekFrom::Start(offset))?;
47        self.file.read_exact(buf)?;
48        Ok(())
49    }
50
51    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
52        self.file.seek(SeekFrom::Start(offset))?;
53        self.file.write_all(buf)?;
54        Ok(())
55    }
56
57    fn sync(&mut self) -> Result<(), DbError> {
58        self.file.sync_all()?;
59        Ok(())
60    }
61
62    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
63        self.file.set_len(len)?;
64        Ok(())
65    }
66}
67
/// On-disk store: a real file wrapped in a fixed-size page cache.
///
/// This keeps the public `FileStore` name stable while introducing the 0.11.0 pager/buffer-pool
/// boundary via [`crate::pager::PagedStore`].
///
/// Besides the paged file, a store may own one of two lock handles (set by
/// `FileStore::open_locked`, left as `None` by `FileStore::new`). They are never read; they
/// exist only so that dropping the store drops them.
#[derive(Debug)]
pub struct FileStore {
    /// Paged view over the raw file; every [`Store`] call on `FileStore` is forwarded here.
    inner: crate::pager::PagedStore<RawFileStore>,
    /// Present for read-write opens: dropping it decrements the in-process writer refcount.
    _writer_lock: Option<WriterLockGuard>,
    /// Present for read-only opens: the sidecar lock-file handle on which the shared lock was
    /// taken (presumably released by the OS when this handle is closed).
    _reader_lock: Option<File>,
    /// Counts [`Store::write_all_at`] invocations (test builds only). Every call is counted,
    /// including calls that are then rejected because the write budget is exhausted.
    #[cfg(test)]
    test_write_counter: Option<Rc<Cell<usize>>>,
    /// Remaining number of [`Store::write_all_at`] calls that are allowed to proceed. Each
    /// permitted call decrements it; once it is zero every further call fails with a synthetic
    /// [`DbError::Io`]. So a budget of N lets N writes through and fails call N + 1 onwards.
    #[cfg(test)]
    test_write_budget_remaining: Option<Rc<Cell<usize>>>,
}
84
/// Per-lock-file bookkeeping for read-write opens made by this process.
#[derive(Debug)]
struct WriterLockState {
    /// Handle on which the exclusive lock was taken; never read, kept so that removing the
    /// registry entry drops (closes) it.
    _file: File,
    /// Number of live [`WriterLockGuard`]s that refer to this entry.
    refs: usize,
}

/// Process-wide registry of held writer locks, keyed by sidecar lock-file path.
static WRITER_LOCKS: OnceLock<Mutex<std::collections::HashMap<PathBuf, WriterLockState>>> =
    OnceLock::new();

/// Access the registry, creating the empty map on first use.
fn writer_locks() -> &'static Mutex<std::collections::HashMap<PathBuf, WriterLockState>> {
    WRITER_LOCKS.get_or_init(Mutex::default)
}
97
98#[derive(Debug)]
99struct WriterLockGuard {
100    lock_path: PathBuf,
101}
102
103impl Drop for WriterLockGuard {
104    fn drop(&mut self) {
105        let mut g = writer_locks().lock().unwrap_or_else(|e| e.into_inner());
106        if let Some(st) = g.get_mut(&self.lock_path) {
107            st.refs = st.refs.saturating_sub(1);
108        }
109        if g.get(&self.lock_path).is_some_and(|s| s.refs == 0) {
110            g.remove(&self.lock_path);
111        }
112    }
113}
114
// Unit tests are kept in a separate file and textually included here, so they compile as a
// child module of this file (test builds only) and can reach its private items.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_storage_tests.rs"
    ));
}
122
123impl FileStore {
124    pub fn new(file: File) -> Self {
125        Self {
126            inner: crate::pager::PagedStore::new(
127                RawFileStore::new(file),
128                crate::pager::DEFAULT_PAGE_SIZE,
129            ),
130            _writer_lock: None,
131            _reader_lock: None,
132            #[cfg(test)]
133            test_write_counter: None,
134            #[cfg(test)]
135            test_write_budget_remaining: None,
136        }
137    }
138
139    #[cfg(test)]
140    pub(crate) fn new_for_test(
141        file: File,
142        write_counter: Option<Rc<Cell<usize>>>,
143        write_budget_remaining: Option<Rc<Cell<usize>>>,
144    ) -> Self {
145        Self {
146            inner: crate::pager::PagedStore::new(
147                RawFileStore::new(file),
148                crate::pager::DEFAULT_PAGE_SIZE,
149            ),
150            _writer_lock: None,
151            _reader_lock: None,
152            test_write_counter: write_counter,
153            test_write_budget_remaining: write_budget_remaining,
154        }
155    }
156
157    fn lock_path_for_db_path(db_path: &Path) -> PathBuf {
158        // Sidecar lock file so writers can exclude other writers while read-only opens proceed.
159        // This is advisory and best-effort; platforms differ in exact semantics.
160        PathBuf::from(format!("{}.writer.lock", db_path.display()))
161    }
162
163    /// Open a file store and acquire the process-level lock for the database path.
164    ///
165    /// Locking policy (cross-process):
166    /// - `ReadWrite`: takes an **exclusive** advisory lock on the sidecar file
167    ///   `<db_path>.writer.lock` (fail-fast; does not block indefinitely).
168    /// - `ReadOnly`: opens the database read-only and takes a **shared** advisory lock on the same
169    ///   sidecar file. This prevents new writers from opening while readers are active.
170    ///
171    /// This excludes concurrent writers, but does not prevent read-only opens while a writer is
172    /// active. Callers that require stronger coordination should implement it at a higher layer.
173    pub fn open_locked(path: impl AsRef<Path>, mode: OpenMode) -> Result<Self, DbError> {
174        use fs2::FileExt;
175
176        let path = path.as_ref();
177        let file = match mode {
178            OpenMode::ReadWrite => std::fs::OpenOptions::new()
179                .read(true)
180                .write(true)
181                .create(true)
182                .truncate(false)
183                .open(path)?,
184            OpenMode::ReadOnly => std::fs::OpenOptions::new().read(true).open(path)?,
185        };
186
187        let lock_path = Self::lock_path_for_db_path(path);
188
189        let writer_lock = match mode {
190            OpenMode::ReadOnly => None,
191            OpenMode::ReadWrite => {
192                let mut g = writer_locks()
193                    .lock()
194                    .map_err(|_| std::io::Error::other("lock poisoned"))?;
195                if let Some(st) = g.get_mut(&lock_path) {
196                    st.refs = st.refs.saturating_add(1);
197                    Some(WriterLockGuard {
198                        lock_path: lock_path.clone(),
199                    })
200                } else {
201                    let lock_file = std::fs::OpenOptions::new()
202                        .read(true)
203                        .write(true)
204                        .create(true)
205                        .truncate(false)
206                        .open(&lock_path)?;
207                    // Fail fast: do not block indefinitely.
208                    lock_file.try_lock_exclusive()?;
209                    g.insert(
210                        lock_path.clone(),
211                        WriterLockState {
212                            _file: lock_file,
213                            refs: 1,
214                        },
215                    );
216                    Some(WriterLockGuard {
217                        lock_path: lock_path.clone(),
218                    })
219                }
220            }
221        };
222
223        let reader_lock = match mode {
224            OpenMode::ReadWrite => None,
225            OpenMode::ReadOnly => {
226                // Always attempt a shared lock for read-only opens so readers block new writers.
227                //
228                // Important: on some platforms, acquiring a second lock in the same process while
229                // an exclusive lock is held may downgrade/replace the existing lock. We avoid that
230                // foot-gun by failing explicitly if this process already holds the writer lock.
231                let already_writer = writer_locks()
232                    .lock()
233                    .ok()
234                    .and_then(|g| g.get(&lock_path).map(|_| ()))
235                    .is_some();
236                if already_writer {
237                    return Err(DbError::Io(std::io::Error::other(
238                        "cannot open read-only while holding writer lock in the same process",
239                    )));
240                }
241
242                let lock_file = std::fs::OpenOptions::new()
243                    .read(true)
244                    .write(true)
245                    .create(true)
246                    .truncate(false)
247                    .open(&lock_path)?;
248                match lock_file.try_lock_shared() {
249                    Ok(()) => Some(lock_file),
250                    Err(std::fs::TryLockError::WouldBlock)
251                    | Err(std::fs::TryLockError::Error(_)) => {
252                        return Err(DbError::Io(std::io::Error::new(
253                            std::io::ErrorKind::WouldBlock,
254                            "database is locked by another process",
255                        )));
256                    }
257                }
258            }
259        };
260
261        Ok(Self {
262            inner: crate::pager::PagedStore::new(
263                RawFileStore::new(file),
264                crate::pager::DEFAULT_PAGE_SIZE,
265            ),
266            _writer_lock: writer_lock,
267            _reader_lock: reader_lock,
268            #[cfg(test)]
269            test_write_counter: None,
270            #[cfg(test)]
271            test_write_budget_remaining: None,
272        })
273    }
274}
275
276impl Store for FileStore {
277    fn len(&self) -> Result<u64, DbError> {
278        self.inner.len()
279    }
280
281    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
282        self.inner.read_exact_at(offset, buf)
283    }
284
285    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
286        #[cfg(test)]
287        {
288            if let Some(c) = &self.test_write_counter {
289                c.set(c.get().saturating_add(1));
290            }
291            if let Some(budget) = &self.test_write_budget_remaining {
292                let r = budget.get();
293                if r == 0 {
294                    return Err(DbError::Io(std::io::Error::other(
295                        "FileStore write budget exhausted (test instrumentation)",
296                    )));
297                }
298                budget.set(r - 1);
299            }
300        }
301        self.inner.write_all_at(offset, buf)
302    }
303
304    fn sync(&mut self) -> Result<(), DbError> {
305        self.inner.sync()
306    }
307
308    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
309        self.inner.truncate(len)
310    }
311}
312
/// Growable in-memory byte image (same [`Store`] contract as [`FileStore`]).
#[derive(Debug, Default)]
pub struct VecStore {
    /// The whole logical file; its length is the store's length.
    buf: Vec<u8>,
}

impl VecStore {
    /// Create an empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Consume the store and hand back its bytes.
    pub fn into_inner(self) -> Vec<u8> {
        let Self { buf } = self;
        buf
    }

    /// Build a store whose initial contents are `buf`.
    pub fn from_vec(buf: Vec<u8>) -> Self {
        VecStore { buf }
    }

    /// Borrow the full contents (read-only image of the logical file).
    pub fn as_slice(&self) -> &[u8] {
        self.buf.as_slice()
    }

    /// Zero-extend the buffer so that it is at least `end` bytes long; never shrinks it.
    fn ensure_len(&mut self, end: u64) {
        let target = self.buf.len().max(end as usize);
        self.buf.resize(target, 0);
    }
}
344
345impl Store for VecStore {
346    fn len(&self) -> Result<u64, DbError> {
347        Ok(self.buf.len() as u64)
348    }
349
350    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
351        let start = offset as usize;
352        let end = start.saturating_add(buf.len());
353        if end > self.buf.len() {
354            return Err(DbError::Io(std::io::Error::new(
355                std::io::ErrorKind::UnexpectedEof,
356                "read past end of VecStore",
357            )));
358        }
359        buf.copy_from_slice(&self.buf[start..end]);
360        Ok(())
361    }
362
363    fn write_all_at(&mut self, offset: u64, data: &[u8]) -> Result<(), DbError> {
364        let end = offset
365            .checked_add(data.len() as u64)
366            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "overflow"))?;
367        self.ensure_len(end);
368        let start = offset as usize;
369        self.buf[start..start + data.len()].copy_from_slice(data);
370        Ok(())
371    }
372
373    fn sync(&mut self) -> Result<(), DbError> {
374        Ok(())
375    }
376
377    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
378        self.buf.truncate(len as usize);
379        Ok(())
380    }
381}