use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
/// Log sequence number identifying a record's position in the WAL.
///
/// Comparison, hashing and (de)serialization all operate on the wrapped `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Lsn(pub u64);
impl Lsn {
    /// The zero LSN, below every LSN a `WalWriter` assigns (those start at 1).
    pub const ZERO: Lsn = Lsn(0);

    /// Returns the LSN immediately following this one.
    ///
    /// Uses plain `+`, so incrementing `u64::MAX` panics in debug builds.
    pub fn next(&self) -> Lsn {
        let Lsn(current) = *self;
        Lsn(current + 1)
    }
}
/// Kind of a [`WalRecord`], convertible to and from `sochdb_core::txn::WalRecordType`.
///
/// Both the variant order and the explicit discriminants are part of the
/// on-disk format. Serde encodes a variant by its declaration position, and
/// `WalRecord`'s checksum hashes the `u8` discriminant. Do not reorder,
/// renumber, or insert variants in the middle.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum WalRecordType {
    Intent = 1,
    Operation = 2,
    Commit = 3,
    Abort = 4,
    Checkpoint = 5,
}
impl WalRecordType {
    /// Maps this record type onto its `sochdb_core::txn` equivalent.
    pub fn to_canonical(self) -> sochdb_core::txn::WalRecordType {
        use sochdb_core::txn::WalRecordType as Canonical;
        match self {
            WalRecordType::Intent => Canonical::TxnBegin,
            WalRecordType::Operation => Canonical::Data,
            WalRecordType::Commit => Canonical::TxnCommit,
            WalRecordType::Abort => Canonical::TxnAbort,
            WalRecordType::Checkpoint => Canonical::Checkpoint,
        }
    }

