use core::marker::PhantomData;
use core::sync::atomic::{AtomicU64, Ordering};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Read as _, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use fs2::FileExt;
use parking_lot::{Mutex, RwLock};
use serde::Serialize;
use serde::de::DeserializeOwned;
use super::{MemoryStorage, Storage};
use crate::sampler::CompletedTrial;
/// Trial storage that keeps trials in a [`MemoryStorage`] and mirrors every
/// pushed trial into an append-only file at `path`, one JSON record per line.
///
/// Access to the file is serialized within the process by `io_lock` and
/// across processes by file locks taken through `fs2`.
pub struct JournalStorage<V = f64> {
    /// Location of the journal file (canonicalized when that succeeds).
    path: PathBuf,
    /// In-memory copy of the trials that the `Storage` accessors read from.
    memory: MemoryStorage<V>,
    /// Serializes this process's reads and appends of the journal file.
    io_lock: Mutex<()>,
    /// Byte offset in the journal up to which records have been ingested.
    file_offset: AtomicU64,
    /// Explicit `V` marker; `memory` already uses `V`, so this is redundant
    /// but harmless.
    _marker: PhantomData<V>,
}
impl<V: Serialize + DeserializeOwned + Send + Sync> JournalStorage<V> {
#[must_use]
pub fn new(path: impl AsRef<Path>) -> Self {
let path = path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| path.as_ref().to_path_buf());
Self {
memory: MemoryStorage::new(),
path,
io_lock: Mutex::new(()),
file_offset: AtomicU64::new(0),
_marker: PhantomData,
}
}
pub fn open(path: impl AsRef<Path>) -> crate::Result<Self> {
let path = path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| path.as_ref().to_path_buf());
let (trials, offset) = load_trials_from_file(&path)?;
Ok(Self {
memory: MemoryStorage::with_trials(trials),
path,
io_lock: Mutex::new(()),
file_offset: AtomicU64::new(offset),
_marker: PhantomData,
})
}
fn write_to_file(&self, trial: &CompletedTrial<V>) -> crate::Result<()> {
let _guard = self.io_lock.lock();
let mut file = OpenOptions::new()
.create(true)
.truncate(false)
.read(true)
.write(true)
.open(&self.path)
.map_err(|e| crate::Error::Storage(e.to_string()))?;
file.lock_exclusive()
.map_err(|e| crate::Error::Storage(e.to_string()))?;
file.seek(SeekFrom::End(0))
.map_err(|e| crate::Error::Storage(e.to_string()))?;
let line =
serde_json::to_string(trial).map_err(|e| crate::Error::Storage(e.to_string()))?;
writeln!(file, "{line}").map_err(|e| crate::Error::Storage(e.to_string()))?;
file.sync_data()
.map_err(|e| crate::Error::Storage(e.to_string()))?;
file.unlock()
.map_err(|e| crate::Error::Storage(e.to_string()))?;
Ok(())
}
}
impl<V: Serialize + DeserializeOwned + Send + Sync> Storage<V> for JournalStorage<V> {
    /// Appends `trial` to the journal file, then records it in memory.
    ///
    /// Persisting is best-effort: this trait method cannot report errors, so a
    /// failed append is ignored and the trial is still kept in memory.
    fn push(&self, trial: CompletedTrial<V>) {
        let _ = self.write_to_file(&trial);
        // `write_to_file` releases `io_lock` as soon as the record is on disk.
        // A `refresh` on another thread can run in that gap, read the record
        // back and insert it into memory first. Holding `io_lock` across the
        // check and the insert keeps `refresh` out, so the trial is added at
        // most once. Lock order (io_lock, then trials) matches `refresh`.
        let _guard = self.io_lock.lock();
        let already_loaded = self
            .memory
            .trials_arc()
            .read()
            .iter()
            .rev()
            .any(|t| t.id == trial.id);
        if !already_loaded {
            self.memory.push(trial);
        }
    }

