use std::collections::{HashMap, HashSet};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use parking_lot::{Mutex, RwLock};
use crate::wal_atomic::{Lsn, WalConfig, WalPayload, WalReader, WalRecord, WalRecordType, WalWriter};
/// Lifecycle state of a single intent, as reconstructed from WAL records during recovery.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryIntentStatus {
    /// An `IntentStart` record was seen for the intent.
    Started,
    /// An `Operation` record was seen after the intent started.
    InProgress,
    /// Not assigned by the WAL scan in this file; recovery classifies it for redo.
    Completed,
    /// A `Commit` record was seen; recovery takes no action for it.
    Committed,
    /// An `Abort` record was seen; recovery classifies it for rollback.
    Aborted,
    /// Not assigned by the WAL scan in this file; recovery takes no action for it.
    Unknown,
}
/// Everything recovery reconstructed about one intent from the WAL.
#[derive(Debug)]
pub struct RecoveryIntent {
    /// Identifier shared by every WAL record that belongs to this intent.
    pub intent_id: u64,
    /// `memory_id` carried by the intent's `IntentStart` record.
    pub memory_id: String,
    /// LSN of the `IntentStart` record.
    pub start_lsn: Lsn,
    /// Operation count announced by the `IntentStart` record (`op_count`).
    ///
    /// Not compared against `operations.len()` by the recovery code in this file.
    pub expected_ops: usize,
    /// Operations logged for this intent, in the order their records were read.
    pub operations: Vec<RecoveryOperation>,
    /// Reconstructed lifecycle state.
    pub status: RecoveryIntentStatus,
    /// LSN of the `Commit` record, if one was seen.
    pub commit_lsn: Option<Lsn>,
    /// LSN of the `Abort` record, if one was seen.
    pub abort_lsn: Option<Lsn>,
    /// Timestamp copied from the `IntentStart` record (units not visible here).
    pub timestamp: u64,
}
/// One logged operation of an intent; this is what the apply/undo callbacks receive.
#[derive(Debug, Clone)]
pub struct RecoveryOperation {
    /// Position of the operation inside its intent; redo runs ascending, undo descending.
    pub op_index: usize,
    /// Operation kind exactly as logged; interpreting it is left to the callbacks.
    pub op_type: String,
    /// Raw key bytes the operation targets.
    pub key: Vec<u8>,
    /// Raw value bytes, if the logged operation carried a value.
    pub value: Option<Vec<u8>>,
    /// LSN of the `Operation` record this entry was decoded from.
    pub lsn: Lsn,
}
/// Settings for [`RecoveryEngine`].
#[derive(Debug, Clone)]
pub struct RecoveryConfig {
    /// Directory holding the WAL files to scan.
    pub wal_dir: PathBuf,
    /// Not consulted by the recovery code in this file (intents are processed sequentially).
    pub max_parallel: usize,
    /// Not consulted by the recovery code in this file.
    pub timeout: Duration,
    /// When `true`, intents left `Started`/`InProgress` are redone; otherwise they are
    /// rolled back.
    pub redo_incomplete: bool,
}