    /// Maps a `sochdb_core::txn` record type back onto this enum.
    ///
    /// Returns `None` for canonical kinds with no counterpart here.
    pub fn from_canonical(rt: sochdb_core::txn::WalRecordType) -> Option<Self> {
        use sochdb_core::txn::WalRecordType as Canonical;
        let mapped = match rt {
            Canonical::TxnBegin => WalRecordType::Intent,
            Canonical::Data => WalRecordType::Operation,
            Canonical::TxnCommit => WalRecordType::Commit,
            Canonical::TxnAbort => WalRecordType::Abort,
            Canonical::Checkpoint => WalRecordType::Checkpoint,
            _ => return None,
        };
        Some(mapped)
    }
}
/// A single WAL entry.
///
/// Field order is part of the on-disk format: [`WalRecord::to_bytes`]
/// bincode-encodes the fields in declaration order. Do not reorder them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalRecord {
    /// Position of this record in the log.
    pub lsn: Lsn,
    /// What kind of record this is; must agree with `payload`'s variant.
    pub record_type: WalRecordType,
    /// Intent the record belongs to.
    pub intent_id: u64,
    /// Type-specific body.
    pub payload: WalPayload,
    /// Creation time, in milliseconds since the Unix epoch.
    pub timestamp: u64,
    /// CRC32 over every other field; see [`WalRecord::verify`].
    pub checksum: u32,
}
/// Type-specific body of a [`WalRecord`].
///
/// Variant and field order are part of the on-disk format; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalPayload {
    /// Opens an intent for `memory_id` that declares `op_count` operations.
    IntentStart {
        memory_id: String,
        op_count: usize,
    },
    /// One operation of an intent; `op_index` is its zero-based position.
    Operation {
        op_index: usize,
        op_type: String,
        key: Vec<u8>,
        // NOTE(review): presumably `None` for deletes — confirm with callers.
        value: Option<Vec<u8>>,
    },
    /// Marks the intent as committed.
    Commit,
    /// Marks the intent as aborted, with a human-readable reason.
    Abort {
        reason: String,
    },
    /// Checkpoint marker; both values are supplied by the caller of
    /// `WalWriter::write_checkpoint`.
    Checkpoint {
        last_committed_lsn: Lsn,
        intent_count: usize,
    },
}
impl WalRecord {
    /// Builds a record stamped with the current wall-clock time and a fresh checksum.
    ///
    /// If the system clock reads earlier than the Unix epoch, the timestamp is
    /// 0 rather than panicking.
    pub fn new(lsn: Lsn, record_type: WalRecordType, intent_id: u64, payload: WalPayload) -> Self {
        let timestamp = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_millis() as u64)
            .unwrap_or(0);
        let mut record = Self {
            lsn,
            record_type,
            intent_id,
            payload,
            timestamp,
            checksum: 0,
        };
        record.checksum = record.compute_checksum();
        record
    }

    /// CRC32 over the LSN, the record-type discriminant, the intent id, the
    /// timestamp, and the bincode-encoded payload, in that order.
    ///
    /// Deliberately excludes `self.checksum`. The exact byte sequence fed to
    /// the hasher is part of the on-disk format, so existing logs only verify
    /// if it stays unchanged. If the payload fails to encode, it is left out of
    /// the hash; that happens identically at write and verify time.
    fn compute_checksum(&self) -> u32 {
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&self.lsn.0.to_le_bytes());
        hasher.update(&[self.record_type as u8]);
        hasher.update(&self.intent_id.to_le_bytes());
        hasher.update(&self.timestamp.to_le_bytes());
        if let Ok(payload_bytes) = bincode::serialize(&self.payload) {
            hasher.update(&payload_bytes);
        }
        hasher.finalize()
    }

    /// Returns `true` when the stored checksum matches the record's contents.
    pub fn verify(&self) -> bool {
        // `compute_checksum` never reads `self.checksum`, so there is no need to
        // clone the record (and its payload buffers) and zero that field first.
        self.checksum == self.compute_checksum()
    }

    /// Encodes the whole record, checksum included, with bincode.
    ///
    /// # Errors
    /// Encoding failures are reported as `InvalidData`.
    pub fn to_bytes(&self) -> io::Result<Vec<u8>> {
        bincode::serialize(self)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }

    /// Decodes a record produced by [`WalRecord::to_bytes`].
    ///
    /// This does not verify the checksum; call [`WalRecord::verify`] for that.
    ///
    /// # Errors
    /// Decoding failures are reported as `InvalidData`.
    pub fn from_bytes(bytes: &[u8]) -> io::Result<Self> {
        bincode::deserialize(bytes)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
    }
}
/// Tunables for a `WalWriter`.
#[derive(Debug, Clone)]
pub struct WalConfig {
    /// Directory that holds the `wal_<id>.log` segment files.
    pub dir: PathBuf,
    /// Segment size in bytes at or beyond which the writer rotates to a new file.
    pub max_file_size: u64,
    /// When appended records are pushed to disk.
    pub sync_mode: SyncMode,
    /// Capacity, in bytes, of the write buffer wrapping each segment file.
    pub buffer_size: usize,
}
/// Durability policy applied by `WalWriter` when appending records.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Persist after every appended record.
    EveryRecord,
    /// Persist when a commit record is appended.
    OnCommit,
    /// `append` itself never syncs in this mode. Presumably a caller drives
    /// `WalWriter::sync` on a timer (not visible here).
    Periodic,
    /// Never sync on append. Data reaches the OS when the buffer fills, on an
    /// explicit `sync`, or when a checkpoint is written.
    None,
}
impl Default for WalConfig {
    /// `./wal`, 64 MiB segments, sync on commit, 64 KiB write buffer.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        const KIB: usize = 1024;
        WalConfig {
            dir: "./wal".into(),
            max_file_size: 64 * MIB,
            sync_mode: SyncMode::OnCommit,
            buffer_size: 64 * KIB,
        }
    }
}
pub struct WalWriter {
config: WalConfig,
current_file: Mutex<Option<BufWriter<File>>>,
current_lsn: AtomicU64,
current_file_size: AtomicU64,
current_file_id: AtomicU64,
}
impl WalWriter {
pub fn new(config: WalConfig) -> io::Result<Self> {
std::fs::create_dir_all(&config.dir)?;
let writer = Self {
config,
current_file: Mutex::new(None),
current_lsn: AtomicU64::new(1),
current_file_size: AtomicU64::new(0),
current_file_id: AtomicU64::new(1),
};
writer.rotate_if_needed()?;
Ok(writer)
}
pub fn current_lsn(&self) -> Lsn {
Lsn(self.current_lsn.load(Ordering::SeqCst))
}
fn next_lsn(&self) -> Lsn {
Lsn(self.current_lsn.fetch_add(1, Ordering::SeqCst))
}
fn rotate_if_needed(&self) -> io::Result<()> {
let size = self.current_file_size.load(Ordering::Relaxed);
if size >= self.config.max_file_size || self.current_file.lock().is_none() {
let file_id = self.current_file_id.fetch_add(1, Ordering::SeqCst);
let path = self.config.dir.join(format!("wal_{:016x}.log", file_id));
let file = OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&path)?;
let mut writer = BufWriter::with_capacity(self.config.buffer_size, file);
let header = b"SOCHWAL1"; writer.write_all(header)?;
*self.current_file.lock() = Some(writer);
self.current_file_size.store(8, Ordering::Relaxed);
}
Ok(())
}
pub fn append(&self, record: WalRecord) -> io::Result<Lsn> {
self.rotate_if_needed()?;
let bytes = record.to_bytes()?;
let record_len = bytes.len() as u32;
let mut file = self.current_file.lock();
if let Some(ref mut writer) = *file {
writer.write_all(&record_len.to_le_bytes())?;
writer.write_all(&bytes)?;
match self.config.sync_mode {
SyncMode::EveryRecord => writer.flush()?,
SyncMode::OnCommit if record.record_type == WalRecordType::Commit => {
writer.flush()?;
writer.get_ref().sync_all()?;
}
_ => {}
}
self.current_file_size.fetch_add(4 + bytes.len() as u64, Ordering::Relaxed);
}
Ok(record.lsn)
}
pub fn write_intent(&self, intent_id: u64, memory_id: &str, op_count: usize) -> io::Result<Lsn> {
let lsn = self.next_lsn();
let record = WalRecord::new(
lsn,
WalRecordType::Intent,
intent_id,
WalPayload::IntentStart {
memory_id: memory_id.to_string(),
op_count,
},
);
self.append(record)
}
pub fn write_operation(
&self,
intent_id: u64,
op_index: usize,
op_type: &str,
key: &[u8],
value: Option<&[u8]>,
) -> io::Result<Lsn> {
let lsn = self.next_lsn();
let record = WalRecord::new(
lsn,
WalRecordType::Operation,
intent_id,
WalPayload::Operation {
op_index,
op_type: op_type.to_string(),
key: key.to_vec(),
value: value.map(|v| v.to_vec()),
},
);
self.append(record)
}
pub fn write_commit(&self, intent_id: u64) -> io::Result<Lsn> {
let lsn = self.next_lsn();
let record = WalRecord::new(
lsn,
WalRecordType::Commit,
intent_id,
WalPayload::Commit,
);
self.append(record)
}
pub fn write_abort(&self, intent_id: u64, reason: &str) -> io::Result<Lsn> {
let lsn = self.next_lsn();
let record = WalRecord::new(
lsn,
WalRecordType::Abort,
intent_id,
WalPayload::Abort {
reason: reason.to_string(),
},
);
self.append(record)
}
pub fn write_checkpoint(&self, last_committed_lsn: Lsn, intent_count: usize) -> io::Result<Lsn> {
let lsn = self.next_lsn();
let record = WalRecord::new(
lsn,
WalRecordType::Checkpoint,
0,
WalPayload::Checkpoint {
last_committed_lsn,
intent_count,
},
);
self.append(record)?;
if let Some(ref mut writer) = *self.current_file.lock() {
writer.flush()?;
writer.get_ref().sync_all()?;
}
Ok(lsn)
}
pub fn sync(&self) -> io::Result<()> {
if let Some(ref mut writer) = *self.current_file.lock() {
writer.flush()?;
writer.get_ref().sync_all()?;
}
Ok(())
}
}
/// Reads the segment files written by `WalWriter` from a directory.
pub struct WalReader {
    dir: PathBuf,
}
impl WalReader {
    /// Magic bytes expected at the start of every segment file.
    const HEADER: &'static [u8; 8] = b"SOCHWAL1";