    fn trials_arc(&self) -> &Arc<RwLock<Vec<CompletedTrial<V>>>> {
        self.memory.trials_arc()
    }

    fn next_trial_id(&self) -> u64 {
        self.memory.next_trial_id()
    }

    fn peek_next_trial_id(&self) -> u64 {
        self.memory.peek_next_trial_id()
    }

    /// Ingests trials appended to the journal since the last successful read.
    ///
    /// The bytes past the stored offset are read under a shared file lock and
    /// parsed as one JSON trial per non-blank line. Trials whose ids are
    /// already in memory are skipped. Returns `true` only if at least one new
    /// trial was added.
    ///
    /// If any line fails to deserialize or validate, nothing is ingested and
    /// the offset is left unchanged, so the same bytes are retried on the next
    /// call. I/O failures are likewise reported as `false`.
    fn refresh(&self) -> bool {
        let _guard = self.io_lock.lock();
        let Ok(file) = File::open(&self.path) else {
            return false;
        };
        if file.lock_shared().is_err() {
            return false;
        }
        let offset = self.file_offset.load(Ordering::SeqCst);
        // Every exit from this closure falls through to the single unlock below.
        let unread = (|| -> Option<String> {
            let file_size = file.metadata().ok()?.len();
            if file_size <= offset {
                return None;
            }
            let mut handle = &file;
            handle.seek(SeekFrom::Start(offset)).ok()?;
            let mut buf = String::new();
            handle.read_to_string(&mut buf).ok()?;
            Some(buf)
        })();
        let _ = file.unlock();
        let Some(buf) = unread else {
            return false;
        };
        let new_offset = offset + buf.len() as u64;

        // Collecting into `Option` stops at the first line that fails to parse
        // or validate, which rejects the whole chunk.
        let parsed: Option<Vec<CompletedTrial<V>>> = buf
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty())
            .map(|line| {
                serde_json::from_str::<CompletedTrial<V>>(line)
                    .ok()
                    .filter(|trial| trial.validate().is_ok())
            })
            .collect();
        let Some(mut new_trials) = parsed else {
            return false;
        };
        if new_trials.is_empty() {
            self.file_offset.fetch_max(new_offset, Ordering::SeqCst);
            return false;
        }

        let mut mem_guard = self.memory.trials_arc().write();
        let existing_ids: std::collections::HashSet<u64> = mem_guard.iter().map(|t| t.id).collect();
        new_trials.retain(|t| !existing_ids.contains(&t.id));
        if let Some(max_id) = new_trials.iter().map(|t| t.id).max() {
            self.memory.bump_next_id(max_id + 1);
        }
        let added = !new_trials.is_empty();
        mem_guard.extend(new_trials);
        self.file_offset.fetch_max(new_offset, Ordering::SeqCst);
        added
    }
}
fn load_trials_from_file<V: DeserializeOwned>(
path: &Path,
) -> crate::Result<(Vec<CompletedTrial<V>>, u64)> {
let file = match File::open(path) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok((Vec::new(), 0)),
Err(e) => return Err(crate::Error::Storage(e.to_string())),
};
file.lock_shared()
.map_err(|e| crate::Error::Storage(e.to_string()))?;
let file_size = file
.metadata()
.map_err(|e| crate::Error::Storage(e.to_string()))?
.len();
let reader = BufReader::new(&file);
let mut trials = Vec::new();
for line in reader.lines() {
let line = line.map_err(|e| crate::Error::Storage(e.to_string()))?;
let line = line.trim();
if line.is_empty() {
continue;
}
let trial: CompletedTrial<V> =
serde_json::from_str(line).map_err(|e| crate::Error::Storage(e.to_string()))?;
trial.validate().map_err(crate::Error::Storage)?;
trials.push(trial);
}
file.unlock()
.map_err(|e| crate::Error::Storage(e.to_string()))?;
Ok((trials, file_size))
}