impl Default for RecoveryConfig {
    /// `./wal`, 4-way parallelism, a 30 s timeout, and redo of incomplete intents.
    fn default() -> Self {
        RecoveryConfig {
            wal_dir: PathBuf::from("./wal"),
            max_parallel: 4,
            timeout: Duration::from_secs(30),
            redo_incomplete: true,
        }
    }
}
/// Summary of one [`RecoveryEngine::recover`] run.
#[derive(Debug)]
pub struct RecoveryResult {
    /// Number of distinct intents found in the WAL, whatever their state.
    pub intents_recovered: usize,
    /// Number of intents classified for redo (counted even if their callback failed
    /// or no apply callback was installed).
    pub intents_replayed: usize,
    /// Number of intents classified for rollback (same counting rule as above).
    pub intents_rolled_back: usize,
    /// Total operations successfully passed to the apply callback.
    pub ops_redone: usize,
    /// Total operations successfully passed to the undo callback.
    pub ops_undone: usize,
    /// LSN reached by the WAL scan (`Lsn::ZERO` when the log is empty).
    pub last_lsn: Lsn,
    /// Wall-clock time the run took.
    pub duration: Duration,
    /// One message per intent whose redo/undo callback returned an error.
    pub errors: Vec<String>,
}
pub type ApplyCallback = Box<dyn Fn(&RecoveryOperation) -> io::Result<()> + Send + Sync>;
pub type UndoCallback = Box<dyn Fn(&RecoveryOperation) -> io::Result<()> + Send + Sync>;
/// Redoes or rolls back the intents found in the WAL through user-supplied callbacks.
pub struct RecoveryEngine {
    /// Recovery settings (`wal_dir` and `redo_incomplete` are the ones read).
    config: RecoveryConfig,
    /// Reader over `config.wal_dir`.
    reader: WalReader,
    /// Invoked per redone operation; `None` turns redo into a no-op.
    apply_callback: Option<ApplyCallback>,
    /// Invoked per undone operation; `None` turns undo into a no-op.
    undo_callback: Option<UndoCallback>,
}
impl RecoveryEngine {
pub fn new(config: RecoveryConfig) -> Self {
let reader = WalReader::new(&config.wal_dir);
Self {
config,
reader,
apply_callback: None,
undo_callback: None,
}
}
pub fn with_apply<F>(mut self, f: F) -> Self
where
F: Fn(&RecoveryOperation) -> io::Result<()> + Send + Sync + 'static,
{
self.apply_callback = Some(Box::new(f));
self
}
pub fn with_undo<F>(mut self, f: F) -> Self
where
F: Fn(&RecoveryOperation) -> io::Result<()> + Send + Sync + 'static,
{
self.undo_callback = Some(Box::new(f));
self
}
pub fn recover(&self) -> io::Result<RecoveryResult> {
let start = Instant::now();
let records = self.reader.read_all()?;
let intents = self.build_intent_map(&records);
let (to_redo, to_undo) = self.classify_intents(&intents);
let mut ops_redone = 0;
let mut ops_undone = 0;
let mut errors = Vec::new();
for intent in &to_redo {
match self.redo_intent(intent) {
Ok(n) => ops_redone += n,
Err(e) => errors.push(format!("Redo intent {}: {}", intent.intent_id, e)),
}
}
for intent in &to_undo {
match self.undo_intent(intent) {
Ok(n) => ops_undone += n,
Err(e) => errors.push(format!("Undo intent {}: {}", intent.intent_id, e)),
}
}
let last_lsn = records.last().map(|r| r.lsn).unwrap_or(Lsn::ZERO);
Ok(RecoveryResult {
intents_recovered: intents.len(),
intents_replayed: to_redo.len(),
intents_rolled_back: to_undo.len(),
ops_redone,
ops_undone,
last_lsn,
duration: start.elapsed(),
errors,
})
}
fn build_intent_map(&self, records: &[WalRecord]) -> HashMap<u64, RecoveryIntent> {
let mut intents: HashMap<u64, RecoveryIntent> = HashMap::new();
for record in records {
match &record.payload {
WalPayload::IntentStart { memory_id, op_count } => {
intents.insert(record.intent_id, RecoveryIntent {
intent_id: record.intent_id,
memory_id: memory_id.clone(),
start_lsn: record.lsn,
expected_ops: *op_count,
operations: Vec::new(),
status: RecoveryIntentStatus::Started,
commit_lsn: None,
abort_lsn: None,
timestamp: record.timestamp,
});
}
WalPayload::Operation { op_index, op_type, key, value } => {
if let Some(intent) = intents.get_mut(&record.intent_id) {
intent.operations.push(RecoveryOperation {
op_index: *op_index,
op_type: op_type.clone(),
key: key.clone(),
value: value.clone(),
lsn: record.lsn,
});
intent.status = RecoveryIntentStatus::InProgress;
}
}
WalPayload::Commit => {
if let Some(intent) = intents.get_mut(&record.intent_id) {
intent.status = RecoveryIntentStatus::Committed;
intent.commit_lsn = Some(record.lsn);
}
}
WalPayload::Abort { .. } => {
if let Some(intent) = intents.get_mut(&record.intent_id) {
intent.status = RecoveryIntentStatus::Aborted;
intent.abort_lsn = Some(record.lsn);
}
}
WalPayload::Checkpoint { .. } => {
}
}
}
intents
}
fn classify_intents<'a>(
&self,
intents: &'a HashMap<u64, RecoveryIntent>,
) -> (Vec<&'a RecoveryIntent>, Vec<&'a RecoveryIntent>) {
let mut to_redo = Vec::new();
let mut to_undo = Vec::new();
for intent in intents.values() {
match intent.status {
RecoveryIntentStatus::Started | RecoveryIntentStatus::InProgress => {
if self.config.redo_incomplete {
to_redo.push(intent);
} else {
to_undo.push(intent);
}
}
RecoveryIntentStatus::Completed => {
to_redo.push(intent);
}
RecoveryIntentStatus::Aborted => {
to_undo.push(intent);
}
RecoveryIntentStatus::Committed | RecoveryIntentStatus::Unknown => {
}
}
}
(to_redo, to_undo)
}
fn redo_intent(&self, intent: &RecoveryIntent) -> io::Result<usize> {
if let Some(ref callback) = self.apply_callback {
let mut applied = 0;
let mut ops = intent.operations.clone();
ops.sort_by_key(|op| op.op_index);
for op in &ops {
callback(op)?;
applied += 1;
}
Ok(applied)
} else {
Ok(0)
}
}
fn undo_intent(&self, intent: &RecoveryIntent) -> io::Result<usize> {
if let Some(ref callback) = self.undo_callback {
let mut undone = 0;
let mut ops = intent.operations.clone();
ops.sort_by_key(|op| std::cmp::Reverse(op.op_index));
for op in &ops {
callback(op)?;
undone += 1;
}
Ok(undone)
} else {
Ok(0)
}
}
}
/// Settings for [`GarbageCollector`].
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Directory holding the WAL files to prune.
    pub wal_dir: PathBuf,
    /// A WAL file must be at least this old (by modification time) to be deleted.
    pub min_age: Duration,
    /// Not consulted by the GC code in this file.
    pub max_wal_size: u64,
    /// Pause between passes of the background GC loop.
    pub checkpoint_interval: Duration,
    /// Not consulted by the GC code in this file.
    pub background: bool,
}

