Skip to main content

modelvault_core/db/
maintenance.rs

1//! Checkpoint, compaction, and snapshot export/restore.
2
3use std::path::Path;
4
5use crate::config::OpenMode;
6use crate::error::{DbError, SchemaError};
7use crate::schema::CollectionId;
8use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
9use crate::segments::writer::SegmentWriter;
10use crate::storage::{FileStore, Store, VecStore};
11use crate::{checkpoint, publish};
12
13use super::fs_ops::{FsOps, StdFsOps};
14use super::{handle_registry, Database};
15
16/// Best-effort `fsync` on `dest_path`'s parent directory (Unix only).
17#[cfg(unix)]
18pub(crate) fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
19    let Some(parent) = dest_path.parent() else {
20        return;
21    };
22    let Ok(dir_f) = fs.open_dir(parent) else {
23        return;
24    };
25    let _ = dir_f.sync_all();
26}
27
28impl<S: Store> Database<S> {
29    /// Write a durable checkpoint segment and publish it via the superblock.
30    ///
31    /// The checkpoint stores the logical state (catalog + latest rows + index state) so open can
32    /// avoid scanning/replaying the full log. Works with any [`Store`] (file-backed [`FileStore`] or
33    /// [`VecStore`] snapshots).
34    pub fn checkpoint(&mut self) -> Result<(), DbError> {
35        #[cfg(feature = "tracing")]
36        let _span = tracing::info_span!("database_checkpoint").entered();
37        if self.txn_staging.is_some() {
38            return Err(DbError::Transaction(
39                crate::error::TransactionError::NestedTransaction,
40            ));
41        }
42
43        super::segment_write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
44
45        let mut cp = checkpoint::checkpoint_from_state(
46            self.catalog_for_read(),
47            self.latest_for_read(),
48            self.indexes_for_read(),
49        )?;
50
51        let file_len = self.store.len()?;
52        let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
53        let checkpoint_offset = writer.offset();
54
55        let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
56        let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
57        cp.replay_from_offset = replay_from;
58        let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
59
60        let hdr = SegmentHeader {
61            segment_type: SegmentType::Checkpoint,
62            payload_len: 0,
63            payload_crc32c: 0,
64        };
65        writer.append(hdr, &payload)?;
66
67        publish::append_manifest_and_publish_with_checkpoint(
68            &mut self.store,
69            self.segment_start,
70            Some((checkpoint_offset, payload.len() as u32)),
71        )?;
72        self.store.sync()?;
73        #[cfg(feature = "tracing")]
74        tracing::info!(
75            checkpoint_offset,
76            replay_from,
77            payload_bytes = payload.len(),
78            "database_checkpoint_ok"
79        );
80        Ok(())
81    }
82
83    pub(crate) fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
84        let mut out = Database::<VecStore>::open_in_memory()?;
85
86        // Recreate catalog (stable ids if created in id order).
87        let mut cols = self.catalog_for_read().collections();
88        cols.sort_by_key(|c| c.id.0);
89        for c in &cols {
90            let pk =
91                c.primary_field
92                    .as_deref()
93                    .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
94                        collection_id: c.id.0,
95                    }))?;
96            let (new_id, _v1) = out.register_collection_with_indexes(
97                &c.name,
98                c.fields.clone(),
99                c.indexes.clone(),
100                pk,
101            )?;
102            // Bump schema version counter to match current_version (repeat identical schema).
103            for _ in 2..=c.current_version.0 {
104                let _ = out.register_schema_version_with_indexes_force(
105                    new_id,
106                    c.fields.clone(),
107                    c.indexes.clone(),
108                )?;
109            }
110        }
111
112        // Copy latest rows (in-memory snapshot semantics).
113        for ((cid, _), row) in self.latest_for_read().iter() {
114            let collection_id = CollectionId(*cid);
115            out.insert(collection_id, row.clone())?;
116        }
117
118        Ok(out.into_snapshot_bytes())
119    }
120}
121
122impl Database<FileStore> {
123    /// Rewrite the database into a compacted single-file image at `dest_path`.
124    ///
125    /// The destination file is truncated/overwritten if it exists.
126    pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
127        self.compact_to_with_fsops(&StdFsOps, dest_path)
128    }
129
130    pub(crate) fn compact_to_with_fsops(
131        &self,
132        fs: &dyn FsOps,
133        dest_path: impl AsRef<Path>,
134    ) -> Result<(), DbError> {
135        #[cfg(feature = "tracing")]
136        let _span = tracing::info_span!(
137            "database_compact_to",
138            dest = %dest_path.as_ref().display()
139        )
140        .entered();
141        let bytes = self.compact_snapshot_bytes()?;
142        let path = dest_path.as_ref();
143        let file = fs
144            .open_read_write_create_truncate(path)
145            .map_err(DbError::Io)?;
146        let mut store = FileStore::new(file);
147        store.write_all_at(0, &bytes)?;
148        store.truncate(bytes.len() as u64)?;
149        store.sync()?;
150        #[cfg(feature = "tracing")]
151        tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
152        Ok(())
153    }
154
155    pub fn compact_in_place(&mut self) -> Result<(), DbError> {
156        self.compact_in_place_with_fsops(&StdFsOps)
157    }
158
159    pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
160        #[cfg(feature = "tracing")]
161        let _span = tracing::info_span!("database_compact_in_place").entered();
162        // Crash-safety: write a full new image to a sidecar file, fsync it, then atomically
163        // replace the live path via rename (using a backup on platforms where rename does not
164        // overwrite an existing destination).
165        let bytes = self.compact_snapshot_bytes()?;
166        let live_path = self.path.clone();
167        let parent = live_path
168            .parent()
169            .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
170
171        // Pick unique temp + backup names in the same directory (so rename stays atomic on POSIX).
172        let pid = std::process::id();
173        let nanos = std::time::SystemTime::now()
174            .duration_since(std::time::UNIX_EPOCH)
175            .map(|d| d.as_nanos())
176            .unwrap_or(0);
177        let base = live_path
178            .file_name()
179            .and_then(|s| s.to_str())
180            .unwrap_or("db.modelvault");
181        let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
182        let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
183
184        // 1) Write the compacted image to tmp and fsync it.
185        {
186            let file = fs
187                .open_read_write_create_new(&tmp_path)
188                .map_err(DbError::Io)?;
189            let mut store = FileStore::new(file);
190            store.write_all_at(0, &bytes)?;
191            store.truncate(bytes.len() as u64)?;
192            store.sync()?;
193        }
194
195        // 2) Replace the live file path with the tmp image, preserving a backup until success.
196        //
197        // We do not rely on "rename over existing" being supported across platforms. Instead:
198        // - move live → bak
199        // - move tmp → live
200        // - fsync directory (best-effort)
201        // - remove bak
202        //
203        // If tmp → live fails, attempt to restore bak → live.
204        let _ = fs.remove_file(&bak_path);
205        fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
206        let replace_res = fs.rename(&tmp_path, &live_path);
207        if let Err(e) = replace_res {
208            // Best-effort restore: move backup back into place.
209            let _ = fs.rename(&bak_path, &live_path);
210            // Clean up tmp if it still exists.
211            let _ = fs.remove_file(&tmp_path);
212            return Err(DbError::Io(e));
213        }
214
215        // Best-effort directory sync: helps make the rename durable on POSIX.
216        #[cfg(unix)]
217        {
218            // Best-effort: on many Unix platforms, opening a directory and syncing it will persist
219            // the rename in the directory entry. If this fails, the data file itself is still
220            // fsync'd and the operation remains logically correct; only rename durability is weaker.
221            if let Ok(dir_f) = fs.open_dir(parent) {
222                let _ = dir_f.sync_all();
223            }
224        }
225
226        let _ = fs.remove_file(&bak_path);
227
228        // 3) Refresh in-memory state by reopening. Keep writer-registry registration for the
229        // whole operation so another writable handle cannot open the same path mid-reopen.
230        let old_registry = self.writer_registry.take();
231        self.store.release_writer_lock();
232        let reopened = match (|| {
233            let store = FileStore::open_locked(&live_path, OpenMode::ReadWrite)?;
234            Self::open_with_store(
235                live_path.clone(),
236                store,
237                crate::config::OpenOptions::default(),
238            )
239        })() {
240            Ok(db) => db,
241            Err(e) => {
242                let _ = fs.rename(&bak_path, &live_path);
243                if let Ok(store) = FileStore::open_locked(&live_path, OpenMode::ReadWrite) {
244                    self.store = store;
245                }
246                self.writer_registry = old_registry;
247                return Err(e);
248            }
249        };
250        let mut reopened = reopened;
251        reopened.writer_registry = old_registry;
252        *self = reopened;
253        self.shared_mirror = Some(handle_registry::register(
254            &live_path,
255            handle_registry::SharedDbState {
256                catalog: self.catalog.clone(),
257                latest: self.latest.clone(),
258                indexes: self.indexes.clone(),
259                segment_start: self.segment_start,
260                format_minor: self.format_minor,
261                generation: 0,
262            },
263        )?);
264        self.push_shared_mirror();
265        #[cfg(feature = "tracing")]
266        tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
267        Ok(())
268    }
269
270    /// Create a consistent backup copy of this on-disk database.
271    ///
272    /// This writes a checkpoint (for fast reopen and a stable state marker) and then copies the
273    /// underlying file bytes to `dest_path`.
274    pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
275        self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
276    }
277
278    pub(crate) fn export_snapshot_to_path_with_fsops(
279        &mut self,
280        fs: &dyn FsOps,
281        dest_path: impl AsRef<Path>,
282    ) -> Result<(), DbError> {
283        self.checkpoint()?;
284        let dest_path = dest_path.as_ref();
285        // Read through the open store handle so export works while the writer lock is held
286        // (Windows rejects `fs::copy` on exclusively locked files).
287        let len = self.store.len()?;
288        let len_usize = usize::try_from(len)
289            .map_err(|_| DbError::Io(std::io::Error::other("database file too large")))?;
290        let mut bytes = vec![0u8; len_usize];
291        self.store.read_exact_at(0, &mut bytes)?;
292        Database::<VecStore>::export_snapshot_to_path_with_fsops(fs, dest_path, &bytes)?;
293        // Strengthen durability of the copied snapshot: fsync the destination and best-effort
294        // fsync its parent directory so the directory entry is persisted.
295        if let Ok(f) = fs.open_read(dest_path) {
296            let _ = f.sync_all();
297        }
298        #[cfg(unix)]
299        best_effort_fsync_parent_dir(fs, dest_path);
300        Ok(())
301    }
302
303    /// Restore a snapshot file into `dest_path` by atomically replacing the destination.
304    ///
305    /// This is a file operation helper intended for operational tooling.
306    pub fn restore_snapshot_to_path(
307        snapshot_path: impl AsRef<Path>,
308        dest_path: impl AsRef<Path>,
309    ) -> Result<(), DbError> {
310        Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
311    }
312
313    pub(crate) fn restore_snapshot_to_path_with_fsops(
314        fs: &dyn FsOps,
315        snapshot_path: impl AsRef<Path>,
316        dest_path: impl AsRef<Path>,
317    ) -> Result<(), DbError> {
318        let snapshot_path = snapshot_path.as_ref();
319        let dest_path = dest_path.as_ref();
320        let parent = dest_path
321            .parent()
322            .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
323
324        let pid = std::process::id();
325        let nanos = std::time::SystemTime::now()
326            .duration_since(std::time::UNIX_EPOCH)
327            .map(|d| d.as_nanos())
328            .unwrap_or(0);
329        let base = dest_path
330            .file_name()
331            .and_then(|s| s.to_str())
332            .unwrap_or("db.modelvault");
333        let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
334        let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));
335
336        // Copy snapshot bytes into a temp file and fsync it.
337        fs.copy(snapshot_path, &tmp_path).map_err(DbError::Io)?;
338        if let Ok(f) = fs.open_read(&tmp_path) {
339            let _ = f.sync_all();
340        }
341
342        // Replace destination with backup/restore semantics.
343        if dest_path.exists() {
344            let _ = fs.remove_file(&bak_path);
345            fs.rename(dest_path, &bak_path).map_err(DbError::Io)?;
346        }
347        let replace_res = fs.rename(&tmp_path, dest_path);
348        if let Err(e) = replace_res {
349            // Best-effort restore original.
350            if bak_path.exists() {
351                let _ = fs.rename(&bak_path, dest_path);
352            }
353            let _ = fs.remove_file(&tmp_path);
354            return Err(DbError::Io(e));
355        }
356
357        #[cfg(unix)]
358        {
359            if let Ok(dir_f) = fs.open_dir(parent) {
360                let _ = dir_f.sync_all();
361            }
362        }
363        let _ = fs.remove_file(&bak_path);
364        Ok(())
365    }
366
367    /// Read the on-disk image via the open store (integration tests).
368    ///
369    /// On Windows, `std::fs::read` fails while the writer holds an exclusive file lock.
370    #[doc(hidden)]
371    pub fn read_image_for_test(&mut self) -> Result<Vec<u8>, DbError> {
372        let len = self.store.len()?;
373        let len_usize = usize::try_from(len)
374            .map_err(|_| DbError::Io(std::io::Error::other("database file too large")))?;
375        let mut bytes = vec![0u8; len_usize];
376        self.store.read_exact_at(0, &mut bytes)?;
377        Ok(bytes)
378    }
379}