use std::fmt::Debug;
use std::fs;
use std::fs::OpenOptions;
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use crate::{Round, SignedProposal, SignedVote};
/// Prefix shared by every WAL file name.
pub(crate) const WAL_FILE_PREFIX: &str = "wal-";

/// Extension (without the leading dot) shared by every WAL file name.
pub(crate) const WAL_FILE_EXTENSION: &str = "json";

/// Builds the WAL file name for `address` at `height`.
///
/// The name is composed as `<WAL_FILE_PREFIX><address>-<height>.<WAL_FILE_EXTENSION>`,
/// where `<address>` is the `ToString` rendering of `address`.
pub(crate) fn filename(address: &impl ToString, height: u64) -> String {
    format!(
        "{}{}-{}.{}",
        WAL_FILE_PREFIX,
        address.to_string(),
        height,
        WAL_FILE_EXTENSION
    )
}
/// Deletes the WAL file belonging to `address` at `height` inside `wal_dir`.
///
/// A missing file is not an error: the function returns `Ok(())` without
/// logging anything in that case.
///
/// # Errors
/// Returns an `std::io::Error` (built with `Error::other`, carrying the path
/// and the underlying cause in its message) when the removal fails for any
/// reason other than the file not being there.
pub(crate) fn delete_wal_file(
    address: &impl ToString,
    height: u64,
    wal_dir: &Path,
) -> Result<(), std::io::Error> {
    let path = wal_dir.join(filename(address, height));
    // Attempt the removal directly instead of checking `exists()` first: the
    // separate check is racy (the file may vanish between check and removal)
    // and silently hides metadata errors.
    match fs::remove_file(&path) {
        Ok(()) => {
            tracing::debug!(
                path = %path.display(),
                "Deleted WAL file for pruned height"
            );
            Ok(())
        }
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(std::io::Error::other(format!(
            "Failed to delete WAL file {}: {}",
            path.display(),
            e
        ))),
    }
}
/// Destination for write-ahead-log entries.
///
/// `V` and `A` are the type parameters of the [`WalEntry`] values being recorded.
pub(crate) trait WalSink<V, A>: Send {
    /// Records `entry` in the sink.
    fn append(&mut self, entry: WalEntry<V, A>);

    /// Reports whether the sink considers itself finalized.
    ///
    /// The default implementation always answers `false`.
    fn is_finalized(&self) -> bool {
        false
    }

    /// Flags the sink as finalized.
    ///
    /// The default implementation does nothing.
    fn mark_as_finalized(&mut self) {}
}
/// A single record stored in the write-ahead log.
///
/// `V` is the value type and `A` the proposer type carried by the records.
/// Records are (de)serialized through `serde`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum WalEntry<V, A> {
    /// A signed proposal.
    SignedProposal(SignedProposal<V, A>),
    /// A signed vote.
    SignedVote(SignedVote<V, A>),
    /// A timeout, identified by a textual `kind` and the `round` it belongs to.
    Timeout { kind: String, round: Round },
    /// A proposed value together with the metadata recorded alongside it.
    ProposedValue {
        height: u64,
        round: Round,
        valid_round: Round,
        proposer: A,
        value: V,
        validity: bool,
    },
    /// The decision (`value`) reached for `height`.
    Decision { height: u64, value: V },
}
/// WAL sink backed by a file on disk.
pub struct FileWalSink {
    /// Open handle that entries are written to.
    file: std::fs::File,
    /// Location of `file`; used in log messages.
    path: PathBuf,
    /// Set once a decision entry has been appended or the sink has been
    /// explicitly marked as finalized.
    has_decision: bool,
}
impl FileWalSink {
pub fn new(address: &impl ToString, height: u64, wal_dir: &Path) -> std::io::Result<Self> {
std::fs::create_dir_all(wal_dir)?;
let filename = filename(address, height);
let path = wal_dir.join(filename);
let file = OpenOptions::new().create(true).append(true).open(&path)?;
Ok(Self {
file,
path,
has_decision: false,
})
}
}
impl Drop for FileWalSink {
    /// Flushes the underlying file and logs that the WAL file is being kept.
    ///
    /// A flush failure is logged, never propagated. The file itself is left
    /// on disk.
    fn drop(&mut self) {
        let wal_path = self.path.display();
        // NOTE(review): `flush` on a plain `File` may not force data to disk;
        // if durability on drop is required, `sync_all` may be needed — confirm.
        match self.file.flush() {
            Ok(()) => {}
            Err(flush_err) => {
                tracing::error!(
                    path = %wal_path,
                    error = %flush_err,
                    "Failed to flush WAL file before drop"
                );
            }
        }
        tracing::debug!(
            path = %wal_path,
            "Keeping WAL file for potential recovery (will be deleted during pruning if outside `history_depth`)"
        );
    }
}
impl<V: Serialize, A: Serialize> WalSink<V, A> for FileWalSink {
    /// Serializes `entry` as one JSON line and appends it to the WAL file.
    ///
    /// Appending a [`WalEntry::Decision`] also marks the sink as finalized.
    ///
    /// # Panics
    /// Panics if the entry cannot be serialized or the write fails.
    fn append(&mut self, entry: WalEntry<V, A>) {
        if matches!(entry, WalEntry::Decision { .. }) {
            self.has_decision = true;
            tracing::debug!(
                path = %self.path.display(),
                "Marking WAL as finalized - decision reached"
            );
        }
        let mut line = serde_json::to_string(&entry).expect("WAL serialization failed");
        line.push('\n');
        // Hand the record and its terminating newline to the file in a single
        // `write_all` call. A formatted `writeln!` on an unbuffered file emits
        // the payload and the newline as separate writes, which widens the
        // window for leaving an unterminated record behind on interruption.
        self.file
            .write_all(line.as_bytes())
            .expect("WAL write failed");
    }