impl Default for GcConfig {
    /// `./wal`, 5 min minimum age, 1 GiB size cap, 60 s interval, background on.
    fn default() -> Self {
        const ONE_GIB: u64 = 1024 * 1024 * 1024;
        GcConfig {
            wal_dir: PathBuf::from("./wal"),
            min_age: Duration::from_secs(300),
            max_wal_size: ONE_GIB,
            checkpoint_interval: Duration::from_secs(60),
            background: true,
        }
    }
}
/// Statistics reported by [`GarbageCollector`].
#[derive(Debug, Clone, Default)]
pub struct GcStats {
    /// Committed intents seen in the WAL on the most recent pass (overwritten each pass).
    pub intents_collected: usize,
    /// WAL files deleted, summed over all passes.
    pub files_removed: usize,
    /// Bytes freed by deleted WAL files, summed over all passes.
    pub bytes_reclaimed: u64,
    /// Start time of the most recent pass, if any has run.
    pub last_gc: Option<Instant>,
    /// Number of passes that reached the statistics update.
    pub total_runs: u64,
}
/// Checkpoints the WAL and deletes WAL files that recovery no longer needs.
pub struct GarbageCollector {
    /// GC settings.
    config: GcConfig,
    /// Writer used to append checkpoint records.
    wal: Arc<WalWriter>,
    /// Statistics, updated at the end of each pass.
    stats: RwLock<GcStats>,
    /// Raw value of the LSN returned by the latest checkpoint write (0 before the first).
    last_checkpoint_lsn: AtomicU64,
    /// Non-zero once a stop has been requested.
    stop_flag: AtomicU64,
}
impl GarbageCollector {
pub fn new(config: GcConfig, wal: Arc<WalWriter>) -> Self {
Self {
config,
wal,
stats: RwLock::new(GcStats::default()),
last_checkpoint_lsn: AtomicU64::new(0),
stop_flag: AtomicU64::new(0),
}
}
pub fn run_once(&self) -> io::Result<GcStats> {
let start = Instant::now();
let reader = WalReader::new(&self.config.wal_dir);
let records = reader.read_all()?;
let committed: HashSet<u64> = records.iter()
.filter(|r| r.record_type == WalRecordType::Commit)
.map(|r| r.intent_id)
.collect();
let mut safe_lsn = Lsn::ZERO;
let mut all_committed = true;
for record in &records {
if record.record_type == WalRecordType::Intent {
if !committed.contains(&record.intent_id) {
all_committed = false;
break;
}
}
if all_committed {
safe_lsn = record.lsn;
}
}
let checkpoint_lsn = self.wal.write_checkpoint(safe_lsn, committed.len())?;
self.last_checkpoint_lsn.store(checkpoint_lsn.0, Ordering::SeqCst);
let files = reader.list_files()?;
let mut files_removed = 0;
let mut bytes_reclaimed = 0u64;
for file in &files {
if let Ok(file_records) = reader.read_file(file) {
let max_lsn = file_records.iter()
.map(|r| r.lsn)
.max()
.unwrap_or(Lsn::ZERO);
if max_lsn <= safe_lsn {
if let Ok(metadata) = std::fs::metadata(file) {
if let Ok(modified) = metadata.modified() {
if let Ok(age) = modified.elapsed() {
if age >= self.config.min_age {
bytes_reclaimed += metadata.len();
std::fs::remove_file(file)?;
files_removed += 1;
}
}
}
}
}
}
}
let mut stats = self.stats.write();
stats.intents_collected = committed.len();
stats.files_removed += files_removed;
stats.bytes_reclaimed += bytes_reclaimed;
stats.last_gc = Some(start);
stats.total_runs += 1;
Ok(stats.clone())
}
pub fn start_background(self: Arc<Self>) -> thread::JoinHandle<()> {
let gc = self.clone();
thread::spawn(move || {
while gc.stop_flag.load(Ordering::Relaxed) == 0 {
thread::sleep(gc.config.checkpoint_interval);
if let Err(e) = gc.run_once() {
tracing::warn!("GC error: {}", e);
}
}
})
}
pub fn stop(&self) {
self.stop_flag.store(1, Ordering::SeqCst);
}
pub fn stats(&self) -> GcStats {
self.stats.read().clone()
}
pub fn last_checkpoint(&self) -> Lsn {
Lsn(self.last_checkpoint_lsn.load(Ordering::SeqCst))
}
}
/// Bundles WAL recovery and WAL garbage collection behind a single handle.
pub struct RecoveryManager {
    /// Engine used by `recover`.
    recovery_engine: RecoveryEngine,
    /// Shared so a background GC thread can hold its own reference.
    gc: Arc<GarbageCollector>,
}
impl RecoveryManager {
pub fn new(
recovery_config: RecoveryConfig,
gc_config: GcConfig,
wal: Arc<WalWriter>,
) -> Self {
Self {
recovery_engine: RecoveryEngine::new(recovery_config),
gc: Arc::new(GarbageCollector::new(gc_config, wal)),
}
}
pub fn recover(&self) -> io::Result<RecoveryResult> {
self.recovery_engine.recover()
}
pub fn gc(&self) -> io::Result<GcStats> {
self.gc.run_once()
}
pub fn start_background_gc(&self) -> thread::JoinHandle<()> {
self.gc.clone().start_background()
}
pub fn stop_gc(&self) {
self.gc.stop();
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an operation at position `op_index` logged at `lsn`.
    fn make_op(op_index: usize, lsn: u64) -> RecoveryOperation {
        RecoveryOperation {
            op_index,
            op_type: "put".to_string(),
            key: b"key".to_vec(),
            value: Some(b"value".to_vec()),
            lsn: Lsn(lsn),
        }
    }

    /// Builds an in-progress intent holding `operations`.
    fn make_intent(operations: Vec<RecoveryOperation>) -> RecoveryIntent {
        let expected_ops = operations.len();
        RecoveryIntent {
            intent_id: 7,
            memory_id: "mem7".to_string(),
            start_lsn: Lsn(1),
            expected_ops,
            operations,
            status: RecoveryIntentStatus::InProgress,
            commit_lsn: None,
            abort_lsn: None,
            timestamp: 0,
        }
    }

    fn test_config() -> RecoveryConfig {
        RecoveryConfig {
            wal_dir: PathBuf::from("/tmp/test"),
            redo_incomplete: true,
            ..Default::default()
        }
    }

    #[test]
    fn test_recovery_intent_classification() {
        let engine = RecoveryEngine::new(test_config());
        let mut intents = HashMap::new();
        intents.insert(1, RecoveryIntent {
            intent_id: 1,
            memory_id: "mem1".to_string(),
            start_lsn: Lsn(1),
            expected_ops: 2,
            operations: vec![],
            status: RecoveryIntentStatus::Committed,
            commit_lsn: Some(Lsn(10)),
            abort_lsn: None,
            timestamp: 0,
        });
        intents.insert(2, RecoveryIntent {
            intent_id: 2,
            memory_id: "mem2".to_string(),
            start_lsn: Lsn(11),
            expected_ops: 2,
            operations: vec![],
            status: RecoveryIntentStatus::InProgress,
            commit_lsn: None,
            abort_lsn: None,
            timestamp: 0,
        });
        intents.insert(3, RecoveryIntent {
            intent_id: 3,
            memory_id: "mem3".to_string(),
            start_lsn: Lsn(20),
            expected_ops: 2,
            operations: vec![],
            status: RecoveryIntentStatus::Aborted,
            commit_lsn: None,
            abort_lsn: Some(Lsn(25)),
            timestamp: 0,
        });
        let (to_redo, to_undo) = engine.classify_intents(&intents);
        assert_eq!(to_redo.len(), 1);
        assert_eq!(to_redo[0].intent_id, 2);
        assert_eq!(to_undo.len(), 1);
        assert_eq!(to_undo[0].intent_id, 3);
    }

    #[test]
    fn test_redo_and_undo_follow_op_index() {
        let applied = Arc::new(Mutex::new(Vec::new()));
        let undone = Arc::new(Mutex::new(Vec::new()));
        let applied_sink = Arc::clone(&applied);
        let undone_sink = Arc::clone(&undone);
        let engine = RecoveryEngine::new(test_config())
            .with_apply(move |op| {
                applied_sink.lock().push(op.op_index);
                Ok(())
            })
            .with_undo(move |op| {
                undone_sink.lock().push(op.op_index);
                Ok(())
            });
        let intent = make_intent(vec![make_op(2, 5), make_op(0, 3), make_op(1, 4)]);

        assert_eq!(engine.redo_intent(&intent).unwrap(), 3);
        assert_eq!(*applied.lock(), vec![0, 1, 2]);
        assert_eq!(engine.undo_intent(&intent).unwrap(), 3);
        assert_eq!(*undone.lock(), vec![2, 1, 0]);
    }

    #[test]
    fn test_missing_callbacks_do_no_work() {
        let engine = RecoveryEngine::new(test_config());
        let intent = make_intent(vec![make_op(0, 2), make_op(1, 3)]);
        assert_eq!(engine.redo_intent(&intent).unwrap(), 0);
        assert_eq!(engine.undo_intent(&intent).unwrap(), 0);
    }

    #[test]
    fn test_gc_config() {
        let config = GcConfig::default();
        assert_eq!(config.min_age, Duration::from_secs(300));
        assert!(config.background);
    }
}