use std::{path::Path, sync::Arc};
use super::{CheckpointPolicy, Db};
use crate::{
Catalog, CheckpointGeneration, CommitSeq, DbError, TransactionId,
backing::Base,
freeze::{self, FreezeStamps},
overlay::{Overlay, Snapshot, WriteOverlay},
state::NextIds,
storage, wal,
wire::SuperblockRecord,
};
/// Returns the file name of the base image for `generation`, e.g. `base-7.oxgdb`.
///
/// Only the bare file name is produced; callers join it onto the store root.
pub(super) fn base_file(generation: u64) -> String {
    let number = generation.to_string();
    ["base-", number.as_str(), ".oxgdb"].concat()
}
/// Returns the file name of the delta log for `generation`, e.g. `delta-7.log`.
///
/// Only the bare file name is produced; callers join it onto the store root.
pub(super) fn delta_file(generation: u64) -> String {
    let number = generation.to_string();
    ["delta-", number.as_str(), ".log"].concat()
}
impl Db {
    /// Creates a new, empty store rooted at `path` and then opens it.
    ///
    /// Generation 0 is laid down in this order: the frozen empty base image, an empty delta
    /// log, and finally the superblock. The finished directory is handed to [`Self::open`].
    ///
    /// # Errors
    ///
    /// Returns `StorageError::AlreadyExists` (wrapped in [`DbError::Storage`]) when a superblock
    /// file is already present under `path`. Any failure from freezing the empty view, writing
    /// the files, or reopening the store is propagated.
    pub fn create(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let root = path.as_ref().to_path_buf();
        let superblock_path = root.join(wal::SUPERBLOCK_FILE);
        if superblock_path.exists() {
            return Err(DbError::Storage(crate::error::StorageError::AlreadyExists));
        }

        // Freeze a view over "no base records + empty overlay" into the generation-0 image.
        let no_records = crate::overlay::BaseRecords::empty();
        let no_writes = Overlay::empty(NextIds::INITIAL, Catalog::empty());
        let merged = crate::overlay::MergedState::new(&no_records, &no_writes);
        let stamps = FreezeStamps {
            commit_seq: 0,
            transaction_id: 0,
            generation: 0,
        };
        let image = freeze::freeze_view(&merged, stamps)?;

        // The image is written under a `.tmp` name first and lands on its final name via
        // `storage::atomic_write`.
        let staging = root.join(format!("{}.tmp", base_file(0)));
        let target = root.join(base_file(0));
        storage::atomic_write(&root, &staging, &target, &image)?;

        create_empty_log(&root, 0)?;
        // The superblock goes last, after the base image and log it refers to exist.
        write_superblock(&root, 0, 0, 0, 0)?;

        Self::open(&root)
    }

    /// Opens the store rooted at `path`, rebuilding in-memory state from disk.
    ///
    /// The superblock selects the base generation. That generation's base image is opened,
    /// its delta log is replayed, and every replayed frame is applied to a fresh write overlay
    /// seeded with the base header's next-ids and the base catalog. If the replayer accepted
    /// fewer bytes than the log holds, the log file is truncated to the accepted length.
    ///
    /// The resulting commit sequence is the `lsn` of the last replayed frame, or the
    /// superblock's value when no frame was replayed. The last transaction id is the maximum
    /// of the superblock's value and every frame's `txn_id`. The checkpoint policy is the
    /// default one.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading the superblock, opening the base image, reading,
    /// replaying or truncating the delta log, and applying a replayed operation.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, DbError> {
        let root = path.as_ref().to_path_buf();
        let superblock = wal::read_superblock(&root)?;
        let generation = superblock.base_generation.get();

        // Base image for the generation named by the superblock.
        // NOTE(review): the meaning of the `false` flag to `Base::open` is not visible here.
        let base_path = root.join(base_file(generation));
        let base = Arc::new(Base::open(&base_path, false)?);
        let base_records = Arc::new(crate::overlay::BaseRecords::open(&base)?);
        let header = *base.get().header();
        let catalog = base.get().catalog().clone();
        let seed_ids = NextIds::from_header(&header);

        // Replay the matching delta log and drop whatever tail the replayer did not accept
        // (presumably a torn or corrupt suffix — confirm against `wal::replay`).
        let log_path = root.join(delta_file(generation));
        let log_bytes = read_log(&log_path)?;
        let replayed = wal::replay(generation, &log_bytes)?;
        if log_bytes.len() > replayed.valid_len {
            truncate_log(&log_path, replayed.valid_len)?;
        }