    /// Returns `true` once a decision has been appended or the sink has been
    /// marked as finalized.
    fn is_finalized(&self) -> bool {
        self.has_decision
    }

    /// Marks the sink as finalized without writing anything to the file.
    fn mark_as_finalized(&mut self) {
        self.has_decision = true;
    }
}
/// WAL sink that persists nothing; appended entries are only logged.
pub(crate) struct NoopWal;

impl<V: Debug, A: Debug> WalSink<V, A> for NoopWal {
    /// Logs `entry` at debug level and discards it.
    fn append(&mut self, entry: WalEntry<V, A>) {
        tracing::debug!("NoopWal: Appending entry: {:?}", entry);
    }

    /// Always `false`.
    fn is_finalized(&self) -> bool {
        false
    }
}
pub(crate) mod recovery {
use super::*;
pub(crate) fn extract_height_from_filename(path: &Path) -> u64 {
let filename = path.file_name().unwrap_or_default().to_string_lossy();
let height_str = filename
.strip_prefix(WAL_FILE_PREFIX)
.and_then(|s| s.strip_suffix(".json"))
.and_then(|s| s.split('-').nth(1))
.unwrap_or_default();
height_str.parse::<u64>().unwrap_or_else(|_| {
tracing::warn!(
filename = %filename,
path = %path.display(),
"Failed to parse height from filename, using 0"
);
0
})
}
/// Lists the WAL files found directly inside `wal_dir`, ordered by height.
///
/// A directory entry qualifies when it is a file, has the
/// `WAL_FILE_EXTENSION` extension and its name starts with `WAL_FILE_PREFIX`.
/// Each result pairs the height extracted from the file name with the file's
/// path. The sort is stable, so entries with equal heights keep their
/// directory-listing order.
///
/// # Errors
/// Returns an error when the directory cannot be read (with the directory
/// path included in the message) or when reading a directory entry fails.
pub(crate) fn collect_wal_files(wal_dir: &Path) -> Result<Vec<(u64, PathBuf)>, std::io::Error> {
    let listing = match fs::read_dir(wal_dir) {
        Ok(listing) => listing,
        Err(e) => {
            return Err(std::io::Error::other(format!(
                "Failed to read WAL directory {}: {}",
                wal_dir.display(),
                e
            )));
        }
    };

    let mut found: Vec<(u64, PathBuf)> = Vec::new();
    for dir_entry in listing {
        let candidate = dir_entry?.path();
        if !candidate.is_file() {
            continue;
        }
        let has_wal_extension = candidate
            .extension()
            .is_some_and(|ext| ext == WAL_FILE_EXTENSION);
        if !has_wal_extension {
            continue;
        }
        let has_wal_prefix = candidate
            .file_name()
            .unwrap_or_default()
            .to_string_lossy()
            .starts_with(WAL_FILE_PREFIX);
        if !has_wal_prefix {
            continue;
        }
        let height = extract_height_from_filename(&candidate);
        found.push((height, candidate));
    }

    found.sort_by_key(|&(height, _)| height);
    Ok(found)
}
/// Reads every WAL entry stored in the file at `path`.
///
/// The file is read line by line; each non-blank line is parsed as one JSON
/// encoded [`WalEntry`]. Lines that are empty after trimming are skipped.
///
/// # Errors
/// Returns the underlying I/O error when the file cannot be opened or a line
/// cannot be read, and an `InvalidData` error (naming the 1-based line number
/// and the file) when a line fails to parse.
pub(crate) fn read_entries<V, A>(path: &Path) -> Result<Vec<WalEntry<V, A>>, std::io::Error>
where
    V: for<'de> Deserialize<'de>,
    A: for<'de> Deserialize<'de>,
{
    let reader = BufReader::new(fs::File::open(path)?);
    let mut parsed = Vec::new();
    for (index, raw_line) in reader.lines().enumerate() {
        let raw_line = raw_line?;
        if raw_line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<WalEntry<V, A>>(&raw_line) {
            Ok(entry) => parsed.push(entry),
            Err(e) => {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!(
                        "Failed to parse WAL entry at line {} in {}: {}",
                        index + 1,
                        path.display(),
                        e
                    ),
                ));
            }
        }
    }
    Ok(parsed)
}
/// Reads every WAL file in `wal_dir` and splits the heights into incomplete
/// and finalized ones.
///
/// A height is treated as finalized when its WAL contains at least one
/// [`WalEntry::Decision`], or when `highest_committed` is `Some` and the
/// height (taken from the file name) does not exceed it. Every other height
/// is incomplete.
///
/// # Returns
/// A tuple `(incomplete, finalized, highest_decision)`:
/// - `incomplete`: `(height, entries)` for each incomplete height,
/// - `finalized`: `(height, entries)` for each finalized height,
/// - `highest_decision`: the largest height found inside any `Decision`
///   entry, or `None` when no decision was found.
///
/// Both lists follow the order produced by `collect_wal_files`. When
/// `wal_dir` does not exist, two empty lists and `None` are returned.
///
/// # Errors
/// Propagates errors from listing the directory and from reading or parsing
/// any WAL file.
// The tuple return type is part of the existing interface, so the lint is
// silenced instead of introducing a new type.
#[allow(clippy::type_complexity)]
pub(crate) fn recover_incomplete_heights<V, A>(
    wal_dir: &Path,
    highest_committed: Option<u64>,
) -> Result<
    (
        Vec<(u64, Vec<WalEntry<V, A>>)>,
        Vec<(u64, Vec<WalEntry<V, A>>)>,
        Option<u64>,
    ),
    std::io::Error,
