use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use crate::error::{Result, StoreError};
use crate::types::{Key, Value};
pub const JOURNAL_VERSION: u32 = 1;
pub const DEFAULT_JOURNAL_FILE_NAME: &str = "store.journal";
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "snake_case")]
pub enum JournalOperation {
Set {
key: String,
value: String,
},
Delete {
key: String,
},
Clear,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct JournalEntry {
pub version: u32,
pub sequence: u64,
pub operation: JournalOperation,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Journal {
path: PathBuf,
}
impl Journal {
#[must_use]
pub fn new(path: impl Into<PathBuf>) -> Self {
Self { path: path.into() }
}
#[must_use]
pub fn alongside_store(store_path: &Path) -> Self {
let path = store_path.with_file_name(DEFAULT_JOURNAL_FILE_NAME);
Self::new(path)
}
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
#[must_use]
pub fn exists(&self) -> bool {
self.path.is_file()
}
pub fn append(&self, entry: &JournalEntry) -> Result<()> {
validate_entry(entry)?;
if let Some(parent) = self.path.parent() {
fs::create_dir_all(parent).map_err(|source| StoreError::PreparePath {
path: parent.to_path_buf(),
source,
})?;
}
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.path)
.map_err(|source| StoreError::Write {
path: self.path.clone(),
source,
})?;
let mut writer = BufWriter::new(file);
let line = serde_json::to_string(entry).map_err(|error| StoreError::Serialize {
message: format!("failed to serialize journal entry: {error}"),
})?;
writer
.write_all(line.as_bytes())
.map_err(|source| StoreError::Write {
path: self.path.clone(),
source,
})?;
writer
.write_all(b"\n")
.map_err(|source| StoreError::Write {
path: self.path.clone(),
source,
})?;
writer.flush().map_err(|source| StoreError::Write {
path: self.path.clone(),
source,
})?;
Ok(())
}
pub fn read_all(&self) -> Result<Vec<JournalEntry>> {
if !self.exists() {
return Ok(Vec::new());
}
let file = File::open(&self.path).map_err(|source| StoreError::Read {
path: self.path.clone(),
source,
})?;
let reader = BufReader::new(file);
let mut entries = Vec::new();
for (line_number, line) in reader.lines().enumerate() {
let line = line.map_err(|source| StoreError::Read {
path: self.path.clone(),
source,
})?;
if line.trim().is_empty() {
continue;
}
let entry: JournalEntry =
serde_json::from_str(&line).map_err(|error| StoreError::Malformed {
reason: format!("invalid journal line {}: {}", line_number + 1, error),
})?;
validate_entry(&entry)?;
entries.push(entry);
}
Ok(entries)
}
pub fn clear(&self) -> Result<()> {
if !self.exists() {
return Ok(());
}
File::create(&self.path).map_err(|source| StoreError::Write {
path: self.path.clone(),
source,
})?;
Ok(())
}
pub fn remove(&self) -> Result<()> {
if !self.exists() {
return Ok(());
}
fs::remove_file(&self.path).map_err(|source| StoreError::Write {
path: self.path.clone(),
source,
})?;
Ok(())
}
}
impl JournalEntry {
pub fn set(sequence: u64, key: &Key, value: &Value) -> Self {
Self {
version: JOURNAL_VERSION,
sequence,
operation: JournalOperation::Set {
key: key.as_str().to_owned(),
value: value.as_str().to_owned(),
},
}
}
pub fn delete(sequence: u64, key: &Key) -> Self {
Self {
version: JOURNAL_VERSION,
sequence,
operation: JournalOperation::Delete {
key: key.as_str().to_owned(),
},
}
}
#[must_use]
pub const fn clear(sequence: u64) -> Self {
Self {
version: JOURNAL_VERSION,
sequence,
operation: JournalOperation::Clear,
}
}
}
fn validate_entry(entry: &JournalEntry) -> Result<()> {
if entry.version != JOURNAL_VERSION {
return Err(StoreError::Malformed {
reason: format!("unsupported journal version: {}", entry.version),
}
.into());
}
match &entry.operation {
JournalOperation::Set { key, value } => {
let _ = Key::new(key.clone())?;
let _ = Value::new(value.clone())?;
}
JournalOperation::Delete { key } => {
let _ = Key::new(key.clone())?;
}
JournalOperation::Clear => {}
}
Ok(())
}