        // Re-apply every recovered frame on top of the base state.
        let mut write = WriteOverlay::new(seed_ids, catalog);
        let mut next_ids = seed_ids;
        let mut commit_seq = superblock.commit_seq.get();
        let mut txn_id = superblock.transaction_id.get();
        for frame in &replayed.frames {
            for op in &frame.ops {
                write.apply_replay_op(&base_records, op, &frame.blob, frame.lsn)?;
            }
            // Next-ids only ever move forward: keep the per-field maximum seen so far.
            next_ids = next_ids.elementwise_max(write.next_ids());
            commit_seq = frame.lsn;
            txn_id = txn_id.max(frame.txn_id);
        }
        write.set_next_ids(next_ids);

        let overlay = Arc::new(write.freeze());
        let current = Arc::new(Snapshot::with_shared_base_records(
            CheckpointGeneration::new(generation),
            CommitSeq::new(commit_seq),
            base,
            overlay,
            base_records,
        ));

        Ok(Self {
            root,
            current,
            base_generation: generation,
            last_transaction_id: TransactionId::new(txn_id),
            checkpoint_policy: CheckpointPolicy::default(),
        })
    }
}
/// Returns the size in bytes of the file at `path`.
///
/// Any metadata failure (including a missing file) is reported as a length of `0` rather
/// than as an error.
pub(super) fn file_len(path: &Path) -> u64 {
    match std::fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(_error) => 0,
    }
}
pub(super) fn read_log(path: &Path) -> Result<Vec<u8>, DbError> {
match std::fs::read(path) {
Ok(bytes) => Ok(bytes),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
Err(error) => Err(DbError::io("read delta-log", error)),
}
}
pub(super) fn truncate_log(path: &Path, len: usize) -> Result<(), DbError> {
let file = std::fs::OpenOptions::new()
.write(true)
.open(path)
.map_err(|error| DbError::io("open delta-log for truncate", error))?;
let len = u64::try_from(len)
.map_err(|_overflow| DbError::invalid_store("delta-log length overflow"))?;
file.set_len(len)
.map_err(|error| DbError::io("truncate delta-log", error))?;
file.sync_all()
.map_err(|error| DbError::io("sync truncated delta-log", error))
}
pub(super) fn create_empty_log(root: &Path, generation: u64) -> Result<(), DbError> {
let path = root.join(delta_file(generation));
let file =
std::fs::File::create(&path).map_err(|error| DbError::io("create delta-log", error))?;
file.sync_all()
.map_err(|error| DbError::io("sync delta-log", error))?;
Ok(storage::sync_directory(root)?)
}
pub(super) fn open_log_for_append(root: &Path, generation: u64) -> Result<std::fs::File, DbError> {
std::fs::OpenOptions::new()
.create(true)
.truncate(false)
.read(true)
.append(true)
.open(root.join(delta_file(generation)))
.map_err(|error| DbError::io("open delta-log for append", error))
}
/// Builds a superblock record from the given counters and writes it under `root`.
///
/// `generation`, `checkpoint_lsn`, `commit_seq` and `transaction_id` are stored as given.
/// The magic and format version come from the `crate::wire` constants, and the log byte
/// offset, flags, checksum and padding fields are all set to zero here.
///
/// NOTE(review): `crc32c` is zero in the record built here; presumably
/// `wal::write_superblock` fills in the real checksum — confirm.
///
/// # Errors
///
/// Propagates any failure from `wal::write_superblock`, converted into a `DbError`.
pub(super) fn write_superblock(
    root: &Path,
    generation: u64,
    checkpoint_lsn: u64,
    commit_seq: u64,
    transaction_id: u64,
) -> Result<(), DbError> {
    let record = SuperblockRecord {
        magic: crate::wire::SUPERBLOCK_MAGIC,
        base_generation: generation.into(),
        checkpoint_lsn: checkpoint_lsn.into(),
        log_byte_offset: 0u64.into(),
        commit_seq: commit_seq.into(),
        transaction_id: transaction_id.into(),
        format_version: crate::wire::OXGDB_FORMAT_VERSION.into(),
        flags: 0u32.into(),
        crc32c: 0u32.into(),
        pad: 0u32.into(),
    };
    wal::write_superblock(root, &record)?;
    Ok(())
}