use std::path::Path;
use crate::config::OpenMode;
use crate::error::{DbError, SchemaError};
use crate::schema::CollectionId;
use crate::segments::header::{SegmentHeader, SegmentType, SEGMENT_HEADER_LEN};
use crate::segments::writer::SegmentWriter;
use crate::storage::{FileStore, Store, VecStore};
use crate::{checkpoint, publish};
use super::fs_ops::{FsOps, StdFsOps};
use super::{handle_registry, Database};
/// Opportunistically flushes the directory that contains `dest_path`.
///
/// The parent directory is opened through `fs` and `sync_all` is called on the
/// resulting handle. Nothing is reported to the caller: a path without a parent
/// component, a directory that cannot be opened, and a failing sync are all
/// silently ignored. Only compiled on Unix targets.
#[cfg(unix)]
pub(crate) fn best_effort_fsync_parent_dir(fs: &dyn FsOps, dest_path: &Path) {
    let Some(dir_handle) = dest_path.parent().and_then(|dir| fs.open_dir(dir).ok()) else {
        return;
    };
    // The sync result is discarded on purpose; this is strictly best effort.
    let _ = dir_handle.sync_all();
}
impl<S: Store> Database<S> {
pub fn checkpoint(&mut self) -> Result<(), DbError> {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!("database_checkpoint").entered();
if self.txn_staging.is_some() {
return Err(DbError::Transaction(
crate::error::TransactionError::NestedTransaction,
));
}
super::segment_write::ensure_header_v0_6(&mut self.store, &mut self.format_minor)?;
let mut cp = checkpoint::checkpoint_from_state(
self.catalog_for_read(),
self.latest_for_read(),
self.indexes_for_read(),
)?;
let file_len = self.store.len()?;
let mut writer = SegmentWriter::new(&mut self.store, file_len.max(self.segment_start));
let checkpoint_offset = writer.offset();
let payload_len = checkpoint::encode_checkpoint_payload_v0(&cp).len() as u64;
let replay_from = checkpoint_offset + SEGMENT_HEADER_LEN as u64 + payload_len;
cp.replay_from_offset = replay_from;
let payload = checkpoint::encode_checkpoint_payload_v0(&cp);
let hdr = SegmentHeader {
segment_type: SegmentType::Checkpoint,
payload_len: 0,
payload_crc32c: 0,
};
writer.append(hdr, &payload)?;
publish::append_manifest_and_publish_with_checkpoint(
&mut self.store,
self.segment_start,
Some((checkpoint_offset, payload.len() as u32)),
)?;
self.store.sync()?;
#[cfg(feature = "tracing")]
tracing::info!(
checkpoint_offset,
replay_from,
payload_bytes = payload.len(),
"database_checkpoint_ok"
);
Ok(())
}
/// Builds a compacted image of this database and returns it as raw bytes.
///
/// A fresh in-memory database is created, every collection from the current
/// catalog is registered in ascending id order (re-registering the current
/// schema once per additional version so the version counter matches), and
/// every row from the latest-row view is inserted. The result of
/// `into_snapshot_bytes` on that database is returned.
///
/// Rows are inserted under the id that the fresh database assigned to their
/// collection. If no mapping is known for a row's collection id, the original
/// id is used unchanged.
///
/// # Errors
///
/// * `DbError::Schema(SchemaError::NoPrimaryKey)` if a collection has no
///   primary field.
/// * Any error propagated from creating the in-memory database, registering
///   collections / schema versions, or inserting rows.
pub(crate) fn compact_snapshot_bytes(&self) -> Result<Vec<u8>, DbError> {
    let mut out = Database::<VecStore>::open_in_memory()?;

    let mut cols = self.catalog_for_read().collections();
    cols.sort_by_key(|c| c.id.0);

    // Source collection id -> id assigned by `out`. The two only coincide when
    // the source ids are dense, so the mapping is tracked explicitly.
    let mut id_map = std::collections::BTreeMap::new();

    for c in &cols {
        let pk = c.primary_field.as_deref().ok_or_else(|| {
            DbError::Schema(SchemaError::NoPrimaryKey {
                collection_id: c.id.0,
            })
        })?;
        let (new_id, _v1) = out.register_collection_with_indexes(
            &c.name,
            c.fields.clone(),
            c.indexes.clone(),
            pk,
        )?;
        // Registration yields the first version; replay the current schema for
        // each remaining version number up to the source's current version.
        for _ in 2..=c.current_version.0 {
            let _ = out.register_schema_version_with_indexes_force(
                new_id,
                c.fields.clone(),
                c.indexes.clone(),
            )?;
        }
        id_map.insert(c.id.0, new_id);
    }

    for ((cid, _), row) in self.latest_for_read().iter() {
        let collection_id = id_map.get(cid).copied().unwrap_or(CollectionId(*cid));
        out.insert(collection_id, row.clone())?;
    }

    Ok(out.into_snapshot_bytes())
}
}
impl Database<FileStore> {
/// Writes a compacted copy of this database to `dest_path`.
///
/// Thin wrapper that forwards to `compact_to_with_fsops` using the standard
/// filesystem operations (`StdFsOps`); see that method for the error cases.
pub fn compact_to(&self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
    let std_fs = StdFsOps;
    self.compact_to_with_fsops(&std_fs, dest_path)
}
pub(crate) fn compact_to_with_fsops(
&self,
fs: &dyn FsOps,
dest_path: impl AsRef<Path>,
) -> Result<(), DbError> {
#[cfg(feature = "tracing")]
let _span = tracing::info_span!(
"database_compact_to",
dest = %dest_path.as_ref().display()
)
.entered();
let bytes = self.compact_snapshot_bytes()?;
let path = dest_path.as_ref();
let file = fs
.open_read_write_create_truncate(path)
.map_err(DbError::Io)?;
let mut store = FileStore::new(file);
store.write_all_at(0, &bytes)?;
store.truncate(bytes.len() as u64)?;
store.sync()?;
#[cfg(feature = "tracing")]
tracing::info!(bytes = bytes.len(), "database_compact_to_ok");
Ok(())
}
/// Compacts the database file in place.
///
/// Thin wrapper that forwards to `compact_in_place_with_fsops` using the
/// standard filesystem operations (`StdFsOps`); see that method for details.
pub fn compact_in_place(&mut self) -> Result<(), DbError> {
    let std_fs = StdFsOps;
    self.compact_in_place_with_fsops(&std_fs)
}
/// Replaces the live database file with a compacted image and reopens it.
///
/// Steps:
/// 1. Build the compacted image with `compact_snapshot_bytes`.
/// 2. Write it to a uniquely named `*.tmp` sibling of the live file and sync it.
/// 3. Rename the live file to a `*.bak` sibling, then rename the temp file
///    onto the live path (rolling back the first rename if the second fails).
/// 4. On Unix, best-effort fsync of the parent directory.
/// 5. Release the writer lock and reopen the live path. If reopening fails the
///    backup is renamed back over the live path and the error is returned.
/// 6. Only after a successful reopen is the backup removed, so step 5 always
///    has a backup to roll back to.
/// 7. Replace `*self` with the reopened database, carry over the previous
///    `writer_registry`, and register fresh shared state for the path.
///
/// The temp file is removed when writing it fails or when the live file cannot
/// be moved aside.
///
/// # Errors
///
/// Returns `DbError::Io` for filesystem failures (including a live path with no
/// parent directory) and propagates errors from building the image, reopening
/// the database, and registering the shared handle.
pub(crate) fn compact_in_place_with_fsops(&mut self, fs: &dyn FsOps) -> Result<(), DbError> {
    #[cfg(feature = "tracing")]
    let _span = tracing::info_span!("database_compact_in_place").entered();

    let bytes = self.compact_snapshot_bytes()?;
    let live_path = self.path.clone();
    let parent = live_path
        .parent()
        .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;

    // Process id plus wall-clock nanoseconds keep the sibling names unique.
    let pid = std::process::id();
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let base = live_path
        .file_name()
        .and_then(|s| s.to_str())
        .unwrap_or("db.modelvault");
    let tmp_path = parent.join(format!("{base}.compact.{pid}.{nanos}.tmp"));
    let bak_path = parent.join(format!("{base}.compact.{pid}.{nanos}.bak"));

    // Write the compacted image to the temp file. The store (and its file
    // handle) is dropped at the end of the closure, before any cleanup.
    let file = fs
        .open_read_write_create_new(&tmp_path)
        .map_err(DbError::Io)?;
    let write_res = (|| -> Result<(), DbError> {
        let mut store = FileStore::new(file);
        store.write_all_at(0, &bytes)?;
        store.truncate(bytes.len() as u64)?;
        store.sync()?;
        Ok(())
    })();
    if let Err(e) = write_res {
        let _ = fs.remove_file(&tmp_path);
        return Err(e);
    }

    // Move the live file aside; the temp file is useless if that fails.
    let _ = fs.remove_file(&bak_path);
    if let Err(e) = fs.rename(&live_path, &bak_path) {
        let _ = fs.remove_file(&tmp_path);
        return Err(DbError::Io(e));
    }

    // Promote the temp file; restore the original on failure.
    if let Err(e) = fs.rename(&tmp_path, &live_path) {
        let _ = fs.rename(&bak_path, &live_path);
        let _ = fs.remove_file(&tmp_path);
        return Err(DbError::Io(e));
    }

    #[cfg(unix)]
    {
        if let Ok(dir_f) = fs.open_dir(parent) {
            let _ = dir_f.sync_all();
        }
    }

    let old_registry = self.writer_registry.take();
    self.store.release_writer_lock();

    // The backup must still exist here: it is the only way to roll back if the
    // compacted file cannot be reopened.
    let mut reopened = match (|| {
        let store = FileStore::open_locked(&live_path, OpenMode::ReadWrite)?;
        Self::open_with_store(
            live_path.clone(),
            store,
            crate::config::OpenOptions::default(),
        )
    })() {
        Ok(db) => db,
        Err(e) => {
            let _ = fs.rename(&bak_path, &live_path);
            if let Ok(store) = FileStore::open_locked(&live_path, OpenMode::ReadWrite) {
                self.store = store;
            }
            self.writer_registry = old_registry;
            return Err(e);
        }
    };

    // Reopen succeeded, so the backup is no longer needed.
    let _ = fs.remove_file(&bak_path);

    reopened.writer_registry = old_registry;
    *self = reopened;
    self.shared_mirror = Some(handle_registry::register(
        &live_path,
        handle_registry::SharedDbState {
            catalog: self.catalog.clone(),
            latest: self.latest.clone(),
            indexes: self.indexes.clone(),
            segment_start: self.segment_start,
            format_minor: self.format_minor,
            generation: 0,
        },
    )?);
    self.push_shared_mirror();

    #[cfg(feature = "tracing")]
    tracing::info!(bytes = bytes.len(), "database_compact_in_place_ok");
    Ok(())
}
/// Exports a snapshot of this database to `dest_path`.
///
/// Thin wrapper that forwards to `export_snapshot_to_path_with_fsops` using the
/// standard filesystem operations (`StdFsOps`); see that method for details.
pub fn export_snapshot_to_path(&mut self, dest_path: impl AsRef<Path>) -> Result<(), DbError> {
    let std_fs = StdFsOps;
    self.export_snapshot_to_path_with_fsops(&std_fs, dest_path)
}
pub(crate) fn export_snapshot_to_path_with_fsops(
&mut self,
fs: &dyn FsOps,
dest_path: impl AsRef<Path>,
) -> Result<(), DbError> {
self.checkpoint()?;
let dest_path = dest_path.as_ref();
let len = self.store.len()?;
let len_usize = usize::try_from(len)
.map_err(|_| DbError::Io(std::io::Error::other("database file too large")))?;
let mut bytes = vec![0u8; len_usize];
self.store.read_exact_at(0, &mut bytes)?;
Database::<VecStore>::export_snapshot_to_path_with_fsops(fs, dest_path, &bytes)?;
if let Ok(f) = fs.open_read(dest_path) {
let _ = f.sync_all();
}
#[cfg(unix)]
best_effort_fsync_parent_dir(fs, dest_path);
Ok(())
}
/// Restores the snapshot at `snapshot_path` onto `dest_path`.
///
/// Thin wrapper that forwards to `restore_snapshot_to_path_with_fsops` using
/// the standard filesystem operations (`StdFsOps`); see that function for
/// details.
pub fn restore_snapshot_to_path(
    snapshot_path: impl AsRef<Path>,
    dest_path: impl AsRef<Path>,
) -> Result<(), DbError> {
    let std_fs = StdFsOps;
    Self::restore_snapshot_to_path_with_fsops(&std_fs, snapshot_path, dest_path)
}
/// Replaces `dest_path` with a copy of the snapshot at `snapshot_path`.
///
/// Steps:
/// 1. Copy the snapshot to a uniquely named `*.tmp` sibling of the destination
///    and best-effort sync it.
/// 2. If the destination exists, rename it to a `*.bak` sibling.
/// 3. Rename the temp file onto the destination; if that fails, the backup (if
///    any) is renamed back and the temp file is removed.
/// 4. On Unix, best-effort fsync of the parent directory.
/// 5. Remove the backup (failures ignored).
///
/// The temp file is also removed when the copy itself fails or when the
/// existing destination cannot be moved aside, so no stray `*.tmp` is left
/// behind on those paths.
///
/// # Errors
///
/// Returns `DbError::Io` if the destination has no parent directory, or if the
/// copy or either rename fails.
pub(crate) fn restore_snapshot_to_path_with_fsops(
    fs: &dyn FsOps,
    snapshot_path: impl AsRef<Path>,
    dest_path: impl AsRef<Path>,
) -> Result<(), DbError> {
    let snapshot_path = snapshot_path.as_ref();
    let dest_path = dest_path.as_ref();
    let parent = dest_path
        .parent()
        .ok_or_else(|| DbError::Io(std::io::Error::other("no parent")))?;

    // Process id plus wall-clock nanoseconds keep the sibling names unique.
    let pid = std::process::id();
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let base = dest_path
        .file_name()
        .and_then(|s| s.to_str())
        .unwrap_or("db.modelvault");
    let tmp_path = parent.join(format!("{base}.restore.{pid}.{nanos}.tmp"));
    let bak_path = parent.join(format!("{base}.restore.{pid}.{nanos}.bak"));

    // A failed copy may leave a partial temp file behind; clean it up.
    if let Err(e) = fs.copy(snapshot_path, &tmp_path) {
        let _ = fs.remove_file(&tmp_path);
        return Err(DbError::Io(e));
    }
    if let Ok(f) = fs.open_read(&tmp_path) {
        let _ = f.sync_all();
    }

    // Move any existing destination aside so it can be restored on failure.
    if dest_path.exists() {
        let _ = fs.remove_file(&bak_path);
        if let Err(e) = fs.rename(dest_path, &bak_path) {
            let _ = fs.remove_file(&tmp_path);
            return Err(DbError::Io(e));
        }
    }

    if let Err(e) = fs.rename(&tmp_path, dest_path) {
        if bak_path.exists() {
            let _ = fs.rename(&bak_path, dest_path);
        }
        let _ = fs.remove_file(&tmp_path);
        return Err(DbError::Io(e));
    }

    #[cfg(unix)]
    {
        if let Ok(dir_f) = fs.open_dir(parent) {
            let _ = dir_f.sync_all();
        }
    }

    let _ = fs.remove_file(&bak_path);
    Ok(())
}
#[doc(hidden)]
pub fn read_image_for_test(&mut self) -> Result<Vec<u8>, DbError> {
let len = self.store.len()?;
let len_usize = usize::try_from(len)
.map_err(|_| DbError::Io(std::io::Error::other("database file too large")))?;
let mut bytes = vec![0u8; len_usize];
self.store.read_exact_at(0, &mut bytes)?;
Ok(bytes)
}
}