    /// Creates a reader over the segments in `dir`. No I/O happens until a read.
    pub fn new(dir: impl AsRef<Path>) -> Self {
        Self {
            dir: dir.as_ref().to_path_buf(),
        }
    }

    /// Lists `wal_*.log` files in the directory, sorted by path.
    ///
    /// Segment ids are fixed-width hex, so lexical order is creation order.
    /// Unreadable directory entries are skipped.
    pub fn list_files(&self) -> io::Result<Vec<PathBuf>> {
        let mut files: Vec<PathBuf> = std::fs::read_dir(&self.dir)?
            .filter_map(|e| e.ok())
            .map(|e| e.path())
            .filter(|p| {
                p.extension().map(|e| e == "log").unwrap_or(false)
                    && p.file_name()
                        .map(|n| n.to_string_lossy().starts_with("wal_"))
                        .unwrap_or(false)
            })
            .collect();
        files.sort();
        Ok(files)
    }

    /// Reads every intact record from one segment, in file order.
    ///
    /// Crash leftovers end the segment instead of failing it:
    /// - A missing or partial header (a segment created but never flushed)
    ///   yields no records.
    /// - A record cut short at the end of the file (a torn write) ends the
    ///   segment, just as a partial length prefix already did.
    ///
    /// # Errors
    /// Returns `InvalidData` for a wrong header, an undecodable record, or a
    /// checksum mismatch; I/O errors are propagated.
    pub fn read_file(&self, path: &Path) -> io::Result<Vec<WalRecord>> {
        let file = File::open(path)?;
        let file_len = file.metadata()?.len();
        let mut reader = BufReader::new(file);
        let mut header = [0u8; 8];
        if !Self::read_full(&mut reader, &mut header)? {
            return Ok(Vec::new());
        }
        if &header != Self::HEADER {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid WAL header",
            ));
        }
        let mut offset = Self::HEADER.len() as u64;
        let mut records = Vec::new();
        loop {
            let mut len_buf = [0u8; 4];
            if !Self::read_full(&mut reader, &mut len_buf)? {
                break;
            }
            offset += 4;
            let len = u64::from(u32::from_le_bytes(len_buf));
            // A length running past EOF is a torn tail. Checking it up front
            // also avoids allocating up to 4 GiB for a garbage length.
            if len > file_len.saturating_sub(offset) {
                break;
            }
            let mut record_buf = vec![0u8; len as usize];
            if !Self::read_full(&mut reader, &mut record_buf)? {
                break;
            }
            offset += len;
            let record = WalRecord::from_bytes(&record_buf)?;
            if !record.verify() {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Checksum mismatch at LSN {:?}", record.lsn),
                ));
            }
            records.push(record);
        }
        Ok(records)
    }

    /// Like `read_exact`, but reports a short read as `Ok(false)` instead of an error.
    fn read_full(reader: &mut impl Read, buf: &mut [u8]) -> io::Result<bool> {
        match reader.read_exact(buf) {
            Ok(()) => Ok(true),
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(false),
            Err(e) => Err(e),
        }
    }

    /// Reads every segment and returns all records, stably sorted by LSN.
    pub fn read_all(&self) -> io::Result<Vec<WalRecord>> {
        let mut all_records = Vec::new();
        for path in self.list_files()? {
            all_records.extend(self.read_file(&path)?);
        }
        all_records.sort_by_key(|r| r.lsn);
        Ok(all_records)
    }

    /// Returns the records with an LSN strictly greater than `checkpoint_lsn`, in LSN order.
    pub fn read_since(&self, checkpoint_lsn: Lsn) -> io::Result<Vec<WalRecord>> {
        let mut records = self.read_all()?;
        records.retain(|r| r.lsn > checkpoint_lsn);
        Ok(records)
    }
}
/// Groups WAL records into intents: `begin`, any number of `record_op` calls,
/// then exactly one `commit` or `abort`.
///
/// NOTE(review): intent ids restart at 1 for every new instance, so they are
/// only unique within one `WalAtomicWriter`.
pub struct WalAtomicWriter {
    wal: Arc<WalWriter>,
    next_intent_id: AtomicU64,
    /// Intents that have begun but not yet committed or aborted.
    pending: RwLock<HashMap<u64, PendingIntent>>,
}
/// Bookkeeping for an intent between `begin` and `commit`/`abort`.
#[derive(Debug)]
// Only `ops_completed` is read today. The remaining fields are kept so a
// pending intent's Debug output carries its full context.
#[allow(dead_code)]
struct PendingIntent {
    intent_id: u64,
    memory_id: String,
    start_lsn: Lsn,
    ops_completed: usize,
    total_ops: usize,
}
impl WalAtomicWriter {
    /// Wraps `wal`; intent ids are handed out starting at 1.
    pub fn new(wal: Arc<WalWriter>) -> Self {
        Self {
            wal,
            next_intent_id: AtomicU64::new(1),
            pending: RwLock::new(HashMap::new()),
        }
    }

