1use std::path::Path;
4
5use crate::config::OpenMode;
6use crate::error::{DbError, SchemaError};
7use crate::schema::CollectionId;
8use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
9use crate::segments::writer::SegmentWriter;
10use crate::storage::{FileStore, Store, VecStore};
11use crate::{checkpoint, publish};
12
13use super::fs_ops::{FsOps, StdFsOps};
14use super::{handle_registry, Database};
15
16#[cfg(unix)]
18pub(crate) fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
19 let Some(parent) = dest_path.parent() else {
20 return;
21 };
22 let Ok(dir_f) = fs.open_dir(parent) else {
23 return;
24 };
25 let _ = dir_f.sync_all();
26}
27
28impl<S: Store> Database<S> {
29 pub fn checkpoint(&mut self) -> Result<(), DbError> {
35 #[cfg(feature = "tracing")]
36 let _span = tracing::info_span!("database_checkpoint").entered();
37 if self.txn_staging.is_some() {
38 return Err(DbError::Transaction(
39 crate::error::TransactionError::NestedTransaction,
40 ));
41 }
42
43 super::segment_write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
44
45 let mut cp = checkpoint::checkpoint_from_state(
46 self.catalog_for_read(),
47 self.latest_for_read(),
48 self.indexes_for_read(),
49 )?;
50
51 let file_len = self.store.len()?;
52 let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
53 let checkpoint_offset = writer.offset();
54
55 let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
56 let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
57 cp.replay_from_offset = replay_from;
58 let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
59
60 let hdr = SegmentHeader {
61 segment_type: SegmentType::Checkpoint,
62 payload_len: 0,
63 payload_crc32c: 0,
64 };
65 writer.append(hdr, &payload)?;
66
67 publish::append_manifest_and_publish_with_checkpoint(
68 &mut self.store,
69 self.segment_start,
70 Some((checkpoint_offset, payload.len() as u32)),
71 )?;
72 self.store.sync()?;
73 #[cfg(feature = "tracing")]
74 tracing::info!(
75 checkpoint_offset,
76 replay_from,
77 payload_bytes = payload.len(),
78 "database_checkpoint_ok"
79 );
80 Ok(())
81 }
82
83 pub(crate) fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
84 let mut out = Database::<VecStore>::open_in_memory()?;
85
86 let mut cols = self.catalog_for_read().collections();
88 cols.sort_by_key(|c| c.id.0);
89 for c in &cols {
90 let pk =
91 c.primary_field
92 .as_deref()
93 .ok_or(DbError::Schema(SchemaError::NoPrimaryKey {
94 collection_id: c.id.0,
95 }))?;
96 let (new_id, _v1) = out.register_collection_with_indexes(
97 &c.name,
98 c.fields.clone(),
99 c.indexes.clone(),
100 pk,
101 )?;
102 for _ in 2..=c.current_version.0 {
104 let _ = out.register_schema_version_with_indexes_force(
105 new_id,
106 c.fields.clone(),
107 c.indexes.clone(),
108 )?;
109 }
110 }
111
112 for ((cid, _), row) in self.latest_for_read().iter() {
114 let collection_id = CollectionId(*cid);
115 out.insert(collection_id, row.clone())?;
116 }
117
118 Ok(out.into_snapshot_bytes())
119 }
120}
121
122impl Database<FileStore> {
123 pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
127 self.compact_to_with_fsops(&StdFsOps, dest_path)
128 }
129
130 pub(crate) fn compact_to_with_fsops(
131 &self,
132 fs: &dyn FsOps,
133 dest_path: impl AsRef<Path>,
134 ) -> Result<(), DbError> {
135 #[cfg(feature = "tracing")]
136 let _span = tracing::info_span!(
137 "database_compact_to",
138 dest = %dest_path.as_ref().display()
139 )
140 .entered();
141 let bytes = self.compact_snapshot_bytes()?;
142 let path = dest_path.as_ref();
143 let file = fs
144 .open_read_write_create_truncate(path)
145 .map_err(DbError::Io)?;
146 let mut store = FileStore::new(file);
147 store.write_all_at(0, &bytes)?;
148 store.truncate(bytes.len() as u64)?;
149 store.sync()?;
150 #[cfg(feature = "tracing")]
151 tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
152 Ok(())
153 }
154
155 pub fn compact_in_place(&mut self) -> Result<(), DbError> {
156 self.compact_in_place_with_fsops(&StdFsOps)
157 }
158
159 pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
160 #[cfg(feature = "tracing")]
161 let _span = tracing::info_span!("database_compact_in_place").entered();
162 let bytes = self.compact_snapshot_bytes()?;
166 let live_path = self.path.clone();
167 let parent = live_path
168 .parent()
169 .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
170
171 let pid = std::process::id();
173 let nanos = std::time::SystemTime::now()
174 .duration_since(std::time::UNIX_EPOCH)
175 .map(|d| d.as_nanos())
176 .unwrap_or(0);
177 let base = live_path
178 .file_name()
179 .and_then(|s| s.to_str())
180 .unwrap_or("db.modelvault");
181 let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
182 let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));
183
184 {
186 let file = fs
187 .open_read_write_create_new(&tmp_path)
188 .map_err(DbError::Io)?;
189 let mut store = FileStore::new(file);
190 store.write_all_at(0, &bytes)?;
191 store.truncate(bytes.len() as u64)?;
192 store.sync()?;
193 }
194
195 let _ = fs.remove_file(&bak_path);
205 fs.rename(&live_path, &bak_path).map_err(DbError::Io)?;
206 let replace_res = fs.rename(&tmp_path, &live_path);
207 if let Err(e) = replace_res {
208 let _ = fs.rename(&bak_path, &live_path);
210 let _ = fs.remove_file(&tmp_path);
212 return Err(DbError::Io(e));
213 }
214
215 #[cfg(unix)]
217 {
218 if let Ok(dir_f) = fs.open_dir(parent) {
222 let _ = dir_f.sync_all();
223 }
224 }
225
226 let _ = fs.remove_file(&bak_path);
227
228 let old_registry = self.writer_registry.take();
231 self.store.release_writer_lock();
232 let reopened = match (|| {
233 let store = FileStore::open_locked(&live_path, OpenMode::ReadWrite)?;
234 Self::open_with_store(
235 live_path.clone(),
236 store,
237 crate::config::OpenOptions::default(),
238 )
239 })() {
240 Ok(db) => db,
241 Err(e) => {
242 let _ = fs.rename(&bak_path, &live_path);
243 if let Ok(store) = FileStore::open_locked(&live_path, OpenMode::ReadWrite) {
244 self.store = store;
245 }
246 self.writer_registry = old_registry;
247 return Err(e);
248 }
249 };
250 let mut reopened = reopened;
251 reopened.writer_registry = old_registry;
252 *self = reopened;
253 self.shared_mirror = Some(handle_registry::register(
254 &live_path,
255 handle_registry::SharedDbState {
256 catalog: self.catalog.clone(),
257 latest: self.latest.clone(),
258 indexes: self.indexes.clone(),
259 segment_start: self.segment_start,
260 format_minor: self.format_minor,
261 generation: 0,
262 },
263 )?);
264 self.push_shared_mirror();
265 #[cfg(feature = "tracing")]
266 tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
267 Ok(())
268 }
269
270 pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
275 self.export_snapshot_to_path_with_fsops(&StdFsOps, dest_path)
276 }
277
278 pub(crate) fn export_snapshot_to_path_with_fsops(
279 &mut self,
280 fs: &dyn FsOps,
281 dest_path: impl AsRef<Path>,
282 ) -> Result<(), DbError> {
283 self.checkpoint()?;
284 let dest_path = dest_path.as_ref();
285 let len = self.store.len()?;
288 let len_usize = usize::try_from(len)
289 .map_err(|_| DbError::Io(std::io::Error::other("database file too large")))?;
290 let mut bytes = vec![0u8; len_usize];
291 self.store.read_exact_at(0, &mut bytes)?;
292 Database::<VecStore>::export_snapshot_to_path_with_fsops(fs, dest_path, &bytes)?;
293 if let Ok(f) = fs.open_read(dest_path) {
296 let _ = f.sync_all();
297 }
298 #[cfg(unix)]
299 best_effort_fsync_parent_dir(fs, dest_path);
300 Ok(())
301 }
302
303 pub fn restore_snapshot_to_path(
307 snapshot_path: impl AsRef<Path>,
308 dest_path: impl AsRef<Path>,
309 ) -> Result<(), DbError> {
310 Self::restore_snapshot_to_path_with_fsops(&StdFsOps, snapshot_path, dest_path)
311 }
312
313 pub(crate) fn restore_snapshot_to_path_with_fsops(
314 fs: &dyn FsOps,
315 snapshot_path: impl AsRef<Path>,
316 dest_path: impl AsRef<Path>,
317 ) -> Result<(), DbError> {
318 let snapshot_path = snapshot_path.as_ref();
319 let dest_path = dest_path.as_ref();
320 let parent = dest_path
321 .parent()
322 .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;
323
324 let pid = std::process::id();
325 let nanos = std::time::SystemTime::now()
326 .duration_since(std::time::UNIX_EPOCH)
327 .map(|d| d.as_nanos())
328 .unwrap_or(0);
329 let base = dest_path
330 .file_name()
331 .and_then(|s| s.to_str())
332 .unwrap_or("db.modelvault");
333 let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
334 let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));
335
336 fs.copy(snapshot_path, &tmp_path).map_err(DbError::Io)?;
338 if let Ok(f) = fs.open_read(&tmp_path) {
339 let _ = f.sync_all();
340 }
341
342 if dest_path.exists() {
344 let _ = fs.remove_file(&bak_path);
345 fs.rename(dest_path, &bak_path).map_err(DbError::Io)?;
346 }
347 let replace_res = fs.rename(&tmp_path, dest_path);
348 if let Err(e) = replace_res {
349 if bak_path.exists() {
351 let _ = fs.rename(&bak_path, dest_path);
352 }
353 let _ = fs.remove_file(&tmp_path);
354 return Err(DbError::Io(e));
355 }
356
357 #[cfg(unix)]
358 {
359 if let Ok(dir_f) = fs.open_dir(parent) {
360 let _ = dir_f.sync_all();
361 }
362 }
363 let _ = fs.remove_file(&bak_path);
364 Ok(())
365 }
366
367 #[doc(hidden)]
371 pub fn read_image_for_test(&mut self) -> Result<Vec<u8>, DbError> {
372 let len = self.store.len()?;
373 let len_usize = usize::try_from(len)
374 .map_err(|_| DbError::Io(std::io::Error::other("database file too large")))?;
375 let mut bytes = vec![0u8; len_usize];
376 self.store.read_exact_at(0, &mut bytes)?;
377 Ok(bytes)
378 }
379}