use std::{fs, io};
use crate::utils::{path_type, PathBufExt};
use chrono::NaiveDate;
path_type! {
ServerDataDir: dir
}
impl ServerDataDir {
pub fn config_toml(&self) -> ConfigToml {
ConfigToml(self.0.join("config.toml"))
}
pub fn logs(&self) -> LogsDir {
LogsDir(self.0.join("logs"))
}
pub fn wasmtime_cache(&self) -> WasmtimeCacheDir {
WasmtimeCacheDir(self.0.join("cache/wasmtime"))
}
pub fn metadata_toml(&self) -> MetadataTomlPath {
MetadataTomlPath(self.0.join("metadata.toml"))
}
pub fn pid_file(&self) -> Result<PidFile, PidFileError> {
use fs2::FileExt;
use io::{Read, Write};
self.create()?;
let path = self.0.join("spacetime.pid");
let mut file = fs::File::options()
.create(true)
.write(true)
.truncate(false)
.read(true)
.open(&path)?;
match file.try_lock_exclusive() {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
let mut s = String::new();
let pid = file.read_to_string(&mut s).ok().and_then(|_| s.trim().parse().ok());
return Err(PidFileError::Exists { pid });
}
Err(e) => return Err(e.into()),
}
let mut pidfile = PidFile { file, path };
pidfile.file.set_len(0)?;
write!(pidfile.file, "{}", std::process::id())?;
pidfile.file.flush()?;
Ok(pidfile)
}
pub fn replica(&self, replica_id: u64) -> ReplicaDir {
ReplicaDir(self.0.join("replicas").joined_int(replica_id))
}
}
path_type! {
ConfigToml: file
}
path_type! {
LogsDir: dir
}
impl LogsDir {
pub fn filename_prefix(edition: &str) -> String {
format!("spacetime-{edition}")
}
pub fn filename_extension() -> String {
"log".to_owned()
}
}
path_type! {
WasmtimeCacheDir: dir
}
path_type! {
MetadataTomlPath: file
}
#[derive(thiserror::Error, Debug)]
pub enum PidFileError {
#[error("error while taking database lock on spacetime.pid")]
Io(#[from] io::Error),
#[error("cannot take lock on database; spacetime.pid already exists (owned by pid {pid:?})")]
Exists { pid: Option<u32> },
}
pub struct PidFile {
file: fs::File,
path: std::path::PathBuf,
}
impl Drop for PidFile {
fn drop(&mut self) {
let _ = fs::remove_file(&self.path);
}
}
path_type! {
ReplicaDir: dir
}
impl ReplicaDir {
pub fn module_logs(self) -> ModuleLogsDir {
ModuleLogsDir(self.0.joined("module_logs"))
}
pub fn snapshots(&self) -> SnapshotsPath {
SnapshotsPath(self.0.join("snapshots"))
}
pub fn commit_log(&self) -> CommitLogDir {
CommitLogDir(self.0.join("clog"))
}
}
path_type! {
ModuleLogsDir: dir
}
impl ModuleLogsDir {
pub fn logfile(self, date: NaiveDate) -> ModuleLogPath {
ModuleLogPath(self.0.joined(format!("{date}.log")))
}
pub fn today(self) -> ModuleLogPath {
self.logfile(chrono::Utc::now().date_naive())
}
pub fn most_recent(self) -> io::Result<Option<ModuleLogPath>> {
let mut max_file_name = None;
for entry in std::fs::read_dir(&self)? {
let file_name = entry?.file_name();
max_file_name = std::cmp::max(max_file_name, Some(file_name));
}
Ok(max_file_name.map(|file_name| ModuleLogPath(self.0.joined(file_name))))
}
}
path_type! {
ModuleLogPath: file
}
impl ModuleLogPath {
pub fn date(&self) -> NaiveDate {
self.0
.file_stem()
.and_then(|s| s.to_str()?.parse().ok())
.expect("ModuleLogPath should always have a filename of the form `{date}.log`")
}
pub fn with_date(&self, date: NaiveDate) -> Self {
Self(self.0.with_file_name(format!("{date}.log")))
}
pub fn yesterday(&self) -> Self {
self.with_date(self.date().pred_opt().unwrap())
}
pub fn popped(mut self) -> ModuleLogsDir {
self.0.pop();
ModuleLogsDir(self.0)
}
}
path_type! {
SnapshotsPath: dir
}
impl SnapshotsPath {
pub fn snapshot_dir(&self, tx_offset: u64) -> SnapshotDirPath {
let dir_name = format!("{tx_offset:0>20}.snapshot_dir");
SnapshotDirPath(self.0.join(dir_name))
}
}
path_type! {
SnapshotDirPath: dir
}
impl SnapshotDirPath {
pub fn snapshot_file(&self, tx_offset: u64) -> SnapshotFilePath {
let file_name = format!("{tx_offset:0>20}.snapshot_bsatn");
SnapshotFilePath(self.0.join(file_name))
}
pub fn objects(&self) -> SnapshotObjectsPath {
SnapshotObjectsPath(self.0.join("objects"))
}
pub fn rename_invalid(&self) -> io::Result<()> {
let invalid_path = self.0.with_extension("invalid_snapshot");
fs::rename(self, invalid_path)
}
pub fn tx_offset(&self) -> Option<u64> {
self.0
.file_stem()
.and_then(|s| s.to_str()?.split('.').next()?.parse::<u64>().ok())
}
}
path_type! {
SnapshotFilePath: file
}
path_type! {
SnapshotObjectsPath: dir
}
path_type! {
CommitLogDir: dir
}
impl CommitLogDir {
pub fn segment(&self, offset: u64) -> SegmentFile {
let file_name = format!("{offset:0>20}.stdb.log");
SegmentFile(self.0.join(file_name))
}
pub fn index(&self, offset: u64) -> OffsetIndexFile {
let file_name = format!("{offset:0>20}.stdb.ofs");
OffsetIndexFile(self.0.join(file_name))
}
}
path_type!(SegmentFile: file);
path_type!(OffsetIndexFile: file);
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use std::fs;
use tempfile::TempDir;
#[test]
fn test_pid_file_is_written() -> Result<()> {
let tempdir = TempDir::new()?;
let sdd = ServerDataDir(tempdir.path().to_path_buf());
let lock = sdd.pid_file()?;
let pidstring = fs::read_to_string(lock.path.clone())?;
let _pid = pidstring.trim().parse::<u32>()?;
Ok(())
}
#[test]
fn test_pid_is_exclusive() -> Result<()> {
let tempdir = TempDir::new()?;
let sdd = ServerDataDir(tempdir.path().to_path_buf());
let lock = sdd.pid_file()?;
let pidstring = fs::read_to_string(lock.path.clone())?;
let _pid = pidstring.trim().parse::<u32>()?;
let attempt = sdd.pid_file();
assert!(attempt.is_err());
drop(lock);
sdd.pid_file()?;
Ok(())
}
#[test]
fn test_snapshot_parsing() -> Result<()> {
let tempdir = TempDir::new()?;
let sdd = ServerDataDir(tempdir.path().to_path_buf());
const SNAPSHOT_OFFSET: u64 = 123456;
let snapshot_dir = sdd.replica(1).snapshots().snapshot_dir(SNAPSHOT_OFFSET);
assert_eq!(Some(SNAPSHOT_OFFSET), snapshot_dir.tx_offset());
Ok(())
}
}