    /// Starts a new intent for `memory_id` declaring `op_count` operations and
    /// returns its id.
    ///
    /// # Errors
    /// Propagates the WAL write failure; the intent is then not registered.
    pub fn begin(&self, memory_id: &str, op_count: usize) -> io::Result<u64> {
        let intent_id = self.next_intent_id.fetch_add(1, Ordering::SeqCst);
        let lsn = self.wal.write_intent(intent_id, memory_id, op_count)?;
        self.pending.write().insert(intent_id, PendingIntent {
            intent_id,
            memory_id: memory_id.to_string(),
            start_lsn: lsn,
            ops_completed: 0,
            total_ops: op_count,
        });
        Ok(intent_id)
    }

    /// Logs the next operation of a pending intent.
    ///
    /// Op indices are assigned in call order starting at 0. An index is
    /// consumed even if the WAL write then fails.
    ///
    /// # Errors
    /// Returns `NotFound` if `intent_id` is not pending; WAL failures are propagated.
    pub fn record_op(
        &self,
        intent_id: u64,
        op_type: &str,
        key: &[u8],
        value: Option<&[u8]>,
    ) -> io::Result<Lsn> {
        let op_index = {
            let mut pending = self.pending.write();
            let intent = pending
                .get_mut(&intent_id)
                .ok_or_else(|| Self::not_found(intent_id))?;
            let idx = intent.ops_completed;
            intent.ops_completed += 1;
            idx
        };
        self.wal.write_operation(intent_id, op_index, op_type, key, value)
    }