>
where
    V: for<'de> Deserialize<'de>,
    A: for<'de> Deserialize<'de>,
{
    if !wal_dir.exists() {
        tracing::info!(
            wal_dir = %wal_dir.display(),
            "WAL directory does not exist, no recovery needed"
        );
        return Ok((Vec::new(), Vec::new(), None));
    }

    let files = collect_wal_files(wal_dir)?;
    tracing::info!(
        files = ?files,
        "Recovering incomplete heights from WAL",
    );

    let mut incomplete = Vec::new();
    let mut finalized = Vec::new();
    let mut highest_decision: Option<u64> = None;

    for (height, path) in files {
        let entries: Vec<WalEntry<V, A>> = read_entries(&path)?;

        // Largest decision height recorded in this file, if any.
        let file_highest_decision = entries
            .iter()
            .filter_map(|entry| {
                if let WalEntry::Decision {
                    height: decision_height,
                    ..
                } = entry
                {
                    Some(*decision_height)
                } else {
                    None
                }
            })
            .max();
        // `None` orders below every `Some`, so this keeps the running maximum.
        highest_decision = highest_decision.max(file_highest_decision);

        // This check does not depend on the entries, so it must be evaluated
        // outside of any per-entry iteration: otherwise an empty WAL file for
        // an already committed height would be reported as incomplete.
        let already_committed = highest_committed.is_some_and(|committed| height <= committed);
        let is_finalized = already_committed || file_highest_decision.is_some();

        if is_finalized {
            tracing::debug!(
                height = %height,
                path = %path.display(),
                "\"Recovering\" finalized height (may be restored if within history_depth)"
            );
            finalized.push((height, entries));
        } else {
            tracing::debug!(
                height = %height,
                path = %path.display(),
                entry_count = entries.len(),
                "Recovering incomplete height"
            );
            incomplete.push((height, entries));
        }
    }

    Ok((incomplete, finalized, highest_decision))
}
}