    /// Logs a commit for a pending intent and retires it.
    ///
    /// # Errors
    /// Returns `NotFound` if `intent_id` is not pending (never begun, or already
    /// finished). On a WAL failure the intent stays pending.
    pub fn commit(&self, intent_id: u64) -> io::Result<Lsn> {
        self.finish(intent_id, |wal| wal.write_commit(intent_id))
    }

    /// Logs an abort for a pending intent and retires it.
    ///
    /// # Errors
    /// Returns `NotFound` if `intent_id` is not pending (never begun, or already
    /// finished). On a WAL failure the intent stays pending.
    pub fn abort(&self, intent_id: u64, reason: &str) -> io::Result<Lsn> {
        self.finish(intent_id, |wal| wal.write_abort(intent_id, reason))
    }

    /// Number of intents that have begun but not yet committed or aborted.
    pub fn pending_count(&self) -> usize {
        self.pending.read().len()
    }

    /// Claims `intent_id` out of `pending`, writes its terminal record with
    /// `write`, and puts the intent back if that write fails.
    fn finish(
        &self,
        intent_id: u64,
        write: impl FnOnce(&WalWriter) -> io::Result<Lsn>,
    ) -> io::Result<Lsn> {
        // Removing before writing means two racing commit/abort calls cannot
        // both log a terminal record for the same intent.
        let intent = self
            .pending
            .write()
            .remove(&intent_id)
            .ok_or_else(|| Self::not_found(intent_id))?;
        write(&*self.wal).map_err(|e| {
            self.pending.write().insert(intent_id, intent);
            e
        })
    }

    /// Error returned when an operation names an intent that is not pending.
    fn not_found(intent_id: u64) -> io::Error {
        io::Error::new(
            io::ErrorKind::NotFound,
            format!("Intent {} not found", intent_id),
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Config pointing at a scratch directory, leaving syncing to the test.
    fn scratch_config(tmp: &TempDir) -> WalConfig {
        WalConfig {
            dir: tmp.path().to_path_buf(),
            sync_mode: SyncMode::None,
            ..Default::default()
        }
    }

    #[test]
    fn test_wal_write_read() -> io::Result<()> {
        let tmp = TempDir::new()?;
        let writer = WalWriter::new(scratch_config(&tmp))?;
        let lsn1 = writer.write_intent(1, "memory1", 2)?;
        let lsn2 = writer.write_operation(1, 0, "PUT", b"key1", Some(b"value1"))?;
        let lsn3 = writer.write_operation(1, 1, "PUT", b"key2", Some(b"value2"))?;
        let lsn4 = writer.write_commit(1)?;
        writer.sync()?;

        let records = WalReader::new(tmp.path()).read_all()?;
        let lsns: Vec<Lsn> = records.iter().map(|r| r.lsn).collect();
        assert_eq!(lsns, vec![lsn1, lsn2, lsn3, lsn4]);
        let types: Vec<WalRecordType> = records.iter().map(|r| r.record_type).collect();
        assert_eq!(
            types,
            vec![
                WalRecordType::Intent,
                WalRecordType::Operation,
                WalRecordType::Operation,
                WalRecordType::Commit,
            ]
        );
        assert!(records.iter().all(|r| r.intent_id == 1 && r.verify()));
        Ok(())
    }

    #[test]
    fn test_atomic_writer() -> io::Result<()> {
        let tmp = TempDir::new()?;
        let wal = Arc::new(WalWriter::new(scratch_config(&tmp))?);
        let writer = WalAtomicWriter::new(Arc::clone(&wal));
        let intent_id = writer.begin("test_memory", 2)?;
        assert_eq!(writer.pending_count(), 1);
        writer.record_op(intent_id, "PUT", b"key1", Some(b"value1"))?;
        writer.record_op(intent_id, "PUT", b"key2", Some(b"value2"))?;
        writer.commit(intent_id)?;
        assert_eq!(writer.pending_count(), 0);

        wal.sync()?;
        let records = WalReader::new(tmp.path()).read_all()?;
        assert_eq!(records.len(), 4);
        assert!(records.iter().all(|r| r.intent_id == intent_id));
        assert!(matches!(records[1].payload, WalPayload::Operation { op_index: 0, .. }));
        assert!(matches!(records[2].payload, WalPayload::Operation { op_index: 1, .. }));
        assert_eq!(records[3].record_type, WalRecordType::Commit);
        Ok(())